Skip to content

Event Sourcing in ConnectSoft Microservice Template

Purpose & Overview

Event Sourcing is an architectural pattern where state changes are stored as a sequence of events rather than as current state snapshots. In the ConnectSoft Microservice Template, Event Sourcing can be implemented to provide complete audit trails, enable time travel, and support advanced features like event replay and projection building.

Why Event Sourcing?

Event Sourcing provides several powerful benefits:

  • Complete Audit Trail: Every state change is recorded as an event
  • Time Travel: Reconstruct state at any point in time
  • Event Replay: Reprocess events to rebuild read models or fix issues
  • Flexibility: Build new read models from existing event history
  • Debugging: Understand exactly what happened and in what order
  • Compliance: Meet regulatory requirements for auditability
  • Eventual Consistency: Natural fit with CQRS and event-driven architecture

How It's Implemented in Template

Architecture Overview

Event Sourcing can be layered on top of the existing template architecture:

Domain Layer
    ├── Aggregate Root (raises domain events)
    ├── Domain Events
    └── Event Store (persists events)
        ↓
Event Stream
    ├── Event 1: AggregateCreated
    ├── Event 2: AggregateUpdated
    └── Event 3: AggregateCompleted
        ↓
Projection Layer
    ├── Read Model 1 (Current State)
    ├── Read Model 2 (Denormalized View)
    └── Read Model 3 (Analytics View)

Core Components

1. Domain Events

The template already has domain events in MessagingModel:

// MicroserviceAggregateRootCreatedEvent.cs
public class MicroserviceAggregateRootCreatedEvent : IEvent
{
    [Required]
    [NotDefault]
    public Guid ObjectId { get; set; }
}

2. Event Store Interface

Event Store interface for persisting and retrieving events:

public interface IEventStore
{
    Task AppendEventsAsync<T>(
        Guid aggregateId,
        IEnumerable<IDomainEvent> events,
        int expectedVersion,
        CancellationToken ct = default);

    Task<IEnumerable<IDomainEvent>> GetEventsAsync<T>(
        Guid aggregateId,
        CancellationToken ct = default);

    Task<IEnumerable<IDomainEvent>> GetEventsAsync(
        Guid aggregateId,
        long fromVersion,
        CancellationToken ct = default);
}

public interface IDomainEvent
{
    Guid EventId { get; }
    Guid AggregateId { get; }
    DateTimeOffset OccurredAt { get; }
    long Version { get; }
}

3. Event Sourced Aggregate Root

Aggregate root that stores and replays events:

public abstract class EventSourcedAggregateRoot<TId> : AggregateRoot<TId>
{
    private readonly List<IDomainEvent> uncommittedEvents = new();
    private long version = 0;

    protected EventSourcedAggregateRoot(TId id) : base(id)
    {
    }

    // Replay events to rebuild state
    public void LoadFromHistory(IEnumerable<IDomainEvent> events)
    {
        foreach (var @event in events)
        {
            ApplyEvent(@event, isReplay: true);
            version = @event.Version;
        }
    }

    // Get uncommitted events
    public IEnumerable<IDomainEvent> GetUncommittedEvents()
    {
        return uncommittedEvents;
    }

    // Mark events as committed
    public void MarkEventsAsCommitted()
    {
        uncommittedEvents.Clear();
    }

    // Raise domain event
    protected void RaiseEvent(IDomainEvent @event)
    {
        uncommittedEvents.Add(@event);
        ApplyEvent(@event, isReplay: false);
    }

    // Apply event (template method)
    protected abstract void ApplyEvent(IDomainEvent @event, bool isReplay);
}

4. Event Store Implementation

Example implementation using database:

public class SqlEventStore : IEventStore
{
    private readonly IDbConnection connection;
    private readonly IEventSerializer serializer;

    public async Task AppendEventsAsync<T>(
        Guid aggregateId,
        IEnumerable<IDomainEvent> events,
        int expectedVersion,
        CancellationToken ct)
    {
        // Check optimistic concurrency
        var currentVersion = await GetCurrentVersionAsync(aggregateId, ct);
        if (currentVersion != expectedVersion)
        {
            throw new ConcurrencyException($"Expected version {expectedVersion}, got {currentVersion}");
        }

        // Append events
        foreach (var @event in events)
        {
            await InsertEventAsync(aggregateId, @event, expectedVersion + 1, ct);
            expectedVersion++;
        }
    }

    public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(
        Guid aggregateId,
        CancellationToken ct)
    {
        var eventData = await connection.QueryAsync<EventData>(
            "SELECT * FROM Events WHERE AggregateId = @AggregateId ORDER BY Version",
            new { AggregateId = aggregateId });

        return eventData.Select(e => serializer.Deserialize(e.Type, e.Data));
    }

    private async Task InsertEventAsync(
        Guid aggregateId,
        IDomainEvent @event,
        long version,
        CancellationToken ct)
    {
        var eventData = new EventData
        {
            EventId = @event.EventId,
            AggregateId = aggregateId,
            EventType = @event.GetType().AssemblyQualifiedName,
            Data = serializer.Serialize(@event),
            Version = version,
            OccurredAt = @event.OccurredAt
        };

        await connection.ExecuteAsync(
            @"INSERT INTO Events (EventId, AggregateId, EventType, Data, Version, OccurredAt)
              VALUES (@EventId, @AggregateId, @EventType, @Data, @Version, @OccurredAt)",
            eventData);
    }
}

Project Structure

Component Responsibility
MessagingModel Domain event definitions
PersistenceModel.EventStore Event store interface and implementations
DomainModel Event-sourced aggregate roots
ProjectionModel Read model projections from events

Code Examples

Event-Sourced Aggregate

Aggregate Root Implementation

public class OrderAggregate : EventSourcedAggregateRoot<Guid>
{
    public OrderStatus Status { get; private set; }
    public decimal TotalAmount { get; private set; }
    public Guid CustomerId { get; private set; }

    // Constructor for creating new aggregate
    public OrderAggregate(Guid orderId, Guid customerId, decimal amount)
        : base(orderId)
    {
        RaiseEvent(new OrderCreatedEvent
        {
            OrderId = orderId,
            CustomerId = customerId,
            TotalAmount = amount,
            OccurredAt = DateTimeOffset.UtcNow
        });
    }

    // Constructor for loading from history
    public OrderAggregate(Guid orderId, IEnumerable<IDomainEvent> history)
        : base(orderId)
    {
        LoadFromHistory(history);
    }

    public void ProcessPayment(Guid paymentId, decimal amount)
    {
        if (Status != OrderStatus.Created)
            throw new InvalidOperationException("Order must be in Created status");

        RaiseEvent(new PaymentProcessedEvent
        {
            OrderId = Id,
            PaymentId = paymentId,
            Amount = amount,
            OccurredAt = DateTimeOffset.UtcNow
        });
    }

    public void Complete()
    {
        if (Status != OrderStatus.PaymentReceived)
            throw new InvalidOperationException("Order must have payment received");

        RaiseEvent(new OrderCompletedEvent
        {
            OrderId = Id,
            CompletedAt = DateTimeOffset.UtcNow
        });
    }

    protected override void ApplyEvent(IDomainEvent @event, bool isReplay)
    {
        switch (@event)
        {
            case OrderCreatedEvent e:
                Status = OrderStatus.Created;
                CustomerId = e.CustomerId;
                TotalAmount = e.TotalAmount;
                break;

            case PaymentProcessedEvent e:
                Status = OrderStatus.PaymentReceived;
                break;

            case OrderCompletedEvent e:
                Status = OrderStatus.Completed;
                break;
        }
    }
}

Saving Events

Repository with Event Store

public class EventSourcedRepository<TAggregate, TId> 
    where TAggregate : EventSourcedAggregateRoot<TId>
{
    private readonly IEventStore eventStore;
    private readonly Func<TId, IEnumerable<IDomainEvent>, TAggregate> aggregateFactory;

    public async Task<TAggregate> GetByIdAsync(TId id, CancellationToken ct)
    {
        var events = await eventStore.GetEventsAsync(id, ct);

        if (!events.Any())
            return null;

        return aggregateFactory(id, events);
    }

    public async Task SaveAsync(TAggregate aggregate, CancellationToken ct)
    {
        var uncommittedEvents = aggregate.GetUncommittedEvents().ToList();

        if (!uncommittedEvents.Any())
            return;

        var expectedVersion = aggregate.Version - uncommittedEvents.Count;

        await eventStore.AppendEventsAsync<TAggregate>(
            aggregate.Id,
            uncommittedEvents,
            expectedVersion,
            ct);

        aggregate.MarkEventsAsCommitted();
    }
}

Event Projections

Building Read Models from Events

public class OrderReadModelProjector
{
    private readonly IOrderReadModelRepository readModelRepository;

    public async Task Project(OrderCreatedEvent @event, CancellationToken ct)
    {
        var readModel = new OrderReadModel
        {
            OrderId = @event.OrderId,
            CustomerId = @event.CustomerId,
            TotalAmount = @event.TotalAmount,
            Status = "Created",
            CreatedAt = @event.OccurredAt
        };

        await readModelRepository.SaveAsync(readModel, ct);
    }

    public async Task Project(PaymentProcessedEvent @event, CancellationToken ct)
    {
        var readModel = await readModelRepository.GetByIdAsync(@event.OrderId, ct);
        if (readModel != null)
        {
            readModel.Status = "PaymentReceived";
            readModel.PaymentId = @event.PaymentId;
            readModel.PaymentProcessedAt = @event.OccurredAt;

            await readModelRepository.SaveAsync(readModel, ct);
        }
    }

    public async Task Project(OrderCompletedEvent @event, CancellationToken ct)
    {
        var readModel = await readModelRepository.GetByIdAsync(@event.OrderId, ct);
        if (readModel != null)
        {
            readModel.Status = "Completed";
            readModel.CompletedAt = @event.OccurredAt;

            await readModelRepository.SaveAsync(readModel, ct);
        }
    }
}

Event Replay

Rebuilding Aggregates from Event Stream

public class AggregateRebuilder
{
    private readonly IEventStore eventStore;

    public async Task<TAggregate> RebuildAggregateAsync<TAggregate>(
        Guid aggregateId,
        Func<Guid, IEnumerable<IDomainEvent>, TAggregate> factory,
        CancellationToken ct)
        where TAggregate : EventSourcedAggregateRoot<Guid>
    {
        // Get all events for aggregate
        var events = await eventStore.GetEventsAsync(aggregateId, ct);

        // Rebuild aggregate from events
        return factory(aggregateId, events);
    }

    public async Task RebuildReadModelAsync(
        Guid aggregateId,
        IReadModelProjector projector,
        CancellationToken ct)
    {
        // Get all events
        var events = await eventStore.GetEventsAsync(aggregateId, ct);

        // Replay events to projector
        foreach (var @event in events)
        {
            await projector.ProjectAsync(@event, ct);
        }
    }
}

Snapshot Support

Optimizing with Snapshots

public class SnapshotStore
{
    public async Task SaveSnapshotAsync(
        Guid aggregateId,
        object snapshot,
        long version,
        CancellationToken ct)
    {
        // Save snapshot with version
        await connection.ExecuteAsync(
            @"INSERT INTO Snapshots (AggregateId, Data, Version, CreatedAt)
              VALUES (@AggregateId, @Data, @Version, @CreatedAt)",
            new
            {
                AggregateId = aggregateId,
                Data = serializer.Serialize(snapshot),
                Version = version,
                CreatedAt = DateTimeOffset.UtcNow
            });
    }

    public async Task<(object Snapshot, long Version)> GetLatestSnapshotAsync(
        Guid aggregateId,
        CancellationToken ct)
    {
        var snapshotData = await connection.QueryFirstOrDefaultAsync<SnapshotData>(
            "SELECT * FROM Snapshots WHERE AggregateId = @AggregateId ORDER BY Version DESC",
            new { AggregateId = aggregateId });

        if (snapshotData == null)
            return (null, 0);

        var snapshot = serializer.Deserialize(snapshotData.Data);
        return (snapshot, snapshotData.Version);
    }
}

// Repository using snapshots
public async Task<TAggregate> GetByIdAsync(TId id, CancellationToken ct)
{
    // Try to load from snapshot first
    var (snapshot, snapshotVersion) = await snapshotStore.GetLatestSnapshotAsync(id, ct);

    IEnumerable<IDomainEvent> events;

    if (snapshot != null)
    {
        // Load events after snapshot
        events = await eventStore.GetEventsAsync(id, snapshotVersion + 1, ct);
    }
    else
    {
        // Load all events
        events = await eventStore.GetEventsAsync(id, ct);
    }

    // Rebuild aggregate
    var aggregate = factory(id, events);

    if (snapshot != null)
    {
        aggregate.RestoreFromSnapshot(snapshot);
    }

    return aggregate;
}

Configuration

Event Store Setup

// Dependency Injection
services.AddScoped<IEventStore, SqlEventStore>();
services.AddScoped<IEventSerializer, JsonEventSerializer>();
services.AddScoped<ISnapshotStore, SqlSnapshotStore>();

// Event Store Repository
services.AddScoped<IEventSourcedRepository<OrderAggregate, Guid>>(
    provider => new EventSourcedRepository<OrderAggregate, Guid>(
        provider.GetRequiredService<IEventStore>(),
        (id, events) => new OrderAggregate(id, events)));

Event Stream Table

CREATE TABLE Events (
    EventId UNIQUEIDENTIFIER PRIMARY KEY,
    AggregateId UNIQUEIDENTIFIER NOT NULL,
    AggregateType NVARCHAR(500) NOT NULL,
    EventType NVARCHAR(500) NOT NULL,
    EventData NVARCHAR(MAX) NOT NULL,
    Version BIGINT NOT NULL,
    OccurredAt DATETIME2 NOT NULL,
    Metadata NVARCHAR(MAX) NULL,

    CONSTRAINT UK_Events_AggregateId_Version 
        UNIQUE (AggregateId, Version)
);

CREATE INDEX IX_Events_AggregateId 
    ON Events (AggregateId, Version);

Best Practices

Do's

  1. Make events immutable
  2. Events represent facts that happened
  3. Never modify events after they're created
  4. Version events if schema needs to change

  5. Store events in order

  6. Maintain strict ordering by version
  7. Use optimistic concurrency for appends
  8. Enforce ordering at database level

  9. Use snapshots for performance

  10. Create snapshots periodically for long event streams
  11. Balance snapshot frequency vs. storage cost
  12. Rebuild from snapshot + events after snapshot

  13. Project events asynchronously

  14. Build read models from events asynchronously
  15. Don't block command processing for projections
  16. Handle projection failures gracefully

  17. Version events carefully

  18. Add new event types rather than modifying existing
  19. Support multiple event versions during migration
  20. Document event schemas clearly

Don'ts

  1. Don't store mutable state in events
  2. Events should be immutable facts
  3. Don't include calculated fields that might change
  4. Store what happened, not derived state

  5. Don't query event store for reads

  6. Use read models for queries
  7. Event store is optimized for writes
  8. Build projections for query needs

  9. Don't delete events

  10. Events are immutable facts
  11. Use event versioning instead
  12. Archive old events if needed, don't delete

  13. Don't mix event sourcing with traditional persistence

  14. Choose one pattern per aggregate
  15. Don't try to sync both approaches
  16. Be consistent within aggregate boundaries

  17. Don't forget about event migration

  18. Plan for event schema evolution
  19. Support reading old event versions
  20. Have migration strategy for event changes

Integration Points

CQRS Pattern

Event Sourcing pairs naturally with CQRS:

// Command side: Store events
public async Task CreateOrder(CreateOrderCommand command)
{
    var order = new OrderAggregate(command.OrderId, command.CustomerId, command.Amount);
    await eventStoreRepository.SaveAsync(order, ct);
}

// Query side: Read from projections
public async Task<OrderReadModel> GetOrder(Guid orderId, CancellationToken ct)
{
    return await readModelRepository.GetByIdAsync(orderId, ct);
}

Domain Events

Domain events in template form foundation for Event Sourcing:

// Template domain events can be persisted to event store
public class OrderCreatedEvent : IDomainEvent
{
    public Guid EventId { get; set; }
    public Guid AggregateId { get; set; }
    public DateTimeOffset OccurredAt { get; set; }
    public long Version { get; set; }
    // Additional event data
}

Saga Pattern

Sagas can be event-sourced:

// Saga state can be rebuilt from events
public class OrderProcessingSaga : EventSourcedAggregateRoot<Guid>
{
    public void Handle(OrderCreatedEvent @event)
    {
        RaiseEvent(new SagaStartedEvent { OrderId = @event.OrderId });
    }
}

Common Scenarios

Scenario 1: Full Event Sourcing

// Create event-sourced aggregate
var order = new OrderAggregate(orderId, customerId, amount);

// Process commands
order.ProcessPayment(paymentId, amount);
order.Complete();

// Save events
await eventStoreRepository.SaveAsync(order, ct);

// Rebuild from events later
var rebuiltOrder = await eventStoreRepository.GetByIdAsync(orderId, ct);

Scenario 2: Event Sourcing with Snapshots

// Periodically create snapshots
if (aggregate.Version % 100 == 0)
{
    var snapshot = aggregate.CreateSnapshot();
    await snapshotStore.SaveSnapshotAsync(aggregate.Id, snapshot, aggregate.Version, ct);
}

// Load with snapshot optimization
var aggregate = await repository.GetByIdAsync(id, ct);
// Automatically uses snapshot + events

Scenario 3: Event Replay for Projections

// Rebuild read model from events
public async Task RebuildReadModel(Guid aggregateId, CancellationToken ct)
{
    var events = await eventStore.GetEventsAsync(aggregateId, ct);

    foreach (var @event in events)
    {
        await projector.ProjectAsync(@event, ct);
    }
}

Scenario 4: Time Travel Query

// Get state at specific point in time
public async Task<OrderAggregate> GetOrderAtTime(
    Guid orderId, 
    DateTimeOffset asOf,
    CancellationToken ct)
{
    // Get events up to specific time
    var events = await eventStore.GetEventsAsync(
        orderId,
        until: asOf,
        ct);

    // Rebuild aggregate state
    return new OrderAggregate(orderId, events);
}

Troubleshooting

Issue: Event stream too long, slow rebuild

Cause: Aggregates with many events take time to rebuild.

Solution: - Implement snapshots - Create snapshots periodically (e.g., every 100 events) - Load from snapshot + events after snapshot

Issue: Events not in order

Cause: Concurrent writes or missing ordering constraints.

Solution: - Add unique constraint on (AggregateId, Version) - Use optimistic concurrency (expected version) - Handle concurrency exceptions properly

Issue: Read models out of sync

Cause: Projection processing failed or delayed.

Solution: - Monitor projection processing - Implement projection replay - Handle projection failures and retry

Issue: Event schema changes

Cause: Need to evolve event structure over time.

Solution: - Version events (EventV1, EventV2) - Support multiple versions during migration - Use upcasters to convert old events to new format

References

Summary

Event Sourcing in ConnectSoft Microservice Template:

  • Provides complete audit trail of all state changes
  • Enables time travel and event replay
  • Supports flexible read models through projections
  • Works naturally with CQRS pattern
  • Pairs with existing domain events
  • Requires event store for persistence
  • Benefits from snapshots for performance
  • Enables event replay for debugging and recovery

This pattern is powerful for systems requiring complete auditability, time travel capabilities, and flexible read model generation, though it adds complexity and should be chosen when these benefits outweigh the costs.