Event Sourcing in ConnectSoft Microservice Template¶
Purpose & Overview¶
Event Sourcing is an architectural pattern where state changes are stored as a sequence of events rather than as current state snapshots. In the ConnectSoft Microservice Template, Event Sourcing can be implemented to provide complete audit trails, enable time travel, and support advanced features like event replay and projection building.
Why Event Sourcing?¶
Event Sourcing provides several powerful benefits:
- Complete Audit Trail: Every state change is recorded as an event
- Time Travel: Reconstruct state at any point in time
- Event Replay: Reprocess events to rebuild read models or fix issues
- Flexibility: Build new read models from existing event history
- Debugging: Understand exactly what happened and in what order
- Compliance: Meet regulatory requirements for auditability
- Eventual Consistency: Natural fit with CQRS and event-driven architecture
How It's Implemented in Template¶
Architecture Overview¶
Event Sourcing can be layered on top of the existing template architecture:
Domain Layer
├── Aggregate Root (raises domain events)
├── Domain Events
└── Event Store (persists events)
↓
Event Stream
├── Event 1: AggregateCreated
├── Event 2: AggregateUpdated
└── Event 3: AggregateCompleted
↓
Projection Layer
├── Read Model 1 (Current State)
├── Read Model 2 (Denormalized View)
└── Read Model 3 (Analytics View)
Core Components¶
1. Domain Events¶
The template already has domain events in MessagingModel:
// MicroserviceAggregateRootCreatedEvent.cs
public class MicroserviceAggregateRootCreatedEvent : IEvent
{
[Required]
[NotDefault]
public Guid ObjectId { get; set; }
}
2. Event Store Interface¶
Event Store interface for persisting and retrieving events:
public interface IEventStore
{
Task AppendEventsAsync<T>(
Guid aggregateId,
IEnumerable<IDomainEvent> events,
int expectedVersion,
CancellationToken ct = default);
Task<IEnumerable<IDomainEvent>> GetEventsAsync<T>(
Guid aggregateId,
CancellationToken ct = default);
Task<IEnumerable<IDomainEvent>> GetEventsAsync(
Guid aggregateId,
long fromVersion,
CancellationToken ct = default);
}
public interface IDomainEvent
{
Guid EventId { get; }
Guid AggregateId { get; }
DateTimeOffset OccurredAt { get; }
long Version { get; }
}
3. Event Sourced Aggregate Root¶
Aggregate root that stores and replays events:
public abstract class EventSourcedAggregateRoot<TId> : AggregateRoot<TId>
{
private readonly List<IDomainEvent> uncommittedEvents = new();
private long version = 0;
protected EventSourcedAggregateRoot(TId id) : base(id)
{
}
// Replay events to rebuild state
public void LoadFromHistory(IEnumerable<IDomainEvent> events)
{
foreach (var @event in events)
{
ApplyEvent(@event, isReplay: true);
version = @event.Version;
}
}
// Get uncommitted events
public IEnumerable<IDomainEvent> GetUncommittedEvents()
{
return uncommittedEvents;
}
// Mark events as committed
public void MarkEventsAsCommitted()
{
uncommittedEvents.Clear();
}
// Raise domain event
protected void RaiseEvent(IDomainEvent @event)
{
uncommittedEvents.Add(@event);
ApplyEvent(@event, isReplay: false);
}
// Apply event (template method)
protected abstract void ApplyEvent(IDomainEvent @event, bool isReplay);
}
4. Event Store Implementation¶
Example implementation using database:
public class SqlEventStore : IEventStore
{
private readonly IDbConnection connection;
private readonly IEventSerializer serializer;
public async Task AppendEventsAsync<T>(
Guid aggregateId,
IEnumerable<IDomainEvent> events,
int expectedVersion,
CancellationToken ct)
{
// Check optimistic concurrency
var currentVersion = await GetCurrentVersionAsync(aggregateId, ct);
if (currentVersion != expectedVersion)
{
throw new ConcurrencyException($"Expected version {expectedVersion}, got {currentVersion}");
}
// Append events
foreach (var @event in events)
{
await InsertEventAsync(aggregateId, @event, expectedVersion + 1, ct);
expectedVersion++;
}
}
public async Task<IEnumerable<IDomainEvent>> GetEventsAsync(
Guid aggregateId,
CancellationToken ct)
{
var eventData = await connection.QueryAsync<EventData>(
"SELECT * FROM Events WHERE AggregateId = @AggregateId ORDER BY Version",
new { AggregateId = aggregateId });
return eventData.Select(e => serializer.Deserialize(e.Type, e.Data));
}
private async Task InsertEventAsync(
Guid aggregateId,
IDomainEvent @event,
long version,
CancellationToken ct)
{
var eventData = new EventData
{
EventId = @event.EventId,
AggregateId = aggregateId,
EventType = @event.GetType().AssemblyQualifiedName,
Data = serializer.Serialize(@event),
Version = version,
OccurredAt = @event.OccurredAt
};
await connection.ExecuteAsync(
@"INSERT INTO Events (EventId, AggregateId, EventType, Data, Version, OccurredAt)
VALUES (@EventId, @AggregateId, @EventType, @Data, @Version, @OccurredAt)",
eventData);
}
}
Project Structure¶
| Component | Responsibility |
|---|---|
MessagingModel |
Domain event definitions |
PersistenceModel.EventStore |
Event store interface and implementations |
DomainModel |
Event-sourced aggregate roots |
ProjectionModel |
Read model projections from events |
Code Examples¶
Event-Sourced Aggregate¶
Aggregate Root Implementation¶
public class OrderAggregate : EventSourcedAggregateRoot<Guid>
{
public OrderStatus Status { get; private set; }
public decimal TotalAmount { get; private set; }
public Guid CustomerId { get; private set; }
// Constructor for creating new aggregate
public OrderAggregate(Guid orderId, Guid customerId, decimal amount)
: base(orderId)
{
RaiseEvent(new OrderCreatedEvent
{
OrderId = orderId,
CustomerId = customerId,
TotalAmount = amount,
OccurredAt = DateTimeOffset.UtcNow
});
}
// Constructor for loading from history
public OrderAggregate(Guid orderId, IEnumerable<IDomainEvent> history)
: base(orderId)
{
LoadFromHistory(history);
}
public void ProcessPayment(Guid paymentId, decimal amount)
{
if (Status != OrderStatus.Created)
throw new InvalidOperationException("Order must be in Created status");
RaiseEvent(new PaymentProcessedEvent
{
OrderId = Id,
PaymentId = paymentId,
Amount = amount,
OccurredAt = DateTimeOffset.UtcNow
});
}
public void Complete()
{
if (Status != OrderStatus.PaymentReceived)
throw new InvalidOperationException("Order must have payment received");
RaiseEvent(new OrderCompletedEvent
{
OrderId = Id,
CompletedAt = DateTimeOffset.UtcNow
});
}
protected override void ApplyEvent(IDomainEvent @event, bool isReplay)
{
switch (@event)
{
case OrderCreatedEvent e:
Status = OrderStatus.Created;
CustomerId = e.CustomerId;
TotalAmount = e.TotalAmount;
break;
case PaymentProcessedEvent e:
Status = OrderStatus.PaymentReceived;
break;
case OrderCompletedEvent e:
Status = OrderStatus.Completed;
break;
}
}
}
Saving Events¶
Repository with Event Store¶
public class EventSourcedRepository<TAggregate, TId>
where TAggregate : EventSourcedAggregateRoot<TId>
{
private readonly IEventStore eventStore;
private readonly Func<TId, IEnumerable<IDomainEvent>, TAggregate> aggregateFactory;
public async Task<TAggregate> GetByIdAsync(TId id, CancellationToken ct)
{
var events = await eventStore.GetEventsAsync(id, ct);
if (!events.Any())
return null;
return aggregateFactory(id, events);
}
public async Task SaveAsync(TAggregate aggregate, CancellationToken ct)
{
var uncommittedEvents = aggregate.GetUncommittedEvents().ToList();
if (!uncommittedEvents.Any())
return;
var expectedVersion = aggregate.Version - uncommittedEvents.Count;
await eventStore.AppendEventsAsync<TAggregate>(
aggregate.Id,
uncommittedEvents,
expectedVersion,
ct);
aggregate.MarkEventsAsCommitted();
}
}
Event Projections¶
Building Read Models from Events¶
public class OrderReadModelProjector
{
private readonly IOrderReadModelRepository readModelRepository;
public async Task Project(OrderCreatedEvent @event, CancellationToken ct)
{
var readModel = new OrderReadModel
{
OrderId = @event.OrderId,
CustomerId = @event.CustomerId,
TotalAmount = @event.TotalAmount,
Status = "Created",
CreatedAt = @event.OccurredAt
};
await readModelRepository.SaveAsync(readModel, ct);
}
public async Task Project(PaymentProcessedEvent @event, CancellationToken ct)
{
var readModel = await readModelRepository.GetByIdAsync(@event.OrderId, ct);
if (readModel != null)
{
readModel.Status = "PaymentReceived";
readModel.PaymentId = @event.PaymentId;
readModel.PaymentProcessedAt = @event.OccurredAt;
await readModelRepository.SaveAsync(readModel, ct);
}
}
public async Task Project(OrderCompletedEvent @event, CancellationToken ct)
{
var readModel = await readModelRepository.GetByIdAsync(@event.OrderId, ct);
if (readModel != null)
{
readModel.Status = "Completed";
readModel.CompletedAt = @event.OccurredAt;
await readModelRepository.SaveAsync(readModel, ct);
}
}
}
Event Replay¶
Rebuilding Aggregates from Event Stream¶
public class AggregateRebuilder
{
private readonly IEventStore eventStore;
public async Task<TAggregate> RebuildAggregateAsync<TAggregate>(
Guid aggregateId,
Func<Guid, IEnumerable<IDomainEvent>, TAggregate> factory,
CancellationToken ct)
where TAggregate : EventSourcedAggregateRoot<Guid>
{
// Get all events for aggregate
var events = await eventStore.GetEventsAsync(aggregateId, ct);
// Rebuild aggregate from events
return factory(aggregateId, events);
}
public async Task RebuildReadModelAsync(
Guid aggregateId,
IReadModelProjector projector,
CancellationToken ct)
{
// Get all events
var events = await eventStore.GetEventsAsync(aggregateId, ct);
// Replay events to projector
foreach (var @event in events)
{
await projector.ProjectAsync(@event, ct);
}
}
}
Snapshot Support¶
Optimizing with Snapshots¶
public class SnapshotStore
{
public async Task SaveSnapshotAsync(
Guid aggregateId,
object snapshot,
long version,
CancellationToken ct)
{
// Save snapshot with version
await connection.ExecuteAsync(
@"INSERT INTO Snapshots (AggregateId, Data, Version, CreatedAt)
VALUES (@AggregateId, @Data, @Version, @CreatedAt)",
new
{
AggregateId = aggregateId,
Data = serializer.Serialize(snapshot),
Version = version,
CreatedAt = DateTimeOffset.UtcNow
});
}
public async Task<(object Snapshot, long Version)> GetLatestSnapshotAsync(
Guid aggregateId,
CancellationToken ct)
{
var snapshotData = await connection.QueryFirstOrDefaultAsync<SnapshotData>(
"SELECT * FROM Snapshots WHERE AggregateId = @AggregateId ORDER BY Version DESC",
new { AggregateId = aggregateId });
if (snapshotData == null)
return (null, 0);
var snapshot = serializer.Deserialize(snapshotData.Data);
return (snapshot, snapshotData.Version);
}
}
// Repository using snapshots
public async Task<TAggregate> GetByIdAsync(TId id, CancellationToken ct)
{
// Try to load from snapshot first
var (snapshot, snapshotVersion) = await snapshotStore.GetLatestSnapshotAsync(id, ct);
IEnumerable<IDomainEvent> events;
if (snapshot != null)
{
// Load events after snapshot
events = await eventStore.GetEventsAsync(id, snapshotVersion + 1, ct);
}
else
{
// Load all events
events = await eventStore.GetEventsAsync(id, ct);
}
// Rebuild aggregate
var aggregate = factory(id, events);
if (snapshot != null)
{
aggregate.RestoreFromSnapshot(snapshot);
}
return aggregate;
}
Configuration¶
Event Store Setup¶
// Dependency Injection
services.AddScoped<IEventStore, SqlEventStore>();
services.AddScoped<IEventSerializer, JsonEventSerializer>();
services.AddScoped<ISnapshotStore, SqlSnapshotStore>();
// Event Store Repository
services.AddScoped<IEventSourcedRepository<OrderAggregate, Guid>>(
provider => new EventSourcedRepository<OrderAggregate, Guid>(
provider.GetRequiredService<IEventStore>(),
(id, events) => new OrderAggregate(id, events)));
Event Stream Table¶
CREATE TABLE Events (
EventId UNIQUEIDENTIFIER PRIMARY KEY,
AggregateId UNIQUEIDENTIFIER NOT NULL,
AggregateType NVARCHAR(500) NOT NULL,
EventType NVARCHAR(500) NOT NULL,
EventData NVARCHAR(MAX) NOT NULL,
Version BIGINT NOT NULL,
OccurredAt DATETIME2 NOT NULL,
Metadata NVARCHAR(MAX) NULL,
CONSTRAINT UK_Events_AggregateId_Version
UNIQUE (AggregateId, Version)
);
CREATE INDEX IX_Events_AggregateId
ON Events (AggregateId, Version);
Best Practices¶
Do's¶
- Make events immutable
- Events represent facts that happened
- Never modify events after they're created
-
Version events if schema needs to change
-
Store events in order
- Maintain strict ordering by version
- Use optimistic concurrency for appends
-
Enforce ordering at database level
-
Use snapshots for performance
- Create snapshots periodically for long event streams
- Balance snapshot frequency vs. storage cost
-
Rebuild from snapshot + events after snapshot
-
Project events asynchronously
- Build read models from events asynchronously
- Don't block command processing for projections
-
Handle projection failures gracefully
-
Version events carefully
- Add new event types rather than modifying existing
- Support multiple event versions during migration
- Document event schemas clearly
Don'ts¶
- Don't store mutable state in events
- Events should be immutable facts
- Don't include calculated fields that might change
-
Store what happened, not derived state
-
Don't query event store for reads
- Use read models for queries
- Event store is optimized for writes
-
Build projections for query needs
-
Don't delete events
- Events are immutable facts
- Use event versioning instead
-
Archive old events if needed, don't delete
-
Don't mix event sourcing with traditional persistence
- Choose one pattern per aggregate
- Don't try to sync both approaches
-
Be consistent within aggregate boundaries
-
Don't forget about event migration
- Plan for event schema evolution
- Support reading old event versions
- Have migration strategy for event changes
Integration Points¶
CQRS Pattern¶
Event Sourcing pairs naturally with CQRS:
// Command side: Store events
public async Task CreateOrder(CreateOrderCommand command)
{
var order = new OrderAggregate(command.OrderId, command.CustomerId, command.Amount);
await eventStoreRepository.SaveAsync(order, ct);
}
// Query side: Read from projections
public async Task<OrderReadModel> GetOrder(Guid orderId, CancellationToken ct)
{
return await readModelRepository.GetByIdAsync(orderId, ct);
}
Domain Events¶
Domain events in template form foundation for Event Sourcing:
// Template domain events can be persisted to event store
public class OrderCreatedEvent : IDomainEvent
{
public Guid EventId { get; set; }
public Guid AggregateId { get; set; }
public DateTimeOffset OccurredAt { get; set; }
public long Version { get; set; }
// Additional event data
}
Saga Pattern¶
Sagas can be event-sourced:
// Saga state can be rebuilt from events
public class OrderProcessingSaga : EventSourcedAggregateRoot<Guid>
{
public void Handle(OrderCreatedEvent @event)
{
RaiseEvent(new SagaStartedEvent { OrderId = @event.OrderId });
}
}
Common Scenarios¶
Scenario 1: Full Event Sourcing¶
// Create event-sourced aggregate
var order = new OrderAggregate(orderId, customerId, amount);
// Process commands
order.ProcessPayment(paymentId, amount);
order.Complete();
// Save events
await eventStoreRepository.SaveAsync(order, ct);
// Rebuild from events later
var rebuiltOrder = await eventStoreRepository.GetByIdAsync(orderId, ct);
Scenario 2: Event Sourcing with Snapshots¶
// Periodically create snapshots
if (aggregate.Version % 100 == 0)
{
var snapshot = aggregate.CreateSnapshot();
await snapshotStore.SaveSnapshotAsync(aggregate.Id, snapshot, aggregate.Version, ct);
}
// Load with snapshot optimization
var aggregate = await repository.GetByIdAsync(id, ct);
// Automatically uses snapshot + events
Scenario 3: Event Replay for Projections¶
// Rebuild read model from events
public async Task RebuildReadModel(Guid aggregateId, CancellationToken ct)
{
var events = await eventStore.GetEventsAsync(aggregateId, ct);
foreach (var @event in events)
{
await projector.ProjectAsync(@event, ct);
}
}
Scenario 4: Time Travel Query¶
// Get state at specific point in time
public async Task<OrderAggregate> GetOrderAtTime(
Guid orderId,
DateTimeOffset asOf,
CancellationToken ct)
{
// Get events up to specific time
var events = await eventStore.GetEventsAsync(
orderId,
until: asOf,
ct);
// Rebuild aggregate state
return new OrderAggregate(orderId, events);
}
Troubleshooting¶
Issue: Event stream too long, slow rebuild¶
Cause: Aggregates with many events take time to rebuild.
Solution: - Implement snapshots - Create snapshots periodically (e.g., every 100 events) - Load from snapshot + events after snapshot
Issue: Events not in order¶
Cause: Concurrent writes or missing ordering constraints.
Solution: - Add unique constraint on (AggregateId, Version) - Use optimistic concurrency (expected version) - Handle concurrency exceptions properly
Issue: Read models out of sync¶
Cause: Projection processing failed or delayed.
Solution: - Monitor projection processing - Implement projection replay - Handle projection failures and retry
Issue: Event schema changes¶
Cause: Need to evolve event structure over time.
Solution: - Version events (EventV1, EventV2) - Support multiple versions during migration - Use upcasters to convert old events to new format
References¶
- Domain-Driven Design: domain-driven-design.md
- CQRS Pattern: cqrs.md
- Domain Events: Domain events documentation
- Saga Pattern: saga-pattern.md
- Messaging Model: messaging-model.md
Summary¶
Event Sourcing in ConnectSoft Microservice Template:
- Provides complete audit trail of all state changes
- Enables time travel and event replay
- Supports flexible read models through projections
- Works naturally with CQRS pattern
- Pairs with existing domain events
- Requires event store for persistence
- Benefits from snapshots for performance
- Enables event replay for debugging and recovery
This pattern is powerful for systems requiring complete auditability, time travel capabilities, and flexible read model generation, though it adds complexity and should be chosen when these benefits outweigh the costs.