Skip to content

Outbox Pattern in ConnectSoft Microservice Template

Purpose & Overview

The Outbox Pattern is a critical pattern for ensuring reliable message publishing in distributed systems. It guarantees exactly-once message delivery by storing outbound messages in the same database transaction as the business data, then publishing them asynchronously. This prevents message loss or duplicate publishing when services fail or restart.

Why Outbox Pattern?

The Outbox Pattern solves critical problems in event-driven architectures:

  • Atomicity: Ensures database changes and message publishing happen atomically
  • Exactly-Once Delivery: Prevents message loss or duplicate publishing
  • Failure Recovery: Messages are persisted and can be retried after service restart
  • Transactional Consistency: Guarantees messages are only published after successful transaction commit
  • Reliability: Provides durability guarantees for message publishing

How It's Implemented in Template

Architecture Overview

The Outbox Pattern works in conjunction with:

  • MassTransit - Supports outbox with Entity Framework Core and MongoDB
  • NServiceBus - Built-in outbox feature
  • Unit of Work - Transactional boundaries for outbox operations
Application Service
    ↓
1. Save business data to database
2. Save outbound messages to Outbox table (same transaction)
    ↓
Transaction Commit
    ↓
Background Process (Outbox Processor)
    ↓
3. Read messages from Outbox
4. Publish to message broker
5. Mark messages as published

Core Components

1. Outbox Table Structure

The outbox table stores outbound messages:

-- Entity Framework Core Outbox
CREATE TABLE OutboxMessage (
    Id UNIQUEIDENTIFIER PRIMARY KEY,
    MessageId UNIQUEIDENTIFIER NOT NULL,
    MessageType NVARCHAR(500) NOT NULL,
    MessageBody NVARCHAR(MAX) NOT NULL,
    Destination NVARCHAR(500),
    CreatedAt DATETIME2 NOT NULL,
    PublishedAt DATETIME2 NULL,
    ProcessedAt DATETIME2 NULL,
    RetryCount INT NOT NULL DEFAULT 0,
    ErrorMessage NVARCHAR(MAX) NULL
);

CREATE INDEX IX_OutboxMessage_ProcessedAt 
    ON OutboxMessage (ProcessedAt) 
    WHERE ProcessedAt IS NULL;

2. MassTransit Outbox Configuration

MassTransit supports outbox via Entity Framework Core or MongoDB:

// Entity Framework Core Outbox
services.AddMassTransit(x =>
{
    x.AddEntityFrameworkOutbox<MyDbContext>(o =>
    {
        o.QueryDelay = TimeSpan.FromSeconds(10);
        o.UsePostgres(); // or UseSqlServer(), UseMySql(), etc.

        o.UseBusOutbox(bus =>
        {
            bus.Durable = true;
            bus.ConcurrentMessageLimit = 10;
        });
    });

    x.AddConsumer<MyConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

// MongoDB Outbox
services.AddMassTransit(x =>
{
    x.AddMongoDbOutbox(o =>
    {
        o.Connection = "mongodb://localhost:27017";
        o.DatabaseName = "outbox";
        o.CollectionName = "outbox";
    });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

3. NServiceBus Outbox Configuration

NServiceBus has built-in outbox support:

// NServiceBusExtensions.cs
public static IServiceCollection AddMicroserviceNServiceBus(
    this IServiceCollection services)
{
    var endpointConfiguration = new EndpointConfiguration("MyService");

    // Enable outbox
    var outboxSettings = endpointConfiguration.EnableOutbox();

    // Configure outbox persistence
    var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MsSqlServer>();
    persistence.ConnectionBuilder(() => new SqlConnection(connectionString));

    // Outbox settings
    outboxSettings.TimeToKeepDeduplicationData(TimeSpan.FromDays(7));
    outboxSettings.DisableCleanup(); // Optional: for testing

    return services;
}

Project Structure

Component Responsibility
MassTransit.Extensions Outbox infrastructure for MassTransit
NServiceBus Outbox Built-in outbox feature
Database Schema Outbox table/collection for message storage
Background Process Outbox processor that publishes messages

Code Examples

Using Outbox with MassTransit

Publishing Messages Within Transaction

public class OrderService
{
    private readonly IBus bus;
    private readonly MyDbContext dbContext;
    private readonly IPublishEndpoint publishEndpoint;

    public async Task CreateOrderAsync(CreateOrderCommand command)
    {
        // Begin transaction
        await using var transaction = await dbContext.Database.BeginTransactionAsync();

        try
        {
            // Save business data
            var order = new Order
            {
                Id = command.OrderId,
                CustomerId = command.CustomerId,
                TotalAmount = command.TotalAmount
            };
            dbContext.Orders.Add(order);

            // Publish message (stored in outbox, not sent immediately)
            await publishEndpoint.Publish(new OrderCreatedEvent
            {
                OrderId = order.Id,
                CustomerId = order.CustomerId,
                TotalAmount = order.TotalAmount
            });

            // Commit transaction (outbox messages committed atomically)
            await dbContext.SaveChangesAsync();
            await transaction.CommitAsync();
        }
        catch
        {
            await transaction.RollbackAsync();
            throw;
        }
    }
}

Using Outbox with Consume Context

public class OrderCreatedConsumer : IConsumer<OrderCreatedEvent>
{
    private readonly MyDbContext dbContext;

    public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
    {
        // MassTransit automatically uses outbox for messages published
        // within a consumer's scope

        // Save business data
        var order = new Order { /* ... */ };
        dbContext.Orders.Add(order);
        await dbContext.SaveChangesAsync();

        // Publish message (automatically goes to outbox)
        await context.Publish(new InventoryReservationRequestedEvent
        {
            OrderId = order.Id,
            Items = order.Items
        });

        // All operations are within outbox transaction
    }
}

Using Outbox with NServiceBus

Publishing Messages with Outbox

public class OrderHandler : IHandleMessages<CreateOrderCommand>
{
    private readonly MyDbContext dbContext;

    public async Task Handle(CreateOrderCommand message, IMessageHandlerContext context)
    {
        // Save business data
        var order = new Order { /* ... */ };
        dbContext.Orders.Add(order);
        await dbContext.SaveChangesAsync();

        // Publish message (NServiceBus automatically uses outbox)
        await context.Publish(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId
        });

        // Transaction commits, messages go to outbox automatically
    }
}

Sending Commands with Outbox

public class OrderHandler : IHandleMessages<CreateOrderCommand>
{
    public async Task Handle(CreateOrderCommand message, IMessageHandlerContext context)
    {
        // Process order
        var order = ProcessOrder(message);

        // Send command (outbox ensures delivery)
        await context.Send(new ProcessPaymentCommand
        {
            OrderId = order.Id,
            Amount = order.TotalAmount
        });
    }
}

Custom Outbox Implementation

If you need a custom outbox implementation:

public interface IOutboxRepository
{
    Task SaveOutboxMessagesAsync(
        IEnumerable<OutboxMessage> messages,
        CancellationToken ct);

    Task<IEnumerable<OutboxMessage>> GetUnpublishedMessagesAsync(
        int batchSize,
        CancellationToken ct);

    Task MarkAsPublishedAsync(
        Guid messageId,
        CancellationToken ct);

    Task IncrementRetryCountAsync(
        Guid messageId,
        string errorMessage,
        CancellationToken ct);
}

public class OutboxMessage
{
    public Guid Id { get; set; }
    public Guid MessageId { get; set; }
    public string MessageType { get; set; } = null!;
    public string MessageBody { get; set; } = null!;
    public string? Destination { get; set; }
    public DateTimeOffset CreatedAt { get; set; }
    public DateTimeOffset? PublishedAt { get; set; }
    public int RetryCount { get; set; }
    public string? ErrorMessage { get; set; }
}

Configuration

MassTransit Entity Framework Outbox

// Startup.cs or Program.cs
services.AddDbContext<MyDbContext>(options =>
    options.UseSqlServer(connectionString));

services.AddMassTransit(x =>
{
    // Add outbox support
    x.AddEntityFrameworkOutbox<MyDbContext>(o =>
    {
        // Configure outbox query delay
        o.QueryDelay = TimeSpan.FromSeconds(10);

        // Configure database provider
        o.UseSqlServer(); // or UsePostgres(), UseMySql()

        // Configure bus outbox
        o.UseBusOutbox(bus =>
        {
            bus.Durable = true;
            bus.ConcurrentMessageLimit = 10;
        });
    });

    // Register consumers
    x.AddConsumer<OrderCreatedConsumer>();

    // Configure transport
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

// Add hosted service to process outbox
services.AddHostedService<MassTransitOutboxProcessorHostedService>();

MassTransit MongoDB Outbox

services.AddMassTransit(x =>
{
    x.AddMongoDbOutbox(o =>
    {
        o.Connection = "mongodb://localhost:27017";
        o.DatabaseName = "microservice";
        o.CollectionName = "outbox";

        // Configure query delay
        o.QueryDelay = TimeSpan.FromSeconds(10);

        // Configure cleanup
        o.DisableCleanup = false;
        o.DuplicateDetectionWindow = TimeSpan.FromDays(7);
    });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

NServiceBus Outbox

// NServiceBusExtensions.cs
public static IServiceCollection AddMicroserviceNServiceBus(
    this IServiceCollection services,
    IConfiguration configuration)
{
    var endpointConfiguration = new EndpointConfiguration("MyService");

    // Enable outbox
    var outboxSettings = endpointConfiguration.EnableOutbox();

    // Configure persistence (required for outbox)
    var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MsSqlServer>();
    persistence.ConnectionBuilder(() => 
        new SqlConnection(configuration.GetConnectionString("DefaultConnection")));

    // Configure outbox settings
    outboxSettings.TimeToKeepDeduplicationData(TimeSpan.FromDays(7));
    outboxSettings.DisableCleanup(); // Optional: for development

    // Transport configuration
    var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
    transport.ConnectionString(configuration.GetConnectionString("RabbitMQ"));
    transport.UseConventionalRoutingTopology();

    return services;
}

Best Practices

Do's

  1. Always use outbox for critical messages
  2. Ensure important events/commands use outbox
  3. Prevents message loss during failures

  4. Configure appropriate query delay

  5. Balance between latency and throughput
  6. Typical values: 5-30 seconds

  7. Implement deduplication

  8. Use message IDs to prevent duplicate processing
  9. Configure retention window appropriately

  10. Monitor outbox metrics

  11. Track messages pending publication
  12. Alert on high retry counts
  13. Monitor outbox table growth

  14. Handle outbox cleanup

  15. Archive or delete old published messages
  16. Prevent outbox table from growing unbounded

  17. Use same database for outbox

  18. Store outbox in same database as business data
  19. Ensures transactional consistency

Don'ts

  1. Don't bypass outbox for critical messages
  2. Always use outbox when publishing from within transactions
  3. Direct publishing can cause message loss

  4. Don't set query delay too low

  5. Very low delays can impact database performance
  6. Balance latency vs. resource usage

  7. Don't ignore outbox failures

  8. Monitor and alert on outbox processing failures
  9. Implement retry logic for failed publications

  10. Don't store large messages in outbox

  11. Keep message bodies reasonable size
  12. Consider using message references for large payloads

  13. Don't forget to clean up old messages

  14. Implement cleanup job for published messages
  15. Configure retention policies

Integration Points

Unit of Work Pattern

Outbox integrates with Unit of Work:

// Outbox messages stored within same transaction
unitOfWork.ExecuteTransactional(() =>
{
    repository.Save(entity);
    outboxRepository.SaveMessage(eventMessage); // Same transaction
});

Domain Events

Outbox publishes domain events:

// Domain event automatically stored in outbox
await eventBus.PublishEvent(new OrderCreatedEvent
{
    OrderId = order.Id
});

Transaction Management

Outbox ensures transactional consistency:

// All operations in single transaction
await using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
    // Business data
    await dbContext.SaveChangesAsync();

    // Outbox messages
    await publishEndpoint.Publish(events);

    // Commit atomically
    await transaction.CommitAsync();
}
catch
{
    await transaction.RollbackAsync();
    throw;
}

Common Scenarios

Scenario 1: Order Creation with Events

public async Task<Order> CreateOrderAsync(CreateOrderCommand command)
{
    await using var transaction = await dbContext.Database.BeginTransactionAsync();

    try
    {
        // Create order
        var order = new Order { /* ... */ };
        dbContext.Orders.Add(order);

        // Publish domain events (goes to outbox)
        await publishEndpoint.Publish(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            TotalAmount = order.TotalAmount
        });

        await dbContext.SaveChangesAsync();
        await transaction.CommitAsync();

        return order;
    }
    catch
    {
        await transaction.RollbackAsync();
        throw;
    }
}

Scenario 2: Compensating Actions

// When order cancellation happens, publish compensation events via outbox
public async Task CancelOrderAsync(Guid orderId)
{
    await using var transaction = await dbContext.Database.BeginTransactionAsync();

    try
    {
        var order = await dbContext.Orders.FindAsync(orderId);
        order.Status = OrderStatus.Cancelled;

        // Publish compensation events (outbox ensures delivery)
        await publishEndpoint.Publish(new OrderCancelledEvent
        {
            OrderId = orderId
        });

        await publishEndpoint.Publish(new RefundPaymentCommand
        {
            PaymentId = order.PaymentId,
            Amount = order.TotalAmount
        });

        await dbContext.SaveChangesAsync();
        await transaction.CommitAsync();
    }
    catch
    {
        await transaction.RollbackAsync();
        throw;
    }
}

Scenario 3: Batch Operations

// Multiple messages in single transaction
public async Task ProcessBatchAsync(BatchData batch)
{
    await using var transaction = await dbContext.Database.BeginTransactionAsync();

    try
    {
        // Save multiple entities
        dbContext.Entities.AddRange(batch.Entities);

        // Publish multiple events (all in outbox)
        foreach (var entity in batch.Entities)
        {
            await publishEndpoint.Publish(new EntityCreatedEvent
            {
                EntityId = entity.Id
            });
        }

        await dbContext.SaveChangesAsync();
        await transaction.CommitAsync();
    }
    catch
    {
        await transaction.RollbackAsync();
        throw;
    }
}

Troubleshooting

Issue: Messages not being published

Cause: Outbox processor not running or query delay too high.

Solution: - Verify outbox processor hosted service is running - Check outbox table for unpublished messages - Reduce query delay if latency is acceptable - Check logs for outbox processing errors

Issue: Duplicate messages being published

Cause: Outbox deduplication not configured or message IDs not set.

Solution: - Enable deduplication in outbox configuration - Ensure message IDs are unique - Configure appropriate deduplication window

Issue: Outbox table growing too large

Cause: Cleanup not configured or failing.

Solution: - Enable outbox cleanup - Configure retention policy - Implement custom cleanup job if needed - Archive old messages before deletion

Issue: High database load from outbox queries

Cause: Query delay too low or concurrent limit too high.

Solution: - Increase query delay - Reduce concurrent message limit - Optimize outbox table indexes - Consider separate database for outbox

Issue: Messages published but consumers not receiving

Cause: Message broker connectivity or routing issue.

Solution: - Verify message broker connection - Check message routing configuration - Verify consumer subscriptions - Check message broker logs

References

Summary

The Outbox Pattern in ConnectSoft Microservice Template:

  • Ensures atomic transactions between database and message publishing
  • Provides exactly-once message delivery guarantees
  • Prevents message loss during service failures
  • Supports reliable event publishing from domain operations
  • Works with MassTransit (EF Core, MongoDB) and NServiceBus
  • Enables eventual consistency across services
  • Provides failure recovery through message persistence

This pattern is essential for building reliable, event-driven microservices where message publishing must be transactional and guaranteed.