Saga Pattern in ConnectSoft Microservice Template¶
Purpose & Overview¶
The Saga Pattern is a critical pattern for managing long-running business processes that span multiple services or require multiple steps to complete. In the ConnectSoft Microservice Template, sagas orchestrate distributed transactions and ensure eventual consistency across microservice boundaries.
Why Saga Pattern?¶
Sagas solve several challenges in distributed systems:
- Long-Running Processes: Manage workflows that take significant time to complete
- Distributed Transactions: Coordinate operations across multiple services without traditional ACID transactions
- Eventual Consistency: Ensure system eventually reaches consistent state through compensation
- Failure Handling: Recover from partial failures through compensating actions
- State Management: Maintain process state across service boundaries and system restarts
- Orchestration: Coordinate complex multi-step business processes
How It's Implemented in Template¶
Architecture Overview¶
The template supports two saga implementation approaches:
- MassTransit State Machine Sagas - Declarative state machine-based sagas
- NServiceBus Traditional Sagas - Handler-based saga orchestration
Both approaches provide: - Persistent saga state storage - Automatic state recovery - Message correlation - Fault handling and compensation - Timeout support
Core Components¶
1. Saga State (MassTransit)¶
// MicroserviceAggregateRootLifeCycleSagaState.cs
public class MicroserviceAggregateRootLifeCycleSagaState :
SagaStateMachineInstance,
ISagaVersion
{
/// <summary>
/// Correlation ID - unique identifier for saga instance
/// </summary>
public Guid CorrelationId { get; set; }
/// <summary>
/// Current state in the state machine
/// </summary>
public string CurrentState { get; set; } = null!;
/// <summary>
/// Business identifier (e.g., order ID, aggregate ID)
/// </summary>
public Guid ObjectId { get; set; }
/// <summary>
/// Version for optimistic concurrency control
/// </summary>
public int Version { get; set; }
/// <summary>
/// Timestamp when saga was started
/// </summary>
public DateTimeOffset StartedAt { get; set; }
// Additional saga-specific data
}
2. Saga State Machine (MassTransit)¶
// MicroserviceAggregateRootLifeCycleSaga.cs
public class MicroserviceAggregateRootLifeCycleSaga :
MassTransitStateMachine<MicroserviceAggregateRootLifeCycleSagaState>
{
// Define states
public State? MicroserviceAggregateRootCreatedEventState { get; }
public State? MicroserviceAggregateRootCreatedFaultedState { get; }
// Define events
public Event<MicroserviceAggregateRootCreatedEvent>? MicroserviceAggregateRootCreatedEvent { get; }
public Event<Fault<MicroserviceAggregateRootCreatedEvent>>? MicroserviceAggregateRootCreatedFaultEvent { get; }
public MicroserviceAggregateRootLifeCycleSaga(
ILogger<MicroserviceAggregateRootLifeCycleSaga> logger,
TimeProvider timeProvider)
{
// Configure state machine
this.InstanceState(x => x.CurrentState);
this.DefineEvents();
this.DefineStateMachineBehavior();
this.DefineStateMachineFaultCompensationBehavior();
}
private void DefineEvents()
{
// Configure event correlation
this.Event(
() => this.MicroserviceAggregateRootCreatedEvent,
configurator =>
{
configurator.CorrelateBy<Guid>(
state => state.ObjectId,
message => message.Message.ObjectId);
configurator.InsertOnInitial = true;
configurator.SetSagaFactory(context =>
{
return new MicroserviceAggregateRootLifeCycleSagaState()
{
CorrelationId = context.Message.ObjectId,
ObjectId = context.Message.ObjectId,
};
});
});
}
private void DefineStateMachineBehavior()
{
// Define state transitions
this.During(
this.Initial,
this.When(this.MicroserviceAggregateRootCreatedEvent)
.Then(context => ProcessMicroserviceAggregateRootCreatedEvent(context))
.TransitionTo(this.MicroserviceAggregateRootCreatedEventState));
}
private void DefineStateMachineFaultCompensationBehavior()
{
// Handle faults and compensation
this.DuringAny(
this.When(this.MicroserviceAggregateRootCreatedFaultEvent)
.Then(context => ProcessMicroserviceAggregateRootCreatedEventFaulted(context))
.TransitionTo(this.MicroserviceAggregateRootCreatedFaultedState));
}
}
3. Traditional Saga (NServiceBus)¶
// MicroserviceAggregateRootLifeCycleSaga.cs
public class MicroserviceAggregateRootLifeCycleSaga :
Saga<MicroserviceAggregateRootSagaData>,
IAmStartedByMessages<MicroserviceAggregateRootCreatedEvent>
{
private readonly ILogger<MicroserviceAggregateRootLifeCycleSaga> logger;
public MicroserviceAggregateRootLifeCycleSaga(
ILogger<MicroserviceAggregateRootLifeCycleSaga> logger)
{
this.logger = logger;
}
/// <summary>
/// Handle message that starts the saga
/// </summary>
public Task Handle(
MicroserviceAggregateRootCreatedEvent message,
IMessageHandlerContext context)
{
// Initialize saga data
this.Data.ObjectId = message.ObjectId;
// Publish commands, send messages, etc.
return Task.CompletedTask;
}
/// <summary>
/// Configure how to find saga instance by message
/// </summary>
protected override void ConfigureHowToFindSaga(
SagaPropertyMapper<MicroserviceAggregateRootSagaData> mapper)
{
mapper.MapSaga(saga => saga.ObjectId)
.ToMessage<MicroserviceAggregateRootCreatedEvent>(msg => msg.ObjectId);
}
}
4. Saga Data (NServiceBus)¶
// MicroserviceAggregateRootSagaData.cs
public class MicroserviceAggregateRootSagaData : ContainSagaData
{
/// <summary>
/// Unique saga identifier (required by NServiceBus)
/// </summary>
public Guid Id { get; set; }
/// <summary>
/// Original message ID (required by NServiceBus)
/// </summary>
public string Originator { get; set; } = null!;
/// <summary>
/// Original message ID (required by NServiceBus)
/// </summary>
public string OriginalMessageId { get; set; } = null!;
/// <summary>
/// Business identifier
/// </summary>
public Guid ObjectId { get; set; }
// Additional saga state
}
Project Structure¶
| Project | Responsibility |
|---|---|
FlowModel |
Saga contracts and base types |
FlowModel.MassTransit |
MassTransit state machine saga implementations |
FlowModel.NServiceBus |
NServiceBus traditional saga implementations |
MessagingModel |
Events and commands used by sagas |
Code Examples¶
MassTransit State Machine Saga¶
Basic Saga State Machine¶
public class OrderProcessingSaga :
MassTransitStateMachine<OrderProcessingSagaState>
{
public State? OrderCreated { get; }
public State? PaymentProcessing { get; }
public State? InventoryReserved { get; }
public State? OrderCompleted { get; }
public State? OrderFailed { get; }
public Event<OrderCreatedEvent>? OrderCreatedEvent { get; }
public Event<PaymentProcessedEvent>? PaymentProcessedEvent { get; }
public Event<InventoryReservedEvent>? InventoryReservedEvent { get; }
public Event<Fault<ProcessPaymentCommand>>? PaymentFailed { get; }
public OrderProcessingSaga()
{
InstanceState(x => x.CurrentState);
// Define events
Event(() => OrderCreatedEvent, e => e
.CorrelateBy<Guid>(state => state.OrderId, context => context.Message.OrderId)
.InsertOnInitial = true);
Event(() => PaymentProcessedEvent, e => e
.CorrelateBy<Guid>(state => state.OrderId, context => context.Message.OrderId));
Event(() => InventoryReservedEvent, e => e
.CorrelateBy<Guid>(state => state.OrderId, context => context.Message.OrderId));
// Define state transitions
Initially(
When(OrderCreatedEvent)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.CustomerId = context.Message.CustomerId;
context.Saga.StartedAt = DateTimeOffset.UtcNow;
})
.Publish(context => new ProcessPaymentCommand
{
OrderId = context.Saga.OrderId,
Amount = context.Message.TotalAmount
})
.TransitionTo(PaymentProcessing));
During(PaymentProcessing,
When(PaymentProcessedEvent)
.Then(context =>
{
context.Saga.PaymentId = context.Message.PaymentId;
context.Saga.PaymentProcessedAt = DateTimeOffset.UtcNow;
})
.Publish(context => new ReserveInventoryCommand
{
OrderId = context.Saga.OrderId,
Items = context.Saga.OrderItems
})
.TransitionTo(InventoryReserved),
When(PaymentFailed)
.Then(context => LogPaymentFailure(context))
.Publish(context => new CancelOrderCommand { OrderId = context.Saga.OrderId })
.TransitionTo(OrderFailed));
During(InventoryReserved,
When(InventoryReservedEvent)
.Then(context =>
{
context.Saga.InventoryReservedAt = DateTimeOffset.UtcNow;
})
.Publish(context => new CompleteOrderCommand
{
OrderId = context.Saga.OrderId
})
.TransitionTo(OrderCompleted));
}
}
Saga with Timeouts¶
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
// Timeout events
public Event<OrderTimeoutExpired>? OrderTimeoutExpired { get; }
public OrderProcessingSaga()
{
// ... other configuration ...
During(PaymentProcessing,
When(PaymentProcessedEvent)
// ... payment processing ...
.Schedule(OrderTimeoutExpired, context => new OrderTimeoutExpired
{
OrderId = context.Saga.OrderId
}, context => TimeSpan.FromMinutes(30))
.TransitionTo(InventoryReserved),
When(OrderTimeoutExpired)
.Then(context => HandleTimeout(context))
.Publish(context => new CancelOrderCommand { OrderId = context.Saga.OrderId })
.TransitionTo(OrderFailed));
}
}
NServiceBus Traditional Saga¶
Multi-Step Saga¶
public class OrderProcessingSaga :
Saga<OrderProcessingSagaData>,
IAmStartedByMessages<OrderCreatedEvent>,
IHandleMessages<PaymentProcessedEvent>,
IHandleMessages<InventoryReservedEvent>,
IHandleTimeouts<OrderTimeoutExpired>
{
private readonly ILogger<OrderProcessingSaga> logger;
public OrderProcessingSaga(ILogger<OrderProcessingSaga> logger)
{
this.logger = logger;
}
// Start saga
public async Task Handle(OrderCreatedEvent message, IMessageHandlerContext context)
{
Data.OrderId = message.OrderId;
Data.CustomerId = message.CustomerId;
Data.Status = OrderStatus.Processing;
Data.StartedAt = DateTimeOffset.UtcNow;
// Request timeout
await RequestTimeout<OrderTimeoutExpired>(
context,
TimeSpan.FromMinutes(30));
// Send command to process payment
await context.Send(new ProcessPaymentCommand
{
OrderId = message.OrderId,
Amount = message.TotalAmount
});
}
// Continue saga
public async Task Handle(PaymentProcessedEvent message, IMessageHandlerContext context)
{
Data.PaymentId = message.PaymentId;
Data.PaymentProcessedAt = DateTimeOffset.UtcNow;
// Send command to reserve inventory
await context.Send(new ReserveInventoryCommand
{
OrderId = Data.OrderId,
Items = Data.OrderItems
});
}
// Complete saga
public async Task Handle(InventoryReservedEvent message, IMessageHandlerContext context)
{
Data.InventoryReservedAt = DateTimeOffset.UtcNow;
Data.Status = OrderStatus.Completed;
// Mark saga as complete
MarkAsComplete();
// Publish completion event
await context.Publish(new OrderCompletedEvent
{
OrderId = Data.OrderId
});
}
// Handle timeout
public async Task Timeout(OrderTimeoutExpired state, IMessageHandlerContext context)
{
logger.LogWarning("Order {OrderId} timed out", Data.OrderId);
Data.Status = OrderStatus.Failed;
await context.Send(new CancelOrderCommand
{
OrderId = Data.OrderId
});
MarkAsComplete();
}
protected override void ConfigureHowToFindSaga(
SagaPropertyMapper<OrderProcessingSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId)
.ToMessage<OrderCreatedEvent>(msg => msg.OrderId)
.ToMessage<PaymentProcessedEvent>(msg => msg.OrderId)
.ToMessage<InventoryReservedEvent>(msg => msg.OrderId);
}
}
Configuration¶
MassTransit Saga Configuration¶
// MassTransitExtensions.cs
public static IServiceCollection AddMicroserviceMassTransit(
this IServiceCollection services)
{
services.AddMassTransit(x =>
{
// Register saga state machine
x.AddSagaStateMachine<MicroserviceAggregateRootLifeCycleSaga,
MicroserviceAggregateRootLifeCycleSagaState>()
.MongoDbRepository(r =>
{
r.Connection = "mongodb://localhost:27017";
r.DatabaseName = "sagas";
r.CollectionName = nameof(MicroserviceAggregateRootLifeCycleSaga);
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
return services;
}
NServiceBus Saga Configuration¶
// NServiceBusExtensions.cs
public static IServiceCollection AddMicroserviceNServiceBus(
this IServiceCollection services)
{
var endpointConfiguration = new EndpointConfiguration("MyService");
// Configure saga persistence
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(() => new SqlConnection(connectionString));
// Register saga
endpointConfiguration.RegisterComponents(registration =>
{
registration.ConfigureComponent<MicroserviceAggregateRootLifeCycleSaga>(
DependencyLifecycle.InstancePerCall);
});
// Enable saga auditing
endpointConfiguration.AuditSagaStateChanges(
serviceControlQueue: "audit-queue");
return services;
}
Best Practices¶
Do's¶
- Design sagas for idempotency
- Handle duplicate messages gracefully
-
Use versioning for optimistic concurrency
-
Implement compensation for failures
- Provide compensating actions for each step
-
Ensure system can recover from partial failures
-
Set appropriate timeouts
- Use timeouts to detect stuck processes
-
Implement timeout handling and cleanup
-
Keep saga state minimal
- Store only necessary data in saga state
-
Avoid storing large objects or collections
-
Use correlation IDs correctly
- Map messages to sagas using business identifiers
-
Ensure correlation works across service boundaries
-
Handle faults explicitly
- Define fault states and compensation
- Log failures for observability
Don'ts¶
- Don't perform business logic in saga state machine
- Use sagas for orchestration, not business rules
-
Delegate business logic to domain services
-
Don't create too many saga instances
- Design sagas to complete in reasonable time
-
Consider if process truly needs saga pattern
-
Don't store transient data in saga state
- Use saga state for recovery and correlation only
-
Avoid storing UI state or session data
-
Don't ignore version conflicts
- Handle optimistic concurrency exceptions
-
Implement retry logic for version conflicts
-
Don't create circular dependencies
- Avoid sagas that trigger each other infinitely
- Design clear process boundaries
Integration Points¶
Event-Driven Architecture¶
Sagas integrate with event-driven architecture:
// Saga publishes events when state changes
context.Publish(new OrderStatusChangedEvent
{
OrderId = context.Saga.OrderId,
Status = OrderStatus.Processing
});
Command Pattern¶
Sagas send commands to other services:
// Saga sends commands to perform actions
await context.Send(new ProcessPaymentCommand
{
OrderId = saga.OrderId,
Amount = saga.TotalAmount
});
Domain Events¶
Sagas react to domain events:
// Saga handles domain events
public async Task Handle(OrderCreatedEvent message, IMessageHandlerContext context)
{
// React to domain event
}
Common Scenarios¶
Scenario 1: Order Processing Workflow¶
// MassTransit State Machine
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
// States: Created -> PaymentProcessing -> InventoryReserved -> Shipping -> Completed
// Failure states: PaymentFailed, InventoryUnavailable, ShippingFailed
Initially(
When(OrderCreated)
.Publish(ProcessPaymentCommand)
.TransitionTo(PaymentProcessing));
During(PaymentProcessing,
When(PaymentProcessed)
.Publish(ReserveInventoryCommand)
.TransitionTo(InventoryReserved),
When(PaymentFailed)
.Publish(CancelOrderCommand)
.TransitionTo(Failed));
During(InventoryReserved,
When(InventoryReserved)
.Publish(CreateShipmentCommand)
.TransitionTo(Shipping),
When(InventoryUnavailable)
.Publish(RefundPaymentCommand)
.TransitionTo(Failed));
}
Scenario 2: Compensation Pattern¶
// When payment succeeds but inventory fails, compensate payment
During(InventoryReserved,
When(InventoryReserved)
.TransitionTo(Completed),
When(InventoryUnavailable)
.Then(context =>
{
// Compensate: refund payment
context.Publish(new RefundPaymentCommand
{
PaymentId = context.Saga.PaymentId,
Amount = context.Saga.TotalAmount
});
})
.TransitionTo(Failed));
Scenario 3: Long-Running Process with Timeouts¶
// Saga with timeout for human approval step
During(ApprovalPending,
When(Approved)
.ContinueWithNextStep(),
When(TimeoutExpired)
.Then(context =>
{
// Auto-reject after timeout
context.Publish(new RejectCommand
{
RequestId = context.Saga.RequestId,
Reason = "Timeout expired"
});
})
.TransitionTo(Rejected));
Troubleshooting¶
Issue: Saga not starting¶
Cause: Event correlation may be incorrect or saga not registered.
Solution: Verify correlation mapping:
// Ensure correlation is correctly configured
Event(() => OrderCreatedEvent, e => e
.CorrelateBy<Guid>(
state => state.OrderId,
context => context.Message.OrderId) // Must match!
.InsertOnInitial = true);
Issue: Saga state not persisting¶
Cause: Saga repository not configured or connection string incorrect.
Solution: Verify saga repository configuration:
// MassTransit MongoDB
.AddSagaStateMachine<MySaga, MySagaState>()
.MongoDbRepository(r =>
{
r.Connection = connectionString; // Verify connection
r.DatabaseName = "sagas";
});
// NServiceBus SQL
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(() => new SqlConnection(connectionString));
Issue: Messages not correlating to saga¶
Cause: Message property doesn't match saga correlation property.
Solution: Ensure consistent correlation:
// NServiceBus saga finder
protected override void ConfigureHowToFindSaga(
SagaPropertyMapper<SagaData> mapper)
{
// Property names must match exactly
mapper.MapSaga(saga => saga.OrderId) // Saga property
.ToMessage<OrderCreatedEvent>(msg => msg.OrderId); // Message property
}
Issue: Version conflicts¶
Cause: Multiple messages processed concurrently on same saga instance.
Solution: Implement retry logic:
// MassTransit automatically retries on concurrency exceptions
// NServiceBus - configure retry policy
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(immediate => immediate.NumberOfRetries(3));
recoverability.Delayed(delayed => delayed.NumberOfRetries(5));
References¶
- MassTransit: masstransit.md
- NServiceBus: nservicebus.md
- Messaging Model: messaging-model.md
- Event-Driven Architecture: Event-driven patterns documentation
- Domain-Driven Design: domain-driven-design.md
Summary¶
The Saga Pattern in ConnectSoft Microservice Template:
- Provides orchestration for long-running processes
- Ensures eventual consistency across services
- Supports compensation for failure recovery
- Maintains persistent state across restarts
- Works with MassTransit (state machines) and NServiceBus (traditional)
- Handles faults and timeouts gracefully
- Enables distributed transactions without traditional ACID guarantees
This pattern is essential for building reliable, distributed systems where business processes span multiple services and require coordination and state management.