Saga Pattern in ConnectSoft Microservice Template

Purpose & Overview

The Saga Pattern is a critical pattern for managing long-running business processes that span multiple services or require multiple steps to complete. In the ConnectSoft Microservice Template, sagas orchestrate distributed transactions and ensure eventual consistency across microservice boundaries.

Why Saga Pattern?

Sagas solve several challenges in distributed systems:

  • Long-Running Processes: Manage workflows that take significant time to complete
  • Distributed Transactions: Coordinate operations across multiple services without traditional ACID transactions
  • Eventual Consistency: Ensure the system eventually reaches a consistent state through compensation
  • Failure Handling: Recover from partial failures through compensating actions
  • State Management: Maintain process state across service boundaries and system restarts
  • Orchestration: Coordinate complex multi-step business processes
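
The compensation idea in the list above can be sketched without any messaging library. This is a minimal in-process illustration, not the template's implementation: `SagaStep` and `SagaRunner` are hypothetical names, and a real saga would persist its state and exchange messages instead of calling delegates directly.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; not part of the template.
public sealed record SagaStep(string Name, Action Execute, Action Compensate);

public static class SagaRunner
{
    // Runs the steps in order. If a step throws, the steps that already
    // completed are compensated in reverse order and false is returned.
    public static bool Run(IEnumerable<SagaStep> steps, List<string> log)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            try
            {
                step.Execute();
                log.Add($"done:{step.Name}");
                completed.Push(step);
            }
            catch (Exception)
            {
                log.Add($"failed:{step.Name}");
                while (completed.Count > 0)
                {
                    var undo = completed.Pop();
                    undo.Compensate();
                    log.Add($"compensated:{undo.Name}");
                }
                return false;
            }
        }
        return true;
    }
}
```

The stack makes the ordering explicit: the most recently completed step is undone first, which is the usual expectation when later steps depend on earlier ones.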

How It's Implemented in the Template

Architecture Overview

The template supports two saga implementation approaches:

  1. MassTransit State Machine Sagas - Declarative state machine-based sagas
  2. NServiceBus Traditional Sagas - Handler-based saga orchestration

Both approaches provide:

  • Persistent saga state storage
  • Automatic state recovery
  • Message correlation
  • Fault handling and compensation
  • Timeout support
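
To make "message correlation" and "state recovery" concrete, here is a library-free sketch of what a saga repository does conceptually. `OrderSagaState` and `InMemorySagaRepository` are hypothetical stand-ins; the real MassTransit and NServiceBus repositories persist to a database and are configured rather than hand-written.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-ins; real repositories persist to a database.
public sealed class OrderSagaState
{
    public Guid OrderId { get; init; }
    public string CurrentState { get; set; } = "Initial";
}

public sealed class InMemorySagaRepository
{
    private readonly Dictionary<Guid, OrderSagaState> instances = new();

    // Correlation: the business identifier carried by the message selects the instance.
    // createIfMissing models the rule that only a starting message may create one.
    public OrderSagaState? Find(Guid orderId, bool createIfMissing)
    {
        if (instances.TryGetValue(orderId, out var existing))
        {
            return existing;
        }

        if (!createIfMissing)
        {
            return null; // a non-starting message with no matching saga
        }

        var created = new OrderSagaState { OrderId = orderId };
        instances[orderId] = created;
        return created;
    }
}
```

Because the state is looked up by identifier on every message, the process can stop and restart between messages without losing its place, provided the store is durable.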

Core Components

1. Saga State (MassTransit)

// MicroserviceAggregateRootLifeCycleSagaState.cs
public class MicroserviceAggregateRootLifeCycleSagaState :
    SagaStateMachineInstance,
    ISagaVersion
{
    /// <summary>
    /// Correlation ID - unique identifier for saga instance
    /// </summary>
    public Guid CorrelationId { get; set; }

    /// <summary>
    /// Current state in the state machine
    /// </summary>
    public string CurrentState { get; set; } = null!;

    /// <summary>
    /// Business identifier (e.g., order ID, aggregate ID)
    /// </summary>
    public Guid ObjectId { get; set; }

    /// <summary>
    /// Version for optimistic concurrency control
    /// </summary>
    public int Version { get; set; }

    /// <summary>
    /// Timestamp when saga was started
    /// </summary>
    public DateTimeOffset StartedAt { get; set; }

    // Additional saga-specific data
}

2. Saga State Machine (MassTransit)

// MicroserviceAggregateRootLifeCycleSaga.cs
public class MicroserviceAggregateRootLifeCycleSaga :
    MassTransitStateMachine<MicroserviceAggregateRootLifeCycleSagaState>
{
    // Define states
    public State? MicroserviceAggregateRootCreatedEventState { get; }
    public State? MicroserviceAggregateRootCreatedFaultedState { get; }

    // Define events
    public Event<MicroserviceAggregateRootCreatedEvent>? MicroserviceAggregateRootCreatedEvent { get; }
    public Event<Fault<MicroserviceAggregateRootCreatedEvent>>? MicroserviceAggregateRootCreatedFaultEvent { get; }

    public MicroserviceAggregateRootLifeCycleSaga(
        ILogger<MicroserviceAggregateRootLifeCycleSaga> logger,
        TimeProvider timeProvider)
    {
        // Configure state machine
        this.InstanceState(x => x.CurrentState);
        this.DefineEvents();
        this.DefineStateMachineBehavior();
        this.DefineStateMachineFaultCompensationBehavior();
    }

    private void DefineEvents()
    {
        // Configure event correlation
        this.Event(
            () => this.MicroserviceAggregateRootCreatedEvent,
            configurator =>
            {
                configurator.CorrelateBy<Guid>(
                    state => state.ObjectId,
                    message => message.Message.ObjectId);
                configurator.InsertOnInitial = true;
                configurator.SetSagaFactory(context =>
                {
                    return new MicroserviceAggregateRootLifeCycleSagaState()
                    {
                        CorrelationId = context.Message.ObjectId,
                        ObjectId = context.Message.ObjectId,
                    };
                });
            });
    }

    private void DefineStateMachineBehavior()
    {
        // Define state transitions
        this.During(
            this.Initial,
            this.When(this.MicroserviceAggregateRootCreatedEvent)
                .Then(context => ProcessMicroserviceAggregateRootCreatedEvent(context))
                .TransitionTo(this.MicroserviceAggregateRootCreatedEventState));
    }

    private void DefineStateMachineFaultCompensationBehavior()
    {
        // Handle faults and compensation
        this.DuringAny(
            this.When(this.MicroserviceAggregateRootCreatedFaultEvent)
                .Then(context => ProcessMicroserviceAggregateRootCreatedEventFaulted(context))
                .TransitionTo(this.MicroserviceAggregateRootCreatedFaultedState));
    }
}

3. Traditional Saga (NServiceBus)

// MicroserviceAggregateRootLifeCycleSaga.cs
public class MicroserviceAggregateRootLifeCycleSaga :
    Saga<MicroserviceAggregateRootSagaData>,
    IAmStartedByMessages<MicroserviceAggregateRootCreatedEvent>
{
    private readonly ILogger<MicroserviceAggregateRootLifeCycleSaga> logger;

    public MicroserviceAggregateRootLifeCycleSaga(
        ILogger<MicroserviceAggregateRootLifeCycleSaga> logger)
    {
        this.logger = logger;
    }

    /// <summary>
    /// Handle message that starts the saga
    /// </summary>
    public Task Handle(
        MicroserviceAggregateRootCreatedEvent message,
        IMessageHandlerContext context)
    {
        // Initialize saga data
        this.Data.ObjectId = message.ObjectId;

        // Publish commands, send messages, etc.

        return Task.CompletedTask;
    }

    /// <summary>
    /// Configure how to find saga instance by message
    /// </summary>
    protected override void ConfigureHowToFindSaga(
        SagaPropertyMapper<MicroserviceAggregateRootSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.ObjectId)
            .ToMessage<MicroserviceAggregateRootCreatedEvent>(msg => msg.ObjectId);
    }
}

4. Saga Data (NServiceBus)

// MicroserviceAggregateRootSagaData.cs
public class MicroserviceAggregateRootSagaData : ContainSagaData
{
    /// <summary>
    /// Unique saga identifier (required by NServiceBus)
    /// </summary>
    public Guid Id { get; set; }

    /// <summary>
    /// Address of the endpoint that started the saga (required by NServiceBus)
    /// </summary>
    public string Originator { get; set; } = null!;

    /// <summary>
    /// Original message ID (required by NServiceBus)
    /// </summary>
    public string OriginalMessageId { get; set; } = null!;

    /// <summary>
    /// Business identifier
    /// </summary>
    public Guid ObjectId { get; set; }

    // Additional saga state
}

Project Structure

Project                   Responsibility
FlowModel                 Saga contracts and base types
FlowModel.MassTransit     MassTransit state machine saga implementations
FlowModel.NServiceBus     NServiceBus traditional saga implementations
MessagingModel            Events and commands used by sagas

Code Examples

MassTransit State Machine Saga

Basic Saga State Machine

public class OrderProcessingSaga :
    MassTransitStateMachine<OrderProcessingSagaState>
{
    public State? OrderCreated { get; }
    public State? PaymentProcessing { get; }
    public State? InventoryReserved { get; }
    public State? OrderCompleted { get; }
    public State? OrderFailed { get; }

    public Event<OrderCreatedEvent>? OrderCreatedEvent { get; }
    public Event<PaymentProcessedEvent>? PaymentProcessedEvent { get; }
    public Event<InventoryReservedEvent>? InventoryReservedEvent { get; }
    public Event<Fault<ProcessPaymentCommand>>? PaymentFailed { get; }

    public OrderProcessingSaga()
    {
        InstanceState(x => x.CurrentState);

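        // Define events
        // OrderCreatedEvent starts the saga, so SelectId supplies the CorrelationId of the new instance
        Event(() => OrderCreatedEvent, e =>
        {
            e.CorrelateBy<Guid>(state => state.OrderId, context => context.Message.OrderId)
                .SelectId(context => context.Message.OrderId);
            e.InsertOnInitial = true;
        });

        Event(() => PaymentProcessedEvent, e => e
            .CorrelateBy<Guid>(state => state.OrderId, context => context.Message.OrderId));

        Event(() => InventoryReservedEvent, e => e
            .CorrelateBy<Guid>(state => state.OrderId, context => context.Message.OrderId));

        // A fault wraps the original message, hence Message.Message
        Event(() => PaymentFailed, e => e
            .CorrelateBy<Guid>(state => state.OrderId, context => context.Message.Message.OrderId));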
        // Define state transitions
        Initially(
            When(OrderCreatedEvent)
                .Then(context =>
                {
                    context.Saga.OrderId = context.Message.OrderId;
                    context.Saga.CustomerId = context.Message.CustomerId;
                    context.Saga.StartedAt = DateTimeOffset.UtcNow;
                })
                .Publish(context => new ProcessPaymentCommand
                {
                    OrderId = context.Saga.OrderId,
                    Amount = context.Message.TotalAmount
                })
                .TransitionTo(PaymentProcessing));

        During(PaymentProcessing,
            When(PaymentProcessedEvent)
                .Then(context =>
                {
                    context.Saga.PaymentId = context.Message.PaymentId;
                    context.Saga.PaymentProcessedAt = DateTimeOffset.UtcNow;
                })
                .Publish(context => new ReserveInventoryCommand
                {
                    OrderId = context.Saga.OrderId,
                    Items = context.Saga.OrderItems
                })
                .TransitionTo(InventoryReserved),
            When(PaymentFailed)
                .Then(context => LogPaymentFailure(context))
                .Publish(context => new CancelOrderCommand { OrderId = context.Saga.OrderId })
                .TransitionTo(OrderFailed));

        During(InventoryReserved,
            When(InventoryReservedEvent)
                .Then(context =>
                {
                    context.Saga.InventoryReservedAt = DateTimeOffset.UtcNow;
                })
                .Publish(context => new CompleteOrderCommand
                {
                    OrderId = context.Saga.OrderId
                })
                .TransitionTo(OrderCompleted));
    }
}

Saga with Timeouts

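public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    // Scheduled timeout message. OrderTimeoutTokenId is assumed to be a Guid? property on the
    // saga state; it stores the scheduler token. A message scheduler must be configured on the bus.
    public Schedule<OrderProcessingSagaState, OrderTimeoutExpired> OrderTimeout { get; private set; } = null!;

    public OrderProcessingSaga()
    {
        // ... other configuration ...

        Schedule(() => OrderTimeout, state => state.OrderTimeoutTokenId, s =>
        {
            s.Delay = TimeSpan.FromMinutes(30);
            s.Received = r => r.CorrelateBy<Guid>(state => state.OrderId, context => context.Message.OrderId);
        });

        During(PaymentProcessing,
            When(PaymentProcessedEvent)
                // ... payment processing ...
                .Schedule(OrderTimeout, context => new OrderTimeoutExpired
                {
                    OrderId = context.Saga.OrderId
                })
                .TransitionTo(InventoryReserved));

        // The timeout fires while the saga waits for inventory, so it is handled in that state
        During(InventoryReserved,
            When(OrderTimeout.Received)
                .Then(context => HandleTimeout(context))
                .Publish(context => new CancelOrderCommand { OrderId = context.Saga.OrderId })
                .TransitionTo(OrderFailed));
    }
}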

NServiceBus Traditional Saga

Multi-Step Saga

public class OrderProcessingSaga :
    Saga<OrderProcessingSagaData>,
    IAmStartedByMessages<OrderCreatedEvent>,
    IHandleMessages<PaymentProcessedEvent>,
    IHandleMessages<InventoryReservedEvent>,
    IHandleTimeouts<OrderTimeoutExpired>
{
    private readonly ILogger<OrderProcessingSaga> logger;

    public OrderProcessingSaga(ILogger<OrderProcessingSaga> logger)
    {
        this.logger = logger;
    }

    // Start saga
    public async Task Handle(OrderCreatedEvent message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.CustomerId = message.CustomerId;
        Data.Status = OrderStatus.Processing;
        Data.StartedAt = DateTimeOffset.UtcNow;

        // Request timeout
        await RequestTimeout<OrderTimeoutExpired>(
            context,
            TimeSpan.FromMinutes(30));

        // Send command to process payment
        await context.Send(new ProcessPaymentCommand
        {
            OrderId = message.OrderId,
            Amount = message.TotalAmount
        });
    }

    // Continue saga
    public async Task Handle(PaymentProcessedEvent message, IMessageHandlerContext context)
    {
        Data.PaymentId = message.PaymentId;
        Data.PaymentProcessedAt = DateTimeOffset.UtcNow;

        // Send command to reserve inventory
        await context.Send(new ReserveInventoryCommand
        {
            OrderId = Data.OrderId,
            Items = Data.OrderItems
        });
    }

    // Complete saga
    public async Task Handle(InventoryReservedEvent message, IMessageHandlerContext context)
    {
        Data.InventoryReservedAt = DateTimeOffset.UtcNow;
        Data.Status = OrderStatus.Completed;

        // Mark saga as complete
        MarkAsComplete();

        // Publish completion event
        await context.Publish(new OrderCompletedEvent
        {
            OrderId = Data.OrderId
        });
    }

    // Handle timeout
    public async Task Timeout(OrderTimeoutExpired state, IMessageHandlerContext context)
    {
        logger.LogWarning("Order {OrderId} timed out", Data.OrderId);

        Data.Status = OrderStatus.Failed;

        await context.Send(new CancelOrderCommand
        {
            OrderId = Data.OrderId
        });

        MarkAsComplete();
    }

    protected override void ConfigureHowToFindSaga(
        SagaPropertyMapper<OrderProcessingSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<OrderCreatedEvent>(msg => msg.OrderId)
            .ToMessage<PaymentProcessedEvent>(msg => msg.OrderId)
            .ToMessage<InventoryReservedEvent>(msg => msg.OrderId);
    }
}

Configuration

MassTransit Saga Configuration

// MassTransitExtensions.cs
public static IServiceCollection AddMicroserviceMassTransit(
    this IServiceCollection services)
{
    services.AddMassTransit(x =>
    {
        // Register saga state machine
        x.AddSagaStateMachine<MicroserviceAggregateRootLifeCycleSaga, 
            MicroserviceAggregateRootLifeCycleSagaState>()
            .MongoDbRepository(r =>
            {
                r.Connection = "mongodb://localhost:27017";
                r.DatabaseName = "sagas";
                r.CollectionName = nameof(MicroserviceAggregateRootLifeCycleSaga);
            });

        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.ConfigureEndpoints(context);
        });
    });

    return services;
}

NServiceBus Saga Configuration

// NServiceBusExtensions.cs
public static IServiceCollection AddMicroserviceNServiceBus(
    this IServiceCollection services,
    string connectionString) // connection string used for saga persistence
{
    var endpointConfiguration = new EndpointConfiguration("MyService");

    // Configure saga persistence
    var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MsSqlServer>();
    persistence.ConnectionBuilder(() => new SqlConnection(connectionString));

    // Register saga
    endpointConfiguration.RegisterComponents(registration =>
    {
        registration.ConfigureComponent<MicroserviceAggregateRootLifeCycleSaga>(
            DependencyLifecycle.InstancePerCall);
    });

    // Enable saga auditing
    endpointConfiguration.AuditSagaStateChanges(
        serviceControlQueue: "audit-queue");

    return services;
}

Best Practices

Do's

  1. Design sagas for idempotency
     • Handle duplicate messages gracefully
     • Use versioning for optimistic concurrency

  2. Implement compensation for failures
     • Provide compensating actions for each step
     • Ensure the system can recover from partial failures

  3. Set appropriate timeouts
     • Use timeouts to detect stuck processes
     • Implement timeout handling and cleanup

  4. Keep saga state minimal
     • Store only necessary data in saga state
     • Avoid storing large objects or collections

  5. Use correlation IDs correctly
     • Map messages to sagas using business identifiers
     • Ensure correlation works across service boundaries

  6. Handle faults explicitly
     • Define fault states and compensation
     • Log failures for observability
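
One common way to get idempotent handling is to guard each transition on the current state, so that a duplicate or late message has no effect. The sketch below is library-free and purely illustrative: `IdempotentOrderSaga` and its members are hypothetical names, and the counter merely stands in for sending a command.

```csharp
using System;

// Hypothetical saga model; names are illustrative, not the template's types.
public sealed class IdempotentOrderSaga
{
    public string CurrentState { get; private set; } = "PaymentProcessing";
    public Guid? PaymentId { get; private set; }
    public int ReserveInventoryCommandsSent { get; private set; }

    // Returns true when the event advanced the saga, false when it was ignored.
    public bool OnPaymentProcessed(Guid paymentId)
    {
        if (CurrentState != "PaymentProcessing")
        {
            return false; // duplicate or late delivery: no side effects
        }

        PaymentId = paymentId;
        ReserveInventoryCommandsSent++; // stands in for sending ReserveInventoryCommand
        CurrentState = "InventoryReserved";
        return true;
    }
}
```

Guarding on state keeps the saga state small, which fits the "keep saga state minimal" advice better than storing a list of processed message IDs.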

Don'ts

  1. Don't perform business logic in the saga state machine
     • Use sagas for orchestration, not business rules
     • Delegate business logic to domain services

  2. Don't create too many saga instances
     • Design sagas to complete in reasonable time
     • Consider if the process truly needs the saga pattern

  3. Don't store transient data in saga state
     • Use saga state for recovery and correlation only
     • Avoid storing UI state or session data

  4. Don't ignore version conflicts
     • Handle optimistic concurrency exceptions
     • Implement retry logic for version conflicts

  5. Don't create circular dependencies
     • Avoid sagas that trigger each other infinitely
     • Design clear process boundaries

Integration Points

Event-Driven Architecture

Sagas integrate with event-driven architecture:

// Saga publishes events when state changes
await context.Publish(new OrderStatusChangedEvent
{
    OrderId = context.Saga.OrderId,
    Status = OrderStatus.Processing
});

Command Pattern

Sagas send commands to other services:

// Saga sends commands to perform actions
await context.Send(new ProcessPaymentCommand
{
    OrderId = saga.OrderId,
    Amount = saga.TotalAmount
});

Domain Events

Sagas react to domain events:

// Saga handles domain events
public async Task Handle(OrderCreatedEvent message, IMessageHandlerContext context)
{
    // React to domain event
}

Common Scenarios

Scenario 1: Order Processing Workflow

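// MassTransit State Machine (outline: state, event, and correlation declarations are omitted;
// message property names are illustrative)
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    // States: Created -> PaymentProcessing -> InventoryReserved -> Shipping -> Completed
    // Failure events (PaymentFailed, InventoryUnavailable, ShippingFailed) lead to the Failed state

    public OrderProcessingSaga()
    {
        InstanceState(x => x.CurrentState);

        Initially(
            When(OrderCreatedEvent)
                .Publish(context => new ProcessPaymentCommand { OrderId = context.Message.OrderId })
                .TransitionTo(PaymentProcessing));

        During(PaymentProcessing,
            When(PaymentProcessedEvent)
                .Publish(context => new ReserveInventoryCommand { OrderId = context.Saga.OrderId })
                .TransitionTo(InventoryReserved),
            When(PaymentFailed)
                .Publish(context => new CancelOrderCommand { OrderId = context.Saga.OrderId })
                .TransitionTo(Failed));

        During(InventoryReserved,
            When(InventoryReservedEvent)
                .Publish(context => new CreateShipmentCommand { OrderId = context.Saga.OrderId })
                .TransitionTo(Shipping),
            When(InventoryUnavailable)
                .Publish(context => new RefundPaymentCommand { PaymentId = context.Saga.PaymentId })
                .TransitionTo(Failed));
    }
}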

Scenario 2: Compensation Pattern

// When payment succeeds but inventory fails, compensate payment
During(InventoryReserved,
    When(InventoryReservedEvent)
        .TransitionTo(Completed),
    When(InventoryUnavailable)
        // Compensate: refund payment (the Publish activity is awaited by the state machine)
        .Publish(context => new RefundPaymentCommand
        {
            PaymentId = context.Saga.PaymentId,
            Amount = context.Saga.TotalAmount
        })
        .TransitionTo(Failed));

Scenario 3: Long-Running Process with Timeouts

// Saga with timeout for human approval step
During(ApprovalPending,
    When(Approved)
        // Continue with the next step ("Processing" is a placeholder state name)
        .TransitionTo(Processing),
    When(TimeoutExpired)
        // Auto-reject after timeout
        .Publish(context => new RejectCommand
        {
            RequestId = context.Saga.RequestId,
            Reason = "Timeout expired"
        })
        .TransitionTo(Rejected));
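
A subtle point in timeout handling is that the timeout message can still arrive after the step has already completed. The library-free sketch below shows the guard that makes a late timeout harmless. `ApprovalProcess` is a hypothetical model, and the current time is passed in explicitly so the behaviour is easy to test.

```csharp
using System;

// Hypothetical, library-free model of an approval step with a deadline.
public sealed class ApprovalProcess
{
    public string CurrentState { get; private set; } = "ApprovalPending";
    public DateTimeOffset Deadline { get; }

    public ApprovalProcess(DateTimeOffset startedAt, TimeSpan timeout)
    {
        Deadline = startedAt + timeout;
    }

    // Approval only counts while the process is still pending.
    public void Approve()
    {
        if (CurrentState == "ApprovalPending")
        {
            CurrentState = "Approved";
        }
    }

    // The timeout may arrive after approval; in that case it is ignored.
    public void OnTimeout(DateTimeOffset now)
    {
        if (CurrentState == "ApprovalPending" && now >= Deadline)
        {
            CurrentState = "Rejected";
        }
    }
}
```

In a state machine saga the same effect comes from handling the timeout event only in the pending state, or from cancelling the scheduled message when the approval arrives.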

Troubleshooting

Issue: Saga not starting

Cause: Event correlation may be incorrect, or the saga may not be registered.

Solution: Verify correlation mapping:

// Ensure correlation is correctly configured
Event(() => OrderCreatedEvent, e => e
    .CorrelateBy<Guid>(
        state => state.OrderId,
        context => context.Message.OrderId) // Must match!
    .InsertOnInitial = true);

Issue: Saga state not persisting

Cause: The saga repository is not configured, or the connection string is incorrect.

Solution: Verify saga repository configuration:

// MassTransit MongoDB (x is the registration configurator passed to AddMassTransit)
x.AddSagaStateMachine<MySaga, MySagaState>()
    .MongoDbRepository(r =>
    {
        r.Connection = connectionString; // Verify connection
        r.DatabaseName = "sagas";
    });

// NServiceBus SQL
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(() => new SqlConnection(connectionString));

Issue: Messages not correlating to saga

Cause: The value of the message property doesn't match the value stored in the saga's correlation property.

Solution: Ensure consistent correlation:

// NServiceBus saga finder
protected override void ConfigureHowToFindSaga(
    SagaPropertyMapper<SagaData> mapper)
{
    // The property values must match; the property names may differ
    mapper.MapSaga(saga => saga.OrderId) // Saga property
        .ToMessage<OrderCreatedEvent>(msg => msg.OrderId); // Message property
}

Issue: Version conflicts

Cause: Multiple messages were processed concurrently for the same saga instance.

Solution: Implement retry logic:

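// MassTransit - configure message retry on the saga's receive endpoint so that
// concurrency exceptions are retried, for example (endpointConfigurator is a placeholder name):
// endpointConfigurator.UseMessageRetry(r => r.Immediate(5));
// NServiceBus - configure retry policy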
var recoverability = endpointConfiguration.Recoverability();
recoverability.Immediate(immediate => immediate.NumberOfRetries(3));
recoverability.Delayed(delayed => delayed.NumberOfRetries(5));
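
The mechanism behind a version conflict, and why a retry resolves it, can be sketched without either framework. All names here (`VersionedStore`, `SagaSnapshot`, `VersionConflictException`, `SagaRetry`) are hypothetical, and the in-memory dictionary stands in for the saga store.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration; a real store is a database table or collection.
public sealed class VersionConflictException : Exception { }

public sealed record SagaSnapshot(Guid Id, int Version, string State);

public sealed class VersionedStore
{
    private readonly Dictionary<Guid, SagaSnapshot> rows = new();

    public SagaSnapshot Load(Guid id) =>
        rows.TryGetValue(id, out var row) ? row : new SagaSnapshot(id, 0, "Initial");

    // Saves only if the caller read the latest version; otherwise signals a conflict.
    public void Save(SagaSnapshot updated, int expectedVersion)
    {
        var current = Load(updated.Id);
        if (current.Version != expectedVersion)
        {
            throw new VersionConflictException();
        }

        rows[updated.Id] = updated with { Version = expectedVersion + 1 };
    }
}

public static class SagaRetry
{
    // Reloads and reapplies the change until it saves or the attempts run out.
    // Returns the number of attempts used; the last conflict propagates to the caller.
    public static int UpdateWithRetry(
        VersionedStore store,
        Guid id,
        Func<SagaSnapshot, SagaSnapshot> change,
        int maxAttempts)
    {
        for (var attempt = 1; ; attempt++)
        {
            var loaded = store.Load(id);
            try
            {
                store.Save(change(loaded), loaded.Version);
                return attempt;
            }
            catch (VersionConflictException) when (attempt < maxAttempts)
            {
                // Another writer won; loop to reload the fresh version.
            }
        }
    }
}
```

The important detail is that each retry reloads the state before reapplying the change, so the second attempt works from the version the competing writer produced.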

Summary

The Saga Pattern in the ConnectSoft Microservice Template:

  • Provides orchestration for long-running processes
  • Ensures eventual consistency across services
  • Supports compensation for failure recovery
  • Maintains persistent state across restarts
  • Works with MassTransit (state machines) and NServiceBus (traditional)
  • Handles faults and timeouts gracefully
  • Enables distributed transactions without traditional ACID guarantees

This pattern is essential for building reliable, distributed systems where business processes span multiple services and require coordination and state management.