Outbox Pattern in ConnectSoft Microservice Template¶
Purpose & Overview¶
The Outbox Pattern ensures reliable message publishing in distributed systems. It stores outbound messages in the same database transaction as the business data, then publishes them asynchronously from a background process. Because a message is recorded if and only if the business change commits, no message is lost when a service fails or restarts. Delivery is at-least-once: a crash between publishing a message and marking it as published causes a redelivery, so consumers should be idempotent or deduplicate by message ID.
Why Outbox Pattern?¶
The Outbox Pattern solves critical problems in event-driven architectures:
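- Atomicity: Database changes and outbound messages are written in the same transaction, so either both are persisted or neither is
- At-Least-Once Delivery: Prevents message loss; combined with consumer-side deduplication it gives effectively-once processing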
- Failure Recovery: Messages are persisted and can be retried after service restart
- Transactional Consistency: Guarantees messages are only published after successful transaction commit
- Reliability: Provides durability guarantees for message publishing
How It's Implemented in Template¶
Architecture Overview¶
The Outbox Pattern works in conjunction with:
- MassTransit - Supports outbox with Entity Framework Core and MongoDB
- NServiceBus - Built-in outbox feature
- Unit of Work - Transactional boundaries for outbox operations
Application Service
↓
1. Save business data to database
2. Save outbound messages to Outbox table (same transaction)
↓
Transaction Commit
↓
Background Process (Outbox Processor)
↓
3. Read messages from Outbox
4. Publish to message broker
5. Mark messages as published
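The flow above can be sketched with in-memory stand-ins for the database and the broker. This is a minimal illustration, not the template's implementation: `OutboxRow`, `InMemoryDatabase`, and `OutboxProcessor` are hypothetical names, and a real outbox would use a database transaction and a broker client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Demo: commit an order together with its outbox row, then run the processor once.
var db = new InMemoryDatabase();
var broker = new List<string>();

db.SaveOrderWithOutboxMessage("order-42", "OrderCreated:order-42"); // steps 1-2
var published = OutboxProcessor.RunOnce(db, broker.Add);           // steps 3-5
Console.WriteLine($"published={published}, pending={db.Outbox.Count(r => r.PublishedAt is null)}");

// Hypothetical outbox row: a message body plus a publication timestamp.
public sealed class OutboxRow
{
    public OutboxRow(string body) => Body = body;
    public string Body { get; }
    public DateTimeOffset? PublishedAt { get; set; }
}

public sealed class InMemoryDatabase
{
    public List<string> Orders { get; } = new();
    public List<OutboxRow> Outbox { get; } = new();

    // Stands in for a single database transaction covering both writes.
    public void SaveOrderWithOutboxMessage(string orderId, string messageBody)
    {
        Orders.Add(orderId);
        Outbox.Add(new OutboxRow(messageBody));
    }
}

public static class OutboxProcessor
{
    // Publishes each pending row, then marks it as published.
    // A row whose publish call throws keeps PublishedAt == null and is picked up again on the next run.
    public static int RunOnce(InMemoryDatabase db, Action<string> publish)
    {
        var count = 0;
        foreach (var row in db.Outbox.Where(r => r.PublishedAt is null).ToList())
        {
            publish(row.Body);                       // step 4: publish to the broker
            row.PublishedAt = DateTimeOffset.UtcNow; // step 5: mark as published
            count++;
        }
        return count;
    }
}
```

Because step 5 happens after step 4, a crash between the two leaves the row pending and the message is published again on restart, which is why consumers should tolerate duplicates.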
Core Components¶
1. Outbox Table Structure¶
The outbox table stores outbound messages:
-- Entity Framework Core Outbox
CREATE TABLE OutboxMessage (
Id UNIQUEIDENTIFIER PRIMARY KEY,
MessageId UNIQUEIDENTIFIER NOT NULL,
MessageType NVARCHAR(500) NOT NULL,
MessageBody NVARCHAR(MAX) NOT NULL,
Destination NVARCHAR(500),
CreatedAt DATETIME2 NOT NULL,
PublishedAt DATETIME2 NULL,
ProcessedAt DATETIME2 NULL,
RetryCount INT NOT NULL DEFAULT 0,
ErrorMessage NVARCHAR(MAX) NULL
);
CREATE INDEX IX_OutboxMessage_ProcessedAt
ON OutboxMessage (ProcessedAt)
WHERE ProcessedAt IS NULL;
2. MassTransit Outbox Configuration¶
MassTransit supports outbox via Entity Framework Core or MongoDB:
// Entity Framework Core Outbox
services.AddMassTransit(x =>
{
x.AddEntityFrameworkOutbox<MyDbContext>(o =>
{
o.QueryDelay = TimeSpan.FromSeconds(10);
o.UsePostgres(); // or UseSqlServer(), UseMySql(), etc.
o.UseBusOutbox(bus =>
{
// Maximum number of outbox messages delivered to the broker at a time
bus.MessageDeliveryLimit = 10;
});
});
x.AddConsumer<MyConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
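// MongoDB Outbox
// The outbox resolves the MongoDB client and database from the container
services.AddSingleton<IMongoClient>(_ => new MongoClient("mongodb://localhost:27017"));
services.AddSingleton<IMongoDatabase>(provider =>
provider.GetRequiredService<IMongoClient>().GetDatabase("outbox"));
services.AddMassTransit(x =>
{
x.AddMongoDbOutbox(o =>
{
o.ClientFactory(provider => provider.GetRequiredService<IMongoClient>());
o.DatabaseFactory(provider => provider.GetRequiredService<IMongoDatabase>());
});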
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
3. NServiceBus Outbox Configuration¶
NServiceBus has built-in outbox support:
// NServiceBusExtensions.cs
public static IServiceCollection AddMicroserviceNServiceBus(
this IServiceCollection services,
string connectionString)
{
var endpointConfiguration = new EndpointConfiguration("MyService");
// Enable outbox
var outboxSettings = endpointConfiguration.EnableOutbox();
// Configure outbox persistence
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(() => new SqlConnection(connectionString));
// Outbox settings
outboxSettings.KeepDeduplicationDataFor(TimeSpan.FromDays(7));
outboxSettings.DisableCleanup(); // Optional: for testing; with cleanup disabled, old records must be removed by another job
return services;
}
Project Structure¶
| Component | Responsibility |
|---|---|
| MassTransit.Extensions | Outbox infrastructure for MassTransit |
| NServiceBus Outbox | Built-in outbox feature |
| Database Schema | Outbox table/collection for message storage |
| Background Process | Outbox processor that publishes messages |
Code Examples¶
Using Outbox with MassTransit¶
Publishing Messages Within Transaction¶
public class OrderService
{
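private readonly MyDbContext dbContext;
private readonly IPublishEndpoint publishEndpoint;
// Resolve IPublishEndpoint from the DI scope: publishing through IBus bypasses the bus outbox
public OrderService(MyDbContext dbContext, IPublishEndpoint publishEndpoint)
{
this.dbContext = dbContext;
this.publishEndpoint = publishEndpoint;
}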
public async Task CreateOrderAsync(CreateOrderCommand command)
{
// Begin transaction
await using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
// Save business data
var order = new Order
{
Id = command.OrderId,
CustomerId = command.CustomerId,
TotalAmount = command.TotalAmount
};
dbContext.Orders.Add(order);
// Publish message (stored in outbox, not sent immediately)
await publishEndpoint.Publish(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount
});
// Commit transaction (outbox messages committed atomically)
await dbContext.SaveChangesAsync();
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
Using Outbox with Consume Context¶
public class OrderCreatedConsumer : IConsumer<OrderCreatedEvent>
{
private readonly MyDbContext dbContext;
public async Task Consume(ConsumeContext<OrderCreatedEvent> context)
{
// When the receive endpoint is configured with UseEntityFrameworkOutbox<MyDbContext>,
// messages published through the ConsumeContext are held in the outbox until the consumer completes
// Save business data
var order = new Order { /* ... */ };
dbContext.Orders.Add(order);
await dbContext.SaveChangesAsync();
// Publish message (automatically goes to outbox)
await context.Publish(new InventoryReservationRequestedEvent
{
OrderId = order.Id,
Items = order.Items
});
// All operations are within outbox transaction
}
}
Using Outbox with NServiceBus¶
Publishing Messages with Outbox¶
public class OrderHandler : IHandleMessages<CreateOrderCommand>
{
private readonly MyDbContext dbContext;
public async Task Handle(CreateOrderCommand message, IMessageHandlerContext context)
{
// Save business data
var order = new Order { /* ... */ };
dbContext.Orders.Add(order);
await dbContext.SaveChangesAsync();
// Publish message (NServiceBus automatically uses outbox)
await context.Publish(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId
});
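// Outgoing messages are stored in the outbox and dispatched after the handler's transaction commits.
// For atomicity, dbContext must share the connection and transaction of the NServiceBus storage session.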
}
}
Sending Commands with Outbox¶
public class OrderHandler : IHandleMessages<CreateOrderCommand>
{
public async Task Handle(CreateOrderCommand message, IMessageHandlerContext context)
{
// Process order
var order = ProcessOrder(message);
// Send command (outbox ensures delivery)
await context.Send(new ProcessPaymentCommand
{
OrderId = order.Id,
Amount = order.TotalAmount
});
}
}
Custom Outbox Implementation¶
If you need a custom outbox implementation:
public interface IOutboxRepository
{
Task SaveOutboxMessagesAsync(
IEnumerable<OutboxMessage> messages,
CancellationToken ct);
Task<IEnumerable<OutboxMessage>> GetUnpublishedMessagesAsync(
int batchSize,
CancellationToken ct);
Task MarkAsPublishedAsync(
Guid messageId,
CancellationToken ct);
Task IncrementRetryCountAsync(
Guid messageId,
string errorMessage,
CancellationToken ct);
}
public class OutboxMessage
{
public Guid Id { get; set; }
public Guid MessageId { get; set; }
public string MessageType { get; set; } = null!;
public string MessageBody { get; set; } = null!;
public string? Destination { get; set; }
public DateTimeOffset CreatedAt { get; set; }
public DateTimeOffset? PublishedAt { get; set; }
public int RetryCount { get; set; }
public string? ErrorMessage { get; set; }
}
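A background dispatcher built on this interface could look like the sketch below. It is a hedged illustration rather than the template's processor: `OutboxDispatcher` and `InMemoryOutboxRepository` are hypothetical names, the `publish` delegate stands in for a real broker client, and trimmed copies of the types above are repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Demo: one pending message, a publish delegate that always succeeds.
var repository = new InMemoryOutboxRepository();
repository.Add(new OutboxMessage { MessageId = Guid.NewGuid(), MessageType = "OrderCreatedEvent", MessageBody = "{}" });
var dispatcher = new OutboxDispatcher(repository, (message, ct) => Task.CompletedTask);
var count = await dispatcher.DispatchBatchAsync(50, CancellationToken.None);
Console.WriteLine($"published={count}, pending={repository.PendingCount}");

// Trimmed copies of the types above.
public class OutboxMessage
{
    public Guid MessageId { get; set; }
    public string MessageType { get; set; } = null!;
    public string MessageBody { get; set; } = null!;
}

public interface IOutboxRepository
{
    Task<IEnumerable<OutboxMessage>> GetUnpublishedMessagesAsync(int batchSize, CancellationToken ct);
    Task MarkAsPublishedAsync(Guid messageId, CancellationToken ct);
    Task IncrementRetryCountAsync(Guid messageId, string errorMessage, CancellationToken ct);
}

// Hypothetical dispatcher: publishes one batch and records the outcome per message.
public sealed class OutboxDispatcher
{
    private readonly IOutboxRepository repository;
    private readonly Func<OutboxMessage, CancellationToken, Task> publish;

    public OutboxDispatcher(IOutboxRepository repository, Func<OutboxMessage, CancellationToken, Task> publish)
    {
        this.repository = repository;
        this.publish = publish;
    }

    // Returns the number of messages published in this pass.
    public async Task<int> DispatchBatchAsync(int batchSize, CancellationToken ct)
    {
        var published = 0;
        foreach (var message in await repository.GetUnpublishedMessagesAsync(batchSize, ct))
        {
            try
            {
                await publish(message, ct);
                await repository.MarkAsPublishedAsync(message.MessageId, ct);
                published++;
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Leave the message unpublished so a later pass retries it.
                await repository.IncrementRetryCountAsync(message.MessageId, ex.Message, ct);
            }
        }
        return published;
    }
}

// In-memory stand-in for a database-backed repository.
public sealed class InMemoryOutboxRepository : IOutboxRepository
{
    private readonly List<OutboxMessage> pending = new();
    public Dictionary<Guid, int> RetryCounts { get; } = new();
    public int PendingCount => pending.Count;
    public void Add(OutboxMessage message) => pending.Add(message);

    // Returns a snapshot so the caller can mark messages while iterating.
    public Task<IEnumerable<OutboxMessage>> GetUnpublishedMessagesAsync(int batchSize, CancellationToken ct) =>
        Task.FromResult<IEnumerable<OutboxMessage>>(pending.Take(batchSize).ToList());

    public Task MarkAsPublishedAsync(Guid messageId, CancellationToken ct)
    {
        pending.RemoveAll(m => m.MessageId == messageId);
        return Task.CompletedTask;
    }

    public Task IncrementRetryCountAsync(Guid messageId, string errorMessage, CancellationToken ct)
    {
        RetryCounts[messageId] = RetryCounts.GetValueOrDefault(messageId) + 1;
        return Task.CompletedTask;
    }
}
```

A failed publish only increments the retry count, so the message remains unpublished and is returned again by the next `GetUnpublishedMessagesAsync` call.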
Configuration¶
MassTransit Entity Framework Outbox¶
// Startup.cs or Program.cs
services.AddDbContext<MyDbContext>(options =>
options.UseSqlServer(connectionString));
services.AddMassTransit(x =>
{
// Add outbox support
x.AddEntityFrameworkOutbox<MyDbContext>(o =>
{
// Configure outbox query delay
o.QueryDelay = TimeSpan.FromSeconds(10);
// Configure database provider
o.UseSqlServer(); // or UsePostgres(), UseMySql()
// Configure bus outbox
o.UseBusOutbox(bus =>
{
// Maximum number of outbox messages delivered to the broker at a time
bus.MessageDeliveryLimit = 10;
});
});
// Register consumers
x.AddConsumer<OrderCreatedConsumer>();
// Configure transport
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
// Add hosted service to process outbox
services.AddHostedService<MassTransitOutboxProcessorHostedService>();
MassTransit MongoDB Outbox¶
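services.AddSingleton<IMongoClient>(_ => new MongoClient("mongodb://localhost:27017"));
services.AddSingleton<IMongoDatabase>(provider =>
provider.GetRequiredService<IMongoClient>().GetDatabase("microservice"));
services.AddMassTransit(x =>
{
x.AddMongoDbOutbox(o =>
{
// Resolve the MongoDB client and database from the container
o.ClientFactory(provider => provider.GetRequiredService<IMongoClient>());
o.DatabaseFactory(provider => provider.GetRequiredService<IMongoDatabase>());
// Configure query delay
o.QueryDelay = TimeSpan.FromSeconds(10);
// Keep delivery records this long for duplicate detection
o.DuplicateDetectionWindow = TimeSpan.FromDays(7);
});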
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
NServiceBus Outbox¶
// NServiceBusExtensions.cs
public static IServiceCollection AddMicroserviceNServiceBus(
this IServiceCollection services,
IConfiguration configuration)
{
var endpointConfiguration = new EndpointConfiguration("MyService");
// Enable outbox
var outboxSettings = endpointConfiguration.EnableOutbox();
// Configure persistence (required for outbox)
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(() =>
new SqlConnection(configuration.GetConnectionString("DefaultConnection")));
// Configure outbox settings
outboxSettings.KeepDeduplicationDataFor(TimeSpan.FromDays(7));
outboxSettings.DisableCleanup(); // Optional: for development; with cleanup disabled, old records must be removed by another job
// Transport configuration
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString(configuration.GetConnectionString("RabbitMQ"));
transport.UseConventionalRoutingTopology();
return services;
}
Best Practices¶
Do's¶
- Always use outbox for critical messages
  - Ensure important events/commands use outbox
  - Prevents message loss during failures
- Configure appropriate query delay
  - Balance between latency and throughput
  - Typical values: 5-30 seconds
- Implement deduplication
  - Use message IDs to prevent duplicate processing
  - Configure retention window appropriately
- Monitor outbox metrics
  - Track messages pending publication
  - Alert on high retry counts
  - Monitor outbox table growth
- Handle outbox cleanup
  - Archive or delete old published messages
  - Prevent outbox table from growing unbounded
- Use same database for outbox
  - Store outbox in same database as business data
  - Ensures transactional consistency
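The deduplication practice in the list above can be sketched as a consumer-side guard that remembers processed message IDs for a retention window. This is an assumption-laden illustration: `ProcessedMessageTracker` is a hypothetical name, and in a real service the processed IDs would live in a table written in the same transaction as the consumer's own changes.

```csharp
using System;
using System.Collections.Generic;

// Demo: the second delivery of the same message ID is rejected.
var tracker = new ProcessedMessageTracker(TimeSpan.FromDays(7));
var messageId = Guid.NewGuid();
var now = DateTimeOffset.UtcNow;
Console.WriteLine(tracker.TryBeginProcessing(messageId, now));               // first delivery
Console.WriteLine(tracker.TryBeginProcessing(messageId, now.AddMinutes(1))); // redelivery

// Hypothetical consumer-side guard keyed by message ID.
public sealed class ProcessedMessageTracker
{
    private readonly Dictionary<Guid, DateTimeOffset> processedAt = new();
    private readonly TimeSpan retention;

    public ProcessedMessageTracker(TimeSpan retention) => this.retention = retention;

    // Returns true the first time a message ID is seen inside the retention window.
    public bool TryBeginProcessing(Guid messageId, DateTimeOffset now)
    {
        // Forget IDs older than the retention window so the store stays bounded.
        var expired = new List<Guid>();
        foreach (var entry in processedAt)
        {
            if (now - entry.Value > retention) expired.Add(entry.Key);
        }
        foreach (var id in expired) processedAt.Remove(id);

        if (processedAt.ContainsKey(messageId)) return false; // duplicate delivery
        processedAt[messageId] = now;
        return true;
    }
}
```

The retention window trades storage for safety: a duplicate that arrives after its ID has expired is processed again, so the window should exceed the longest expected redelivery delay.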
Don'ts¶
- Don't bypass outbox for critical messages
  - Always use outbox when publishing from within transactions
  - Direct publishing can cause message loss
- Don't set query delay too low
  - Very low delays can impact database performance
  - Balance latency vs. resource usage
- Don't ignore outbox failures
  - Monitor and alert on outbox processing failures
  - Implement retry logic for failed publications
- Don't store large messages in outbox
  - Keep message bodies reasonable size
  - Consider using message references for large payloads
- Don't forget to clean up old messages
  - Implement cleanup job for published messages
  - Configure retention policies
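For the retry logic mentioned in the list above, one common option is capped exponential backoff driven by the message's retry count. The sketch below is only an illustration under that assumption: `RetryBackoff` is a hypothetical helper, and the base delay and ceiling are example values, not template defaults.

```csharp
using System;

// Demo: delays for the first few retries with a 1 second base and a 5 minute ceiling.
for (var retry = 0; retry <= 4; retry++)
{
    Console.WriteLine(RetryBackoff.DelayFor(retry, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5)));
}

// Hypothetical helper: doubles the delay per retry up to a ceiling.
public static class RetryBackoff
{
    public static TimeSpan DelayFor(int retryCount, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (retryCount < 0) throw new ArgumentOutOfRangeException(nameof(retryCount));
        // Cap the exponent so the multiplication stays in range before the ceiling is applied.
        var factor = Math.Pow(2, Math.Min(retryCount, 30));
        var ticks = Math.Min(baseDelay.Ticks * factor, maxDelay.Ticks);
        return TimeSpan.FromTicks((long)ticks);
    }
}
```

A processor could skip a message until `CreatedAt` (or the last attempt time) plus this delay has passed, and raise an alert once the retry count crosses a threshold.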
Integration Points¶
Unit of Work Pattern¶
Outbox integrates with Unit of Work:
// Outbox messages stored within same transaction
unitOfWork.ExecuteTransactional(() =>
{
repository.Save(entity);
outboxRepository.SaveMessage(eventMessage); // Same transaction
});
Domain Events¶
Outbox publishes domain events:
// Domain event automatically stored in outbox
await eventBus.PublishEvent(new OrderCreatedEvent
{
OrderId = order.Id
});
Transaction Management¶
Outbox ensures transactional consistency:
// All operations in single transaction
await using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
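// Outbox messages (added to the DbContext, not sent yet)
foreach (var @event in events)
{
await publishEndpoint.Publish(@event);
}
// Persist business data and outbox messages together
await dbContext.SaveChangesAsync();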
// Commit atomically
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
Common Scenarios¶
Scenario 1: Order Creation with Events¶
public async Task<Order> CreateOrderAsync(CreateOrderCommand command)
{
await using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
// Create order
var order = new Order { /* ... */ };
dbContext.Orders.Add(order);
// Publish domain events (goes to outbox)
await publishEndpoint.Publish(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount
});
await dbContext.SaveChangesAsync();
await transaction.CommitAsync();
return order;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
Scenario 2: Compensating Actions¶
// When order cancellation happens, publish compensation events via outbox
public async Task CancelOrderAsync(Guid orderId)
{
await using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
var order = await dbContext.Orders.FindAsync(orderId)
?? throw new InvalidOperationException($"Order {orderId} not found.");
order.Status = OrderStatus.Cancelled;
// Publish compensation events (outbox ensures delivery)
await publishEndpoint.Publish(new OrderCancelledEvent
{
OrderId = orderId
});
await publishEndpoint.Publish(new RefundPaymentCommand
{
PaymentId = order.PaymentId,
Amount = order.TotalAmount
});
await dbContext.SaveChangesAsync();
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
Scenario 3: Batch Operations¶
// Multiple messages in single transaction
public async Task ProcessBatchAsync(BatchData batch)
{
await using var transaction = await dbContext.Database.BeginTransactionAsync();
try
{
// Save multiple entities
dbContext.Entities.AddRange(batch.Entities);
// Publish multiple events (all in outbox)
foreach (var entity in batch.Entities)
{
await publishEndpoint.Publish(new EntityCreatedEvent
{
EntityId = entity.Id
});
}
await dbContext.SaveChangesAsync();
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
Troubleshooting¶
Issue: Messages not being published¶
Cause: Outbox processor not running or query delay too high.
Solution:
- Verify outbox processor hosted service is running
- Check outbox table for unpublished messages
- Reduce query delay if latency is acceptable
- Check logs for outbox processing errors
Issue: Duplicate messages being published¶
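Cause: The outbox delivers at-least-once, so a message can be republished after a crash or retry. Duplicates surface when deduplication is not configured or message IDs are not set.
Solution:
- Enable deduplication in outbox configuration
- Ensure message IDs are unique and stay the same across retries
- Configure appropriate deduplication window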
Issue: Outbox table growing too large¶
Cause: Cleanup not configured or failing.
Solution:
- Enable outbox cleanup
- Configure retention policy
- Implement custom cleanup job if needed
- Archive old messages before deletion
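A custom cleanup job reduces to one rule: delete rows that were published before a retention cutoff and never touch unpublished rows. The sketch below shows that rule over an in-memory list. `OutboxRow` and `OutboxCleanup` are hypothetical names, and a real job would issue the equivalent `DELETE` against the outbox table.

```csharp
using System;
using System.Collections.Generic;

// Demo: only the row published more than 7 days ago is removed.
var now = DateTimeOffset.UtcNow;
var rows = new List<OutboxRow>
{
    new OutboxRow(Guid.NewGuid(), now.AddDays(-10)), // published, older than retention
    new OutboxRow(Guid.NewGuid(), now.AddDays(-1)),  // published, still inside retention
    new OutboxRow(Guid.NewGuid(), null),             // not published yet
};
var removed = OutboxCleanup.RemoveExpired(rows, TimeSpan.FromDays(7), now);
Console.WriteLine($"removed={removed}, remaining={rows.Count}");

// Hypothetical row shape: only the fields the cleanup rule needs.
public sealed record OutboxRow(Guid Id, DateTimeOffset? PublishedAt);

public static class OutboxCleanup
{
    // Removes rows published before (now - retention); unpublished rows are never removed.
    public static int RemoveExpired(List<OutboxRow> rows, TimeSpan retention, DateTimeOffset now)
    {
        var cutoff = now - retention;
        return rows.RemoveAll(r => r.PublishedAt.HasValue && r.PublishedAt.Value < cutoff);
    }
}
```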
Issue: High database load from outbox queries¶
Cause: Query delay too low or concurrent limit too high.
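Solution:
- Increase query delay
- Reduce the number of messages the outbox processor handles per pass
- Optimize outbox table indexes
- Clean up published messages so polling queries scan fewer rows (keep the outbox in the business database, since a separate database would break transactional consistency)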
Issue: Messages published but consumers not receiving¶
Cause: Message broker connectivity or routing issue.
Solution:
- Verify message broker connection
- Check message routing configuration
- Verify consumer subscriptions
- Check message broker logs
References¶
- MassTransit: masstransit.md
- NServiceBus: nservicebus.md
- Messaging Model: messaging-model.md
- Unit of Work Pattern: persistence-and-data-modeling.md
- Saga Pattern: saga-pattern.md
Summary¶
The Outbox Pattern in ConnectSoft Microservice Template:
- Ensures business data and outbound messages are committed atomically
- Provides at-least-once message delivery; consumers deduplicate to achieve effectively-once processing
- Prevents message loss during service failures
- Supports reliable event publishing from domain operations
- Works with MassTransit (EF Core, MongoDB) and NServiceBus
- Enables eventual consistency across services
- Provides failure recovery through message persistence
This pattern is essential for building reliable, event-driven microservices where message publishing must be transactional and guaranteed.