Skip to content

Flow Model in ConnectSoft Templates

Purpose & Overview

The Flow Model defines the contracts and abstractions for long-running business processes orchestrated through sagas. It provides a clean separation between orchestration contracts (FlowModel) and business message contracts (MessagingModel), enabling saga orchestration patterns while maintaining transport-agnostic abstractions.

Why Flow Model?

The Flow Model offers several key benefits:

  • Separation of Concerns: Orchestration contracts (sagas, timeouts) separate from business messages (commands, events)
  • Saga Contracts: Defines timeout messages and saga-specific message types
  • Technology Agnostic: Flow contracts work with both MassTransit and NServiceBus
  • Clean Architecture: FlowModel sits between MessagingModel and saga implementations
  • Timeout Support: Centralized timeout message contracts for saga orchestration
  • Flow Orchestration: Contracts for complex, multi-step business processes

FlowModel vs MessagingModel Philosophy

MessagingModel contains business messages (commands, events) that represent business operations. FlowModel contains orchestration messages (timeouts, saga-specific messages) that manage the flow and state of long-running processes. This separation ensures clear boundaries between business logic and orchestration concerns.

Architecture Overview

Flow Model Position in Clean Architecture

Service Layer (API)
Application Layer (Use Cases)
MessagingModel (Business Messages)
    ├── Commands (ICommand)
    └── Events (IEvent)
FlowModel (Orchestration Contracts)
    ├── Timeout Messages
    ├── Saga-Specific Messages
    └── Flow Contracts
FlowModel.MassTransit / FlowModel.NServiceBus (Implementations)
    ├── Saga State Machines
    ├── Saga Data
    └── Saga Handlers

Project Structure

ConnectSoft.Template.FlowModel/
    ├── Timeouts/
    │   ├── OrderTimeoutExpired.cs
    │   └── PaymentTimeoutExpired.cs
    ├── SagaMessages/
    │   └── (Saga-specific message contracts)
    └── Contracts/
        └── (Flow orchestration contracts)

ConnectSoft.Template.FlowModel.MassTransit/
    ├── MicroserviceAggregateRootLifeCycleSaga.cs (State Machine)
    ├── MicroserviceAggregateRootLifeCycleSagaState.cs (State)
    └── MicroserviceAggregateRootLifeCycleSagaStateClassMap.cs (MongoDB Mapping)

ConnectSoft.Template.FlowModel.NServiceBus/
    ├── MicroserviceAggregateRootLifeCycleSaga.cs (Saga Handler)
    ├── MicroserviceAggregateRootSagaData.cs (Saga Data)
    └── (SQL Scripts for persistence)

FlowModel vs MessagingModel

Key Differences

Aspect MessagingModel FlowModel
Purpose Business operations Process orchestration
Message Types Commands, Events, Queries Timeouts, Saga Messages
Lifecycle Stateless (fire-and-forget or request-response) Stateful (long-running processes)
Scope Single operation Multi-step process
Correlation Business identifier Saga correlation ID
Usage Direct service-to-service communication Saga orchestration and state management
Persistence Not persisted (eventually persisted via outbox) Saga state persisted
Examples CreateOrderCommand, OrderCreatedEvent OrderTimeoutExpired, PaymentTimeoutExpired

Relationship

Business Flow:
1. Service receives CreateOrderCommand (MessagingModel)
2. Service publishes OrderCreatedEvent (MessagingModel)
3. Saga starts, orchestrates flow
4. Saga schedules OrderTimeoutExpired (FlowModel)
5. Saga handles PaymentProcessedEvent (MessagingModel)
6. Saga handles OrderTimeoutExpired (FlowModel) if timeout occurs

FlowModel orchestrates the process using MessagingModel messages: - Sagas react to MessagingModel events - Sagas send MessagingModel commands - Sagas use FlowModel timeouts for orchestration control

FlowModel Contract Types

1. Timeout Messages

Timeout messages are saga-specific messages used to manage time-based orchestration. They are:

  • Scheduled: Set to fire at a specific time or after a duration
  • Correlated: Linked to a specific saga instance
  • Stateful: Carry saga state information
  • Technology-Specific: Implementation differs between MassTransit and NServiceBus

Timeout Message Contract

namespace ConnectSoft.Template.FlowModel
{
    using System;
    using ConnectSoft.Extensions.MessagingModel;

    /// <summary>
    /// Timeout message indicating that an order processing timeout has expired.
    /// </summary>
    /// <remarks>
    /// This message is used by sagas to handle time-based orchestration.
    /// It is scheduled by the saga and delivered when the timeout expires.
    /// </remarks>
    public class OrderTimeoutExpired : IEvent
    {
        /// <summary>
        /// Gets or sets the order identifier.
        /// </summary>
        /// <remarks>
        /// This identifier is used to correlate the timeout with the saga instance.
        /// It should match the saga's correlation identifier.
        /// </remarks>
        public Guid OrderId { get; set; }

        /// <summary>
        /// Gets or sets the timeout reason or context.
        /// </summary>
        public string? Reason { get; set; }

        /// <summary>
        /// Gets or sets the timestamp when the timeout was originally scheduled.
        /// </summary>
        public DateTimeOffset ScheduledAt { get; set; }

        /// <summary>
        /// Gets or sets the timestamp when the timeout expired.
        /// </summary>
        public DateTimeOffset ExpiredAt { get; set; }
    }
}

Characteristics: - Implements IEvent (from MessagingModel) for consistency - Contains correlation identifier (OrderId) - May include context information (Reason) - Includes timing information for observability

Multiple Timeout Types

Different timeout types can be defined for different orchestration scenarios:

namespace ConnectSoft.Template.FlowModel
{
    /// <summary>
    /// Timeout for payment processing step.
    /// </summary>
    public class PaymentTimeoutExpired : IEvent
    {
        public Guid OrderId { get; set; }
        public Guid PaymentId { get; set; }
        public DateTimeOffset ScheduledAt { get; set; }
        public DateTimeOffset ExpiredAt { get; set; }
    }

    /// <summary>
    /// Timeout for inventory reservation step.
    /// </summary>
    public class InventoryReservationTimeoutExpired : IEvent
    {
        public Guid OrderId { get; set; }
        public DateTimeOffset ScheduledAt { get; set; }
        public DateTimeOffset ExpiredAt { get; set; }
    }

    /// <summary>
    /// Timeout for shipping confirmation step.
    /// </summary>
    public class ShippingConfirmationTimeoutExpired : IEvent
    {
        public Guid OrderId { get; set; }
        public Guid ShipmentId { get; set; }
        public DateTimeOffset ScheduledAt { get; set; }
        public DateTimeOffset ExpiredAt { get; set; }
    }
}

2. Saga-Specific Messages

Saga-specific messages are messages that are only used within saga orchestration and are not part of the general business messaging model. These include:

  • State transition messages: Messages that trigger state changes in sagas
  • Compensation messages: Messages used for compensating actions
  • Internal orchestration messages: Messages used for saga-to-saga communication

Example: Saga State Transition Message

namespace ConnectSoft.Template.FlowModel
{
    using System;
    using ConnectSoft.Extensions.MessagingModel;

    /// <summary>
    /// Message indicating that a saga step has completed.
    /// </summary>
    /// <remarks>
    /// Used internally by sagas to coordinate multi-step processes.
    /// Not published to external services.
    /// </remarks>
    public class SagaStepCompleted : IEvent
    {
        public Guid SagaId { get; set; }
        public string StepName { get; set; } = null!;
        public DateTimeOffset CompletedAt { get; set; }
    }
}

3. Flow Contracts

Flow contracts define the structure and behavior of orchestrated flows:

Flow Contract Interface

namespace ConnectSoft.Template.FlowModel
{
    using System;
    using System.Threading.Tasks;

    /// <summary>
    /// Contract for flow orchestration.
    /// </summary>
    /// <typeparam name="TState">The type of flow state.</typeparam>
    public interface IFlowOrchestrator<TState>
        where TState : class
    {
        /// <summary>
        /// Starts a new flow instance.
        /// </summary>
        /// <param name="correlationId">The correlation identifier.</param>
        /// <param name="initialState">The initial state.</param>
        /// <returns>A task representing the asynchronous operation.</returns>
        Task StartFlowAsync(Guid correlationId, TState initialState);

        /// <summary>
        /// Continues an existing flow instance.
        /// </summary>
        /// <param name="correlationId">The correlation identifier.</param>
        /// <param name="message">The message to process.</param>
        /// <returns>A task representing the asynchronous operation.</returns>
        Task ContinueFlowAsync<TMessage>(Guid correlationId, TMessage message)
            where TMessage : class;

        /// <summary>
        /// Completes a flow instance.
        /// </summary>
        /// <param name="correlationId">The correlation identifier.</param>
        /// <returns>A task representing the asynchronous operation.</returns>
        Task CompleteFlowAsync(Guid correlationId);

        /// <summary>
        /// Compensates a flow instance (rollback).
        /// </summary>
        /// <param name="correlationId">The correlation identifier.</param>
        /// <returns>A task representing the asynchronous operation.</returns>
        Task CompensateFlowAsync(Guid correlationId);
    }
}

Saga Orchestration Patterns

Pattern 1: Timeout-Based Orchestration

Use timeouts to handle scenarios where a step may not complete within expected time:

// FlowModel: OrderTimeoutExpired.cs
public class OrderTimeoutExpired : IEvent
{
    public Guid OrderId { get; set; }
    public DateTimeOffset ScheduledAt { get; set; }
    public DateTimeOffset ExpiredAt { get; set; }
}

// FlowModel.MassTransit: OrderProcessingSaga.cs
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public Event<OrderTimeoutExpired>? OrderTimeoutExpired { get; }

    public OrderProcessingSaga()
    {
        During(PaymentProcessing,
            When(PaymentProcessedEvent)
                .Schedule(OrderTimeoutExpired, context => new OrderTimeoutExpired
                {
                    OrderId = context.Saga.OrderId,
                    ScheduledAt = DateTimeOffset.UtcNow
                }, context => TimeSpan.FromMinutes(30))
                .TransitionTo(InventoryReserved),
            When(OrderTimeoutExpired)
                .Then(context => HandleTimeout(context))
                .Publish(context => new CancelOrderCommand { OrderId = context.Saga.OrderId })
                .TransitionTo(OrderFailed));
    }
}

Pattern 2: Multi-Step Timeout Chain

Use multiple timeouts for different steps in a long-running process:

// FlowModel: Multiple timeout messages
public class PaymentTimeoutExpired : IEvent { public Guid OrderId { get; set; } }
public class InventoryTimeoutExpired : IEvent { public Guid OrderId { get; set; } }
public class ShippingTimeoutExpired : IEvent { public Guid OrderId { get; set; } }

// Saga schedules timeouts at each step
During(PaymentProcessing,
    When(PaymentProcessedEvent)
        .Schedule(PaymentTimeoutExpired, ...)
        .TransitionTo(InventoryReserved));

During(InventoryReserved,
    When(InventoryReservedEvent)
        .Schedule(InventoryTimeoutExpired, ...)
        .TransitionTo(Shipping));

During(Shipping,
    When(ShippingConfirmedEvent)
        .Schedule(ShippingTimeoutExpired, ...)
        .TransitionTo(Completed));

Pattern 3: Compensation Timeout

Use timeouts to trigger compensation actions:

// FlowModel: CompensationTimeoutExpired.cs
public class CompensationTimeoutExpired : IEvent
{
    public Guid OrderId { get; set; }
    public string CompensationReason { get; set; } = null!;
    public DateTimeOffset ScheduledAt { get; set; }
}

// Saga handles compensation
DuringAny(
    When(CompensationTimeoutExpired)
        .Then(context => ExecuteCompensation(context))
        .TransitionTo(Compensated));

Pattern 4: Retry with Timeout

Combine timeouts with retry logic:

// FlowModel: RetryTimeoutExpired.cs
public class RetryTimeoutExpired : IEvent
{
    public Guid OrderId { get; set; }
    public int RetryAttempt { get; set; }
    public int MaxRetries { get; set; }
}

// Saga implements retry with timeout
During(PaymentProcessing,
    When(PaymentFailed)
        .If(context => context.Saga.RetryCount < context.Saga.MaxRetries,
            then => then
                .Then(context => context.Saga.RetryCount++)
                .Schedule(RetryTimeoutExpired, ...)
                .Publish(context => new RetryPaymentCommand { ... }))
        .Else(then => then
            .TransitionTo(OrderFailed)));

MassTransit Implementation

Timeout Scheduling

MassTransit uses the Schedule method to schedule timeouts:

// FlowModel.MassTransit: OrderProcessingSaga.cs
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public Event<OrderTimeoutExpired>? OrderTimeoutExpired { get; }

    public OrderProcessingSaga()
    {
        // Schedule timeout
        During(PaymentProcessing,
            When(PaymentProcessedEvent)
                .Schedule(OrderTimeoutExpired, 
                    context => new OrderTimeoutExpired
                    {
                        OrderId = context.Saga.OrderId,
                        ScheduledAt = DateTimeOffset.UtcNow
                    },
                    context => TimeSpan.FromMinutes(30))  // Timeout duration
                .TransitionTo(InventoryReserved));

        // Handle timeout
        During(PaymentProcessing,
            When(OrderTimeoutExpired)
                .Then(context => HandleTimeout(context))
                .TransitionTo(OrderFailed));
    }

    private void HandleTimeout(BehaviorContext<OrderProcessingSagaState, OrderTimeoutExpired> context)
    {
        // Log timeout
        this.logger.LogWarning(
            "Order {OrderId} timed out after {Duration}",
            context.Saga.OrderId,
            DateTimeOffset.UtcNow - context.Message.ScheduledAt);

        // Publish compensation command
        context.Publish(new CancelOrderCommand
        {
            OrderId = context.Saga.OrderId,
            Reason = "Timeout expired"
        });
    }
}

Timeout Configuration

MassTransit requires timeout persistence configuration:

// ApplicationModel: MassTransitExtensions.cs
services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderProcessingSaga, OrderProcessingSagaState>()
        .MongoDbRepository(r =>
        {
            r.Connection = connectionString;
            r.DatabaseName = "sagas";
        });

    // Configure timeout message scheduling
    x.UsingRabbitMq((context, cfg) =>
    {
        // Timeout messages are handled by the message scheduler
        cfg.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));
        cfg.ConfigureEndpoints(context);
    });
});

NServiceBus Implementation

Timeout Request

NServiceBus uses RequestTimeout to schedule timeouts:

// FlowModel.NServiceBus: OrderProcessingSaga.cs
public class OrderProcessingSaga :
    Saga<OrderProcessingSagaData>,
    IAmStartedByMessages<OrderCreatedEvent>,
    IHandleTimeouts<OrderTimeoutExpired>
{
    public async Task Handle(OrderCreatedEvent message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.StartedAt = DateTimeOffset.UtcNow;

        // Request timeout
        await RequestTimeout<OrderTimeoutExpired>(
            context,
            TimeSpan.FromMinutes(30),
            new OrderTimeoutExpired
            {
                OrderId = message.OrderId,
                ScheduledAt = DateTimeOffset.UtcNow
            });

        // Continue saga
        await context.Send(new ProcessPaymentCommand { ... });
    }

    // Handle timeout
    public async Task Timeout(OrderTimeoutExpired state, IMessageHandlerContext context)
    {
        this.logger.LogWarning(
            "Order {OrderId} timed out",
            state.OrderId);

        await context.Send(new CancelOrderCommand
        {
            OrderId = state.OrderId,
            Reason = "Timeout expired"
        });

        MarkAsComplete();
    }
}

Timeout Data Structure

NServiceBus timeout messages are serialized and stored in the timeout store:

// FlowModel: OrderTimeoutExpired.cs
// The message is serialized and stored in NServiceBus timeout table
public class OrderTimeoutExpired : IEvent
{
    public Guid OrderId { get; set; }
    public DateTimeOffset ScheduledAt { get; set; }
    public DateTimeOffset ExpiredAt { get; set; }
}

Timeout Persistence Configuration

NServiceBus requires timeout persistence:

// ApplicationModel: NServiceBusExtensions.cs
var endpointConfiguration = new EndpointConfiguration("MyService");

// Configure timeout persistence
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(() => new SqlConnection(connectionString));

// Timeout messages are stored in TimeoutData table
// SQL scripts are provided in FlowModel.NServiceBus/NServiceBusSqlScripts/

FlowModel Project Structure

ConnectSoft.Template.FlowModel/
├── Timeouts/
│   ├── OrderTimeoutExpired.cs
│   ├── PaymentTimeoutExpired.cs
│   ├── InventoryTimeoutExpired.cs
│   └── ShippingTimeoutExpired.cs
├── SagaMessages/
│   ├── SagaStepCompleted.cs
│   ├── SagaCompensationRequested.cs
│   └── SagaStateTransition.cs
├── Contracts/
│   ├── IFlowOrchestrator.cs
│   ├── IFlowState.cs
│   └── FlowContractBase.cs
└── ConnectSoft.Template.FlowModel.csproj

Project Dependencies

FlowModel depends on: - MessagingModel: For IEvent and ICommand interfaces - DomainModel: For domain concepts (optional, if flow contracts reference domain)

FlowModel does NOT depend on: - FlowModel.MassTransit - FlowModel.NServiceBus - Any messaging framework assemblies

Best Practices

Do's

  1. Define Timeout Messages in FlowModel

    // ✅ GOOD - Timeout in FlowModel
    namespace ConnectSoft.MyService.FlowModel
    {
        public class OrderTimeoutExpired : IEvent
        {
            public Guid OrderId { get; set; }
        }
    }
    

  2. Use Descriptive Timeout Names

    // ✅ GOOD - Clear timeout purpose
    public class PaymentProcessingTimeoutExpired : IEvent { }
    
    // ❌ BAD - Unclear purpose
    public class Timeout1 : IEvent { }
    

  3. Include Correlation Information

    // ✅ GOOD - Contains correlation ID
    public class OrderTimeoutExpired : IEvent
    {
        public Guid OrderId { get; set; }  // Correlation ID
        public DateTimeOffset ScheduledAt { get; set; }
    }
    

  4. Separate Business Messages from Flow Messages

    // ✅ GOOD - Business message in MessagingModel
    // MessagingModel: OrderCreatedEvent.cs
    
    // ✅ GOOD - Flow message in FlowModel
    // FlowModel: OrderTimeoutExpired.cs
    

  5. Document Timeout Purpose

    /// <summary>
    /// Timeout message indicating that order processing has exceeded
    /// the maximum allowed duration (30 minutes).
    /// </summary>
    /// <remarks>
    /// This timeout triggers compensation actions to cancel the order
    /// and refund any payments made.
    /// </remarks>
    public class OrderTimeoutExpired : IEvent
    {
        // ...
    }
    

Don'ts

  1. Don't Put Business Messages in FlowModel

    // ❌ BAD - Business event in FlowModel
    // FlowModel: OrderCreatedEvent.cs
    
    // ✅ GOOD - Business event in MessagingModel
    // MessagingModel: OrderCreatedEvent.cs
    

  2. Don't Mix Flow Contracts with Implementation

    // ❌ BAD - Implementation in FlowModel
    // FlowModel: OrderProcessingSaga.cs (MassTransit implementation)
    
    // ✅ GOOD - Implementation in FlowModel.MassTransit
    // FlowModel.MassTransit: OrderProcessingSaga.cs
    

  3. Don't Create Too Many Timeout Types

    // ❌ BAD - Too granular
    public class PaymentStep1TimeoutExpired : IEvent { }
    public class PaymentStep2TimeoutExpired : IEvent { }
    public class PaymentStep3TimeoutExpired : IEvent { }
    
    // ✅ GOOD - Single timeout per saga step
    public class PaymentTimeoutExpired : IEvent { }
    

  4. Don't Store Large Objects in Timeout Messages

    // ❌ BAD - Large object in timeout
    public class OrderTimeoutExpired : IEvent
    {
        public Guid OrderId { get; set; }
        public OrderDetails OrderDetails { get; set; }  // Large object
    }
    
    // ✅ GOOD - Minimal data in timeout
    public class OrderTimeoutExpired : IEvent
    {
        public Guid OrderId { get; set; }  // Correlation only
    }
    

Integration with MessagingModel

Using MessagingModel Events in Sagas

Sagas react to MessagingModel events:

// MessagingModel: OrderCreatedEvent.cs
public class OrderCreatedEvent : IEvent
{
    public Guid OrderId { get; set; }
    public decimal TotalAmount { get; set; }
}

// FlowModel.MassTransit: OrderProcessingSaga.cs
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public Event<OrderCreatedEvent>? OrderCreatedEvent { get; }  // From MessagingModel

    public OrderProcessingSaga()
    {
        Initially(
            When(OrderCreatedEvent)  // Reacts to MessagingModel event
                .Then(context => InitializeSaga(context))
                .Schedule(OrderTimeoutExpired, ...)  // Uses FlowModel timeout
                .TransitionTo(Processing));
    }
}

Sending MessagingModel Commands from Sagas

Sagas send MessagingModel commands:

// MessagingModel: ProcessPaymentCommand.cs
public class ProcessPaymentCommand : ICommand
{
    public Guid OrderId { get; set; }
    public decimal Amount { get; set; }
}

// FlowModel.MassTransit: OrderProcessingSaga.cs
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public OrderProcessingSaga()
    {
        Initially(
            When(OrderCreatedEvent)
                .Publish(context => new ProcessPaymentCommand  // Sends MessagingModel command
                {
                    OrderId = context.Saga.OrderId,
                    Amount = context.Message.TotalAmount
                })
                .TransitionTo(PaymentProcessing));
    }
}

Common Scenarios

Scenario 1: Order Processing with Timeout

// FlowModel: OrderTimeoutExpired.cs
public class OrderTimeoutExpired : IEvent
{
    public Guid OrderId { get; set; }
    public DateTimeOffset ScheduledAt { get; set; }
}

// FlowModel.MassTransit: OrderProcessingSaga.cs
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public Event<OrderTimeoutExpired>? OrderTimeoutExpired { get; }

    public OrderProcessingSaga()
    {
        Initially(
            When(OrderCreatedEvent)
                .Schedule(OrderTimeoutExpired, 
                    context => new OrderTimeoutExpired
                    {
                        OrderId = context.Saga.OrderId,
                        ScheduledAt = DateTimeOffset.UtcNow
                    },
                    TimeSpan.FromMinutes(30))
                .TransitionTo(Processing));

        During(Processing,
            When(OrderCompletedEvent)
                .Unschedule(OrderTimeoutExpired)  // Cancel timeout
                .TransitionTo(Completed),
            When(OrderTimeoutExpired)
                .Then(context => HandleTimeout(context))
                .Publish(context => new CancelOrderCommand { OrderId = context.Saga.OrderId })
                .TransitionTo(Failed));
    }
}

Scenario 2: Multi-Step Process with Step Timeouts

// FlowModel: Multiple timeout messages
public class PaymentTimeoutExpired : IEvent { public Guid OrderId { get; set; } }
public class InventoryTimeoutExpired : IEvent { public Guid OrderId { get; set; } }

// FlowModel.MassTransit: OrderProcessingSaga.cs
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public Event<PaymentTimeoutExpired>? PaymentTimeoutExpired { get; }
    public Event<InventoryTimeoutExpired>? InventoryTimeoutExpired { get; }

    public OrderProcessingSaga()
    {
        During(PaymentProcessing,
            When(PaymentProcessedEvent)
                .Unschedule(PaymentTimeoutExpired)
                .Schedule(InventoryTimeoutExpired, ...)
                .TransitionTo(InventoryReserved),
            When(PaymentTimeoutExpired)
                .Then(context => HandlePaymentTimeout(context))
                .TransitionTo(Failed));

        During(InventoryReserved,
            When(InventoryReservedEvent)
                .Unschedule(InventoryTimeoutExpired)
                .TransitionTo(Completed),
            When(InventoryTimeoutExpired)
                .Then(context => HandleInventoryTimeout(context))
                .TransitionTo(Failed));
    }
}

Scenario 3: Compensation with Timeout

// FlowModel: CompensationTimeoutExpired.cs
public class CompensationTimeoutExpired : IEvent
{
    public Guid OrderId { get; set; }
    public string CompensationReason { get; set; } = null!;
}

// FlowModel.MassTransit: OrderProcessingSaga.cs
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public Event<CompensationTimeoutExpired>? CompensationTimeoutExpired { get; }

    public OrderProcessingSaga()
    {
        DuringAny(
            When(OrderFailedEvent)
                .Schedule(CompensationTimeoutExpired,
                    context => new CompensationTimeoutExpired
                    {
                        OrderId = context.Saga.OrderId,
                        CompensationReason = "Order processing failed"
                    },
                    TimeSpan.FromMinutes(5))  // Quick compensation
                .TransitionTo(Compensating));

        During(Compensating,
            When(CompensationTimeoutExpired)
                .Then(context => ExecuteCompensation(context))
                .TransitionTo(Compensated));
    }
}

Troubleshooting

Issue: Timeout Not Firing

Symptoms: Timeout message scheduled but never received.

Solutions: 1. Verify timeout persistence is configured correctly 2. Check timeout message scheduler is running (MassTransit Quartz, NServiceBus timeout manager) 3. Verify timeout duration is reasonable (not too short) 4. Check saga state is persisted correctly

Issue: Timeout Firing Too Early or Late

Symptoms: Timeout fires at unexpected times.

Solutions: 1. Verify system clock synchronization (NTP) 2. Check timeout scheduling time calculation 3. Verify timeout persistence timezone handling 4. Review timeout scheduler configuration

Issue: Multiple Timeouts for Same Saga

Symptoms: Multiple timeout messages received for same saga instance.

Solutions: 1. Cancel previous timeouts before scheduling new ones 2. Use Unschedule before scheduling new timeout 3. Check timeout correlation is correct 4. Verify saga instance uniqueness

References

Summary

The Flow Model in ConnectSoft ConnectSoft Templates provides:

  • Separation of Concerns: Orchestration contracts separate from business messages
  • Timeout Support: Centralized timeout message contracts
  • Saga Contracts: Contracts for saga-specific messages
  • Technology Agnostic: Works with both MassTransit and NServiceBus
  • Clean Architecture: Proper layering between MessagingModel and implementations
  • Flow Orchestration: Contracts for long-running business processes

By following these patterns, teams can:

  • Orchestrate Complex Processes: Manage multi-step business workflows
  • Handle Timeouts: Implement time-based orchestration controls
  • Maintain Clean Boundaries: Separate business messages from orchestration
  • Support Multiple Frameworks: Use same contracts with different messaging frameworks
  • Build Reliable Systems: Implement compensation and timeout handling

The Flow Model is essential for building reliable, long-running business processes that span multiple services and require orchestration, state management, and time-based controls.