MassTransit in ConnectSoft Microservice Template
Purpose & Overview
MassTransit is the default messaging and orchestration engine integrated into the ConnectSoft Microservice Template. It provides a robust, scalable, and developer-friendly abstraction over distributed message-based communication, enabling event-driven architectures, long-running process orchestration, and reliable message delivery.
Why MassTransit?
MassTransit offers several key benefits for the template:
- Open Source (Apache 2.0 License): Free to use without commercial licensing concerns
- Pub/Sub and Request/Response: Supports both messaging patterns
- Saga Orchestration: Built-in support for long-running business processes
- Transport Abstraction: Works with RabbitMQ, Azure Service Bus, and Amazon SQS, with Kafka available as a rider
- Built-in Reliability: Message retries, dead-letter queues, and fault handling
- Dependency Injection: Seamless integration with ASP.NET Core DI
- Testing Support: Comprehensive test harness for unit and integration testing
- Observability: First-class OpenTelemetry and structured logging support
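- Outbox Pattern: Supports a transactional outbox, so messages are released only when the database transaction commits (at-least-once delivery; pair it with idempotent consumers)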
MassTransit vs Alternatives
MassTransit was chosen over alternatives like NServiceBus due to its non-commercial license, active community, and flexible customization model that aligns well with ConnectSoft's DDD, AI, and Event-Sourced ecosystem.
Architecture Overview
MassTransit integrates at the messaging layer of Clean Architecture:
Domain Services (DomainModel)
├── Processors (Commands/Writes)
└── Retrievers (Queries/Reads)
↓ (Publish/Subscribe)
MassTransit Bus
├── Transport (RabbitMQ/Azure Service Bus)
├── Consumers (Message Handlers)
├── Sagas (Long-running Orchestration)
└── Middleware (Retry, Outbox, Tracing)
↓
Message Broker
└── Queues/Exchanges/Topics
Key Integration Points
| Layer | Component | Responsibility |
|---|---|---|
| MessagingModel | Commands/Events | Message contracts (DTOs) |
| DomainModel | Services | Publish events via IPublishEndpoint |
| ApplicationModel | MassTransitExtensions | Bus configuration and registration |
| FlowModel.MassTransit | Sagas | Long-running process orchestration |
| PersistenceModel | Outbox/Saga State | Message persistence and saga state |
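The MessagingModel row above refers to plain message contracts. A minimal sketch of what two such contracts might look like, assuming `ObjectId` is a `Guid` (consistent with the `CorrelateBy<Guid>` call in the saga shown later on this page). The property set and the init-only setters are assumptions, not the template's actual definitions.

```csharp
using System;

// Hypothetical contract shapes; the template's real contracts may carry more fields.
public class CreateMicroserviceAggregateRootCommand
{
    // Identifier of the aggregate root to create.
    public Guid ObjectId { get; init; }
}

public class MicroserviceAggregateRootCreatedEvent
{
    // Identifier of the created aggregate root; also used for correlation.
    public Guid ObjectId { get; init; }
}
```

Keeping contracts as property-only classes like these means they serialize predictably and carry no dependency on domain or persistence types.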
Core Components
1. Service Registration
MassTransit is registered via a single extension method:
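The call itself is not reproduced on this page. A minimal sketch of how it might look, assuming an ASP.NET Core `Program.cs` with implicit usings and that the call site lives in the same assembly as the extension (the method is declared `internal`). The exact placement in the template's startup code is an assumption.

```csharp
// Program.cs (hypothetical placement; the template may call this from its own startup extension)
var builder = WebApplication.CreateBuilder(args);

// Registers the bus, the saga state machine, the selected transport,
// and the IEventBus adapter in one call.
builder.Services.AddMicroserviceMassTransit();

var app = builder.Build();
app.Run();
```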
2. Configuration Extension
The AddMicroserviceMassTransit() extension method configures all MassTransit services:
// MassTransitExtensions.cs
internal static IServiceCollection AddMicroserviceMassTransit(this IServiceCollection services)
{
ArgumentNullException.ThrowIfNull(services);
MassTransitOptions massTransitOptions = OptionsExtensions.MassTransitOptions;
// Configure host options
AddAndConfigureMassTransitHostOptions(services, massTransitOptions);
// Configure correlation ID mapping
GlobalTopology.Send.UseCorrelationId<MicroserviceAggregateRootCreatedEvent>(x => x.ObjectId);
services.AddMassTransit(configurator =>
{
// Use kebab-case for endpoint names
configurator.SetKebabCaseEndpointNameFormatter();
// Register saga state machine
var sagaRegistrationConfigurator = configurator
.AddSagaStateMachine<MicroserviceAggregateRootLifeCycleSaga, MicroserviceAggregateRootLifeCycleSagaState>();
#if UseMassTransitMongoDBPersistence
// Configure MongoDB saga persistence
sagaRegistrationConfigurator.MongoDbRepository(repo =>
{
repo.Connection = massTransitOptions.MongoDbPersistence.Connection;
repo.DatabaseName = massTransitOptions.MongoDbPersistence.DatabaseName;
repo.CollectionName = nameof(MicroserviceAggregateRootLifeCycleSaga);
});
#endif
// Configure transport
#if UseMassTransitRabbitMQTransport
ConfigureUsingRabbitMq(configurator, massTransitOptions);
#endif
#if UseMassTransitAzureServiceBusTransport
ConfigureUsingAzureServiceBus(configurator, massTransitOptions);
#endif
});
// Register event bus adapter
services.AddSingleton<IEventBus, MassTransitAdapter>();
services.ActivateSingleton<IEventBus>();
return services;
}
3. Transport Configuration
RabbitMQ Transport
private static void ConfigureUsingRabbitMq(
IBusRegistrationConfigurator configurator,
MassTransitOptions massTransitOptions)
{
configurator.UsingRabbitMq((context, config) =>
{
// Configure RabbitMQ host
config.Host(
massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.Host,
massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.Port,
massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.VirtualHost,
host =>
{
host.Username(massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.UserName);
host.Password(massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.Password);
if (massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.UseSsl)
{
host.UseSsl(ssl =>
{
ssl.Protocol = SslProtocols.Tls12;
});
}
host.PublisherConfirmation = massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.PublisherConfirmation;
host.Heartbeat(massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.Heartbeat);
host.RequestedChannelMax(massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.RequestedChannelMax);
host.RequestedConnectionTimeout(massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.RequestedConnectionTimeout);
host.ContinuationTimeout(massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.ContinuationTimeout);
});
// Configure endpoint settings
config.PrefetchCount = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.PrefetchCount;
config.PurgeOnStartup = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.PurgeOnStartup;
config.AutoDelete = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.AutoDelete;
config.Durable = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.Durable;
config.ConcurrentMessageLimit = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.ConcurrentMessageLimit;
// Configure retry policy
config.UseMessageRetry(retryConfig =>
{
retryConfig.Interval(
retryCount: 3,
interval: TimeSpan.FromSeconds(5));
});
// Auto-configure endpoints for consumers and sagas
config.ConfigureEndpoints(context);
});
}
Azure Service Bus Transport
private static void ConfigureUsingAzureServiceBus(
IBusRegistrationConfigurator configurator,
MassTransitOptions massTransitOptions)
{
configurator.UsingAzureServiceBus((context, config) =>
{
if (!string.IsNullOrWhiteSpace(massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.ConnectionString))
{
config.Host(massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.ConnectionString, host =>
{
host.RetryLimit = massTransitOptions.AzureServiceBusTransport.RetryLimit;
host.RetryMaxBackoff = TimeSpan.FromSeconds(
massTransitOptions.AzureServiceBusTransport.RetryMaxBackoffSeconds ?? 30);
host.RetryMinBackoff = TimeSpan.FromSeconds(
massTransitOptions.AzureServiceBusTransport.RetryMinBackoffSeconds ?? 5);
});
}
else
{
config.Host(massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.FullyQualifiedNamespace, host =>
{
if (massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.UseManagedIdentity)
{
if (!string.IsNullOrWhiteSpace(
massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.ClientId))
{
host.TokenCredential = new Azure.Identity.DefaultAzureCredential(
new Azure.Identity.DefaultAzureCredentialOptions
{
ManagedIdentityClientId =
massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.ClientId,
});
}
else
{
host.TokenCredential = new Azure.Identity.DefaultAzureCredential();
}
}
if (massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.UseWebSockets)
{
host.TransportType = Azure.Messaging.ServiceBus.ServiceBusTransportType.AmqpWebSockets;
}
else
{
host.TransportType = Azure.Messaging.ServiceBus.ServiceBusTransportType.AmqpTcp;
}
host.RetryLimit = massTransitOptions.AzureServiceBusTransport.RetryLimit;
host.RetryMaxBackoff = TimeSpan.FromSeconds(
massTransitOptions.AzureServiceBusTransport.RetryMaxBackoffSeconds ?? 30);
host.RetryMinBackoff = TimeSpan.FromSeconds(
massTransitOptions.AzureServiceBusTransport.RetryMinBackoffSeconds ?? 5);
});
}
// Configure receive endpoint
config.ReceiveEndpoint(
queueName: "microservice-masstransit",
configureEndpoint: endpointConfig =>
{
endpointConfig.PrefetchCount =
massTransitOptions.AzureServiceBusTransport.AzureServiceBusReceiveEndpoint.PrefetchCount;
endpointConfig.ConcurrentMessageLimit =
massTransitOptions.AzureServiceBusTransport.AzureServiceBusReceiveEndpoint.ConcurrentMessageLimit;
});
config.PrefetchCount =
massTransitOptions.AzureServiceBusTransport.AzureServiceBusReceiveEndpoint.PrefetchCount;
config.ConcurrentMessageLimit =
massTransitOptions.AzureServiceBusTransport.AzureServiceBusReceiveEndpoint.ConcurrentMessageLimit;
config.ConfigureEndpoints(context);
});
}
4. Publishing Messages
Using IPublishEndpoint
The recommended way to publish messages is via IPublishEndpoint:
public class MicroserviceAggregateRootsProcessor : IMicroserviceAggregateRootsProcessor
{
private readonly IPublishEndpoint publishEndpoint;
private readonly ILogger<MicroserviceAggregateRootsProcessor> logger;
public MicroserviceAggregateRootsProcessor(
IPublishEndpoint publishEndpoint,
ILogger<MicroserviceAggregateRootsProcessor> logger)
{
this.publishEndpoint = publishEndpoint;
this.logger = logger;
}
public async Task<IMicroserviceAggregateRoot> CreateMicroserviceAggregateRoot(
CreateMicroserviceAggregateRootInput input,
CancellationToken token = default)
{
// Execute domain logic
var entity = new MicroserviceAggregateRoot { ObjectId = input.ObjectId };
// ... save to repository ...
// Publish domain event
await this.publishEndpoint.Publish(new MicroserviceAggregateRootCreatedEvent
{
ObjectId = entity.ObjectId
}, token);
this.logger.LogInformation(
"Published MicroserviceAggregateRootCreatedEvent for {ObjectId}",
entity.ObjectId);
return entity;
}
}
IPublishEndpoint vs IBus
Always use IPublishEndpoint instead of IBus for publishing. IPublishEndpoint is scoped per request and more testable.
Publishing from Domain Services
// Domain service can publish events after business operations
public async Task ProcessOrder(Order order)
{
// Business logic
await repository.SaveAsync(order);
// Publish event (using scoped IPublishEndpoint)
await publishEndpoint.Publish(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
TotalAmount = order.TotalAmount
});
}
5. Consuming Messages
Consumers implement IConsumer<T> and handle incoming messages:
// Example consumer (if implemented in template)
public class CreateMicroserviceAggregateRootCommandConsumer :
IConsumer<CreateMicroserviceAggregateRootCommand>
{
private readonly IMicroserviceAggregateRootsProcessor processor;
private readonly ILogger<CreateMicroserviceAggregateRootCommandConsumer> logger;
public CreateMicroserviceAggregateRootCommandConsumer(
IMicroserviceAggregateRootsProcessor processor,
ILogger<CreateMicroserviceAggregateRootCommandConsumer> logger)
{
this.processor = processor;
this.logger = logger;
}
public async Task Consume(ConsumeContext<CreateMicroserviceAggregateRootCommand> context)
{
this.logger.LogInformation(
"Consuming CreateMicroserviceAggregateRootCommand for {ObjectId}",
context.Message.ObjectId);
try
{
var input = new CreateMicroserviceAggregateRootInput
{
ObjectId = context.Message.ObjectId
};
await this.processor.CreateMicroserviceAggregateRoot(input, context.CancellationToken);
// Optionally publish a success event (skip this if the processor already publishes it, to avoid duplicate events)
await context.Publish(new MicroserviceAggregateRootCreatedEvent
{
ObjectId = context.Message.ObjectId
}, context.CancellationToken);
this.logger.LogInformation(
"Successfully processed CreateMicroserviceAggregateRootCommand for {ObjectId}",
context.Message.ObjectId);
}
catch (Exception ex)
{
this.logger.LogError(
ex,
"Error processing CreateMicroserviceAggregateRootCommand for {ObjectId}",
context.Message.ObjectId);
throw; // Let MassTransit handle retry/fault
}
}
}
Consumer Registration
Consumers are automatically registered when using namespace scanning:
// Auto-register all consumers in namespace
configurator.AddConsumersFromNamespaceContaining<CreateMicroserviceAggregateRootCommandConsumer>();
// Or register manually
configurator.AddConsumer<CreateMicroserviceAggregateRootCommandConsumer>();
6. Sagas
MassTransit sagas orchestrate long-running business processes. See Saga Pattern for detailed information.
Saga State Machine Example
public class MicroserviceAggregateRootLifeCycleSaga :
MassTransitStateMachine<MicroserviceAggregateRootLifeCycleSagaState>
{
public State? MicroserviceAggregateRootCreatedEventState { get; private set; }
public State? MicroserviceAggregateRootCreatedFaultedState { get; private set; }
public Event<MicroserviceAggregateRootCreatedEvent>? MicroserviceAggregateRootCreatedEvent { get; private set; }
public Event<Fault<MicroserviceAggregateRootCreatedEvent>>? MicroserviceAggregateRootCreatedFaultEvent { get; private set; }
public MicroserviceAggregateRootLifeCycleSaga(
ILogger<MicroserviceAggregateRootLifeCycleSaga> logger,
TimeProvider timeProvider)
{
this.InstanceState(x => x.CurrentState);
// Define events and correlation
this.Event(() => this.MicroserviceAggregateRootCreatedEvent, e =>
{
e.CorrelateBy<Guid>(state => state.ObjectId, context => context.Message.ObjectId);
e.InsertOnInitial = true;
e.SetSagaFactory(ctx => new MicroserviceAggregateRootLifeCycleSagaState
{
CorrelationId = ctx.Message.ObjectId,
ObjectId = ctx.Message.ObjectId
});
});
this.Event(() => this.MicroserviceAggregateRootCreatedFaultEvent, configurator =>
{
configurator.CorrelateById(ctx => ctx.InitiatorId ?? ctx.Message.Message.ObjectId);
});
// Define state machine behavior
this.Initially(
When(this.MicroserviceAggregateRootCreatedEvent)
.Then(context =>
{
logger.LogInformation(
"Saga started for ObjectId={ObjectId}",
context.Message.ObjectId);
context.Saga.StartedAt = timeProvider.GetUtcNow();
})
.TransitionTo(this.MicroserviceAggregateRootCreatedEventState));
// Define fault compensation
this.DuringAny(
When(this.MicroserviceAggregateRootCreatedFaultEvent)
.Then(context => this.ProcessMicroserviceAggregateRootCreatedEventFaulted(context))
.TransitionTo(this.MicroserviceAggregateRootCreatedFaultedState));
}
private void ProcessMicroserviceAggregateRootCreatedEventFaulted(
BehaviorContext<MicroserviceAggregateRootLifeCycleSagaState,
Fault<MicroserviceAggregateRootCreatedEvent>> context)
{
// Log fault and handle compensation
// ...
}
}
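The state machine above reads and writes a saga state instance that is not shown. A minimal sketch of what that class might look like, inferred from the members the state machine touches (`CurrentState`, `ObjectId`, `StartedAt`, `CorrelationId`). The property types are assumptions; `ISagaVersion` is included because MassTransit's MongoDB saga repository requires it.

```csharp
using System;
using MassTransit;

// Hypothetical shape of the saga state; the template's actual class may differ.
public class MicroserviceAggregateRootLifeCycleSagaState :
    SagaStateMachineInstance,
    ISagaVersion
{
    // Required by SagaStateMachineInstance; identifies the saga instance.
    public Guid CorrelationId { get; set; }

    // Required by ISagaVersion; used for optimistic concurrency in MongoDB.
    public int Version { get; set; }

    // Name of the current state, bound via InstanceState(x => x.CurrentState).
    public string CurrentState { get; set; } = string.Empty;

    // Business identifier used to correlate the created event.
    public Guid ObjectId { get; set; }

    // Set when the saga starts; TimeProvider.GetUtcNow() returns a DateTimeOffset.
    public DateTimeOffset? StartedAt { get; set; }
}
```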
Configuration
appsettings.json
{
"MassTransit": {
"MassTransitHost": {
"StopTimeout": "00:00:30",
"StartTimeout": "00:01:00",
"WaitUntilStarted": true,
"ConsumerStopTimeout": "00:00:30"
},
"RabbitMqTransport": {
"RabbitMqTransportHost": {
"Host": "localhost",
"Port": 5672,
"VirtualHost": "/",
"UserName": "guest",
"Password": "guest",
"UseSsl": false
},
"RabbitMqHostBehaviour": {
"PublisherConfirmation": true,
"Heartbeat": "00:00:30",
"RequestedChannelMax": 0,
"RequestedConnectionTimeout": "00:01:00",
"ContinuationTimeout": "00:00:20"
},
"RabbitMqReceiveEndpoint": {
"PrefetchCount": 16,
"PurgeOnStartup": false,
"AutoDelete": false,
"Durable": true,
"ConcurrentMessageLimit": 10
}
},
"MongoDbPersistence": {
"Connection": "mongodb://localhost:27017",
"DatabaseName": "SagaStateDb"
}
}
}
Environment-Specific Configuration
// appsettings.Development.json
{
"MassTransit": {
"RabbitMqTransport": {
"RabbitMqTransportHost": {
"Host": "localhost",
"UserName": "dev_user",
"Password": "dev_pass"
}
},
"MongoDbPersistence": {
"Connection": "mongodb://localhost:27017",
"DatabaseName": "DevSagaStateDb"
}
}
}
// appsettings.Production.json
{
"MassTransit": {
"RabbitMqTransport": {
"RabbitMqTransportHost": {
"Host": "prod-rabbitmq.internal",
"UserName": "${RABBITMQ_USERNAME}",
"Password": "${RABBITMQ_PASSWORD}"
}
},
"MongoDbPersistence": {
"Connection": "${MONGODB_CONNECTION_STRING}",
"DatabaseName": "ProdSagaStateDb"
}
}
}
Message Patterns
Publish/Subscribe (Events)
Events are published and can have multiple subscribers:
// Publish event
await publishEndpoint.Publish(new MicroserviceAggregateRootCreatedEvent
{
ObjectId = entity.ObjectId
});
// Multiple consumers can handle the same event
// - Saga receives event for orchestration
// - Audit consumer logs the event
// - Notification consumer sends alerts
Request/Response
For request/response patterns:
// Client side
var client = bus.CreateRequestClient<GetEntityQuery>(TimeSpan.FromSeconds(30));
var response = await client.GetResponse<GetEntityResponse>(
new GetEntityQuery { Id = entityId });
// Handler side
public class GetEntityQueryConsumer : IConsumer<GetEntityQuery>
{
public async Task Consume(ConsumeContext<GetEntityQuery> context)
{
var entity = await repository.GetByIdAsync(context.Message.Id);
await context.RespondAsync(new GetEntityResponse
{
Entity = entity
});
}
}
When to Use Each Pattern
- Publish/Subscribe: For domain events, notifications, multiple interested parties
- Request/Response: For queries, synchronous operations requiring immediate response
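For the request/response side, the request client can also be injected rather than created from the bus. A minimal sketch, assuming `IRequestClient<GetEntityQuery>` is resolved from the container that `AddMassTransit` configured. The contract shapes and the `EntityLookup` class are hypothetical stand-ins.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Stand-ins for the contracts used in the example above (shapes are assumed).
public class GetEntityQuery { public Guid Id { get; init; } }
public class GetEntityResponse { public object? Entity { get; init; } }

// Hypothetical caller; not part of the template.
public class EntityLookup
{
    private readonly IRequestClient<GetEntityQuery> client;

    public EntityLookup(IRequestClient<GetEntityQuery> client) => this.client = client;

    public async Task<GetEntityResponse> GetAsync(Guid entityId, CancellationToken token = default)
    {
        // GetResponse completes when the consumer calls RespondAsync,
        // or throws if the request times out or the consumer faults.
        Response<GetEntityResponse> response =
            await this.client.GetResponse<GetEntityResponse>(
                new GetEntityQuery { Id = entityId }, token);

        return response.Message;
    }
}
```

Injecting the client keeps calling code free of a direct `IBus` dependency, which matches the guidance elsewhere on this page.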
Retry and Error Handling
Retry Configuration
config.UseMessageRetry(retryConfig =>
{
// Fixed-interval retries (3 attempts, 5 seconds apart)
retryConfig.Interval(
retryCount: 3,
interval: TimeSpan.FromSeconds(5));
// Or exponential backoff (pick one policy per UseMessageRetry call, not both)
retryConfig.Exponential(
retryLimit: 5,
minInterval: TimeSpan.FromSeconds(1),
maxInterval: TimeSpan.FromSeconds(30),
intervalDelta: TimeSpan.FromSeconds(2));
});
// Configure a retry filter (do not retry specific exceptions)
config.UseMessageRetry(retryConfig =>
{
retryConfig.Ignore<ValidationException>();
retryConfig.Interval(3, TimeSpan.FromSeconds(5));
});
Delayed Redelivery
For longer delays between attempts:
config.UseDelayedRedelivery(redeliveryConfig =>
{
redeliveryConfig.Intervals(
TimeSpan.FromSeconds(10),
TimeSpan.FromSeconds(30),
TimeSpan.FromMinutes(1));
});
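Retry and delayed redelivery are often combined: a few quick in-process retries for transient errors, then longer broker-level delays. A minimal sketch of that combination, following the ordering commonly shown in MassTransit's documentation (redelivery configured before retry). The helper class and the chosen intervals are illustrative assumptions.

```csharp
using System;
using MassTransit;

// Hypothetical helper; not part of the template.
public static class ErrorHandlingSketch
{
    public static void ConfigureErrorHandling(IReceiveEndpointConfigurator endpoint)
    {
        // Outer layer: if all in-process retries fail, put the message back
        // on the broker and try again after each of these delays.
        endpoint.UseDelayedRedelivery(r => r.Intervals(
            TimeSpan.FromSeconds(10),
            TimeSpan.FromSeconds(30),
            TimeSpan.FromMinutes(1)));

        // Inner layer: retry immediately up to 3 times while the message
        // is still held by the consumer.
        endpoint.UseMessageRetry(r => r.Immediate(3));
    }
}
```

Only after both layers are exhausted does the message move to the error queue described in the next section.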
Dead Letter Queues
After all retries are exhausted, messages are moved to error queues:
- RabbitMQ: queue-name_error
- Azure Service Bus: Dead-letter queue (automatically created)

Messages in error queues contain:
- Original message
- Exception details
- Retry count
- Timestamp
Fault Handling
Sagas can handle faults explicitly:
// Define fault event
public Event<Fault<CreateCommand>>? CreateCommandFaulted { get; private set; }
// Handle fault in saga
DuringAny(
When(CreateCommandFaulted)
.Then(context =>
{
logger.LogError(
"Command failed: {Error}",
context.Message.Exceptions.First().Message);
// Perform compensation
})
.TransitionTo(FailedState));
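Faults can also be observed outside a saga by consuming `Fault<T>` directly. A minimal sketch, assuming a command contract with a `Guid ObjectId` property; the consumer class name is hypothetical and the contract below is a stand-in for the one in MessagingModel.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

// Stand-in for the contract defined in MessagingModel (shape is assumed).
public class CreateMicroserviceAggregateRootCommand
{
    public Guid ObjectId { get; init; }
}

// Hypothetical consumer; not part of the template.
public class CreateCommandFaultConsumer :
    IConsumer<Fault<CreateMicroserviceAggregateRootCommand>>
{
    private readonly ILogger<CreateCommandFaultConsumer> logger;

    public CreateCommandFaultConsumer(ILogger<CreateCommandFaultConsumer> logger)
        => this.logger = logger;

    public Task Consume(ConsumeContext<Fault<CreateMicroserviceAggregateRootCommand>> context)
    {
        // Fault<T>.Message is the original message; Exceptions holds the failure details.
        var firstError = context.Message.Exceptions.FirstOrDefault();

        this.logger.LogError(
            "Command for {ObjectId} faulted: {ExceptionType}: {Error}",
            context.Message.Message.ObjectId,
            firstError?.ExceptionType,
            firstError?.Message);

        return Task.CompletedTask;
    }
}
```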
Observability
OpenTelemetry Integration
MassTransit automatically creates spans for message operations:
// In OpenTelemetryExtensions.cs
services.AddOpenTelemetry()
.WithTracing(tracerProviderBuilder =>
{
tracerProviderBuilder
.AddSource("MassTransit")
.AddSource("MassTransit.Saga")
.AddMassTransitInstrumentation(); // Enable MassTransit tracing
});
Traces include:
- Message publishing
- Message consumption
- Saga transitions
- Retry attempts
- Fault handling
Structured Logging
Use ILogger<T> with correlation IDs:
public async Task Consume(ConsumeContext<MyMessage> context)
{
using (logger.BeginScope(new Dictionary<string, object>
{
["MessageId"] = context.MessageId,
["CorrelationId"] = context.CorrelationId,
["ObjectId"] = context.Message.ObjectId
}))
{
logger.LogInformation("Processing message");
// Process message
logger.LogInformation("Message processed successfully");
}
}
Health Checks
MassTransit integrates with ASP.NET Core health checks:
// HealthChecksExtensions.cs
builder.AddMassTransitHealthChecks(services);
// Health check endpoint
// GET /health → Includes MassTransit bus status
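To expose the endpoint mentioned above, the health check middleware has to be mapped. A minimal sketch using standard ASP.NET Core APIs; the `/health/ready` route and the `ready` tag filter are assumptions about how the bus check is tagged, and bus registration is omitted for brevity.

```csharp
using System;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// Bus registration (for example AddMicroserviceMassTransit) is omitted here.
builder.Services.AddHealthChecks();

var app = builder.Build();

// Reports every registered health check.
app.MapHealthChecks("/health");

// Reports only checks tagged "ready" (assumed tag for the bus check).
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
    Predicate = check => check.Tags.Contains("ready")
});

app.Run();
```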
Testing
Unit Testing with Test Harness
MassTransit provides a test harness for in-memory testing:
[TestMethod]
public async Task MicroserviceAggregateRootLifeCycleSaga_Should_Be_Initialized_When_Event_Published()
{
// Arrange
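var services = new ServiceCollection();
// The saga's constructor dependencies (ILogger<T>, TimeProvider) must be resolvable
services.AddLogging();
services.AddSingleton(TimeProvider.System);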
services.AddMassTransitTestHarness(configurator =>
{
configurator.AddSagaStateMachine<
MicroserviceAggregateRootLifeCycleSaga,
MicroserviceAggregateRootLifeCycleSagaState>();
});
var provider = services.BuildServiceProvider();
var harness = provider.GetRequiredService<ITestHarness>();
await harness.Start();
var @event = new MicroserviceAggregateRootCreatedEvent
{
ObjectId = Guid.NewGuid()
};
// Act
await harness.Bus.Publish(@event);
// Assert
Assert.IsTrue(await harness.Published.Any<MicroserviceAggregateRootCreatedEvent>());
Assert.IsTrue(await harness.Consumed.Any<MicroserviceAggregateRootCreatedEvent>());
var sagaHarness = harness.GetSagaStateMachineHarness<
MicroserviceAggregateRootLifeCycleSaga,
MicroserviceAggregateRootLifeCycleSagaState>();
Assert.IsTrue(await sagaHarness.Created.Any());
var instance = sagaHarness.Created.ContainsInState(
correlationId: @event.ObjectId,
machine: sagaHarness.StateMachine,
state: sagaHarness.StateMachine.MicroserviceAggregateRootCreatedEventState);
Assert.IsNotNull(instance);
Assert.AreEqual(@event.ObjectId, instance.ObjectId);
Assert.AreEqual(@event.ObjectId, instance.CorrelationId);
}
Testing Consumers
[TestMethod]
public async Task Consumer_Should_Process_Command()
{
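var services = new ServiceCollection();
// Register the consumer's dependencies too, e.g. a test double for IMicroserviceAggregateRootsProcessor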
services.AddMassTransitTestHarness(configurator =>
{
configurator.AddConsumer<CreateMicroserviceAggregateRootCommandConsumer>();
});
var provider = services.BuildServiceProvider();
var harness = provider.GetRequiredService<ITestHarness>();
await harness.Start();
var command = new CreateMicroserviceAggregateRootCommand
{
ObjectId = Guid.NewGuid()
};
var endpoint = await harness.GetConsumerEndpoint<CreateMicroserviceAggregateRootCommandConsumer>();
await endpoint.Send(command);
Assert.IsTrue(await harness.Consumed.Any<CreateMicroserviceAggregateRootCommand>());
Assert.IsTrue(await harness.Published.Any<MicroserviceAggregateRootCreatedEvent>());
}
Integration with Domain Services
Publishing from Processors
public class MicroserviceAggregateRootsProcessor : IMicroserviceAggregateRootsProcessor
{
private readonly IPublishEndpoint publishEndpoint;
private readonly IMicroserviceAggregateRootsRepository repository;
// The unitOfWork field and the constructor are omitted for brevity
public async Task<IMicroserviceAggregateRoot> CreateMicroserviceAggregateRoot(
CreateMicroserviceAggregateRootInput input,
CancellationToken token)
{
// Domain logic
var entity = new MicroserviceAggregateRoot { ObjectId = input.ObjectId };
repository.Insert(entity);
await unitOfWork.SaveChangesAsync(token);
// Publish domain event
await this.publishEndpoint.Publish(new MicroserviceAggregateRootCreatedEvent
{
ObjectId = entity.ObjectId
}, token);
return entity;
}
}
Using Outbox Pattern
For guaranteed delivery, use the outbox pattern:
// Enable outbox in configuration
services.AddMassTransit(x =>
{
x.AddEntityFrameworkOutbox<MyDbContext>(o =>
{
o.QueryDelay = TimeSpan.FromSeconds(10);
o.UseSqlServer();
// Route messages published or sent through the bus via the outbox
o.UseBusOutbox();
});
});
See Outbox Pattern for detailed information.
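The Entity Framework outbox also needs its tables mapped in the `DbContext`. A minimal sketch using the model-builder extensions from the MassTransit.EntityFrameworkCore package; `MyDbContext` mirrors the placeholder name used above and is not a type from the template.

```csharp
using MassTransit;
using Microsoft.EntityFrameworkCore;

// Placeholder context matching the name used in the outbox configuration above.
public class MyDbContext : DbContext
{
    public MyDbContext(DbContextOptions<MyDbContext> options)
        : base(options)
    {
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        // Tracks consumed messages so redeliveries can be detected.
        modelBuilder.AddInboxStateEntity();

        // Stores outgoing messages until they are delivered to the broker.
        modelBuilder.AddOutboxMessageEntity();

        // Tracks delivery progress of each outbox.
        modelBuilder.AddOutboxStateEntity();
    }
}
```

A migration is still required after adding these mappings so the tables exist before the bus starts.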
Best Practices
Do's
- Use IPublishEndpoint for publishing
  - Scoped per request
  - More testable than IBus
  - Better integration with DI
- Keep consumers thin
  - Delegate business logic to domain services
  - Consumers should only handle message concerns
- Use correlation IDs
  - Include CorrelationId in all messages
  - Use for tracing across services
  - Enable with GlobalTopology.Send.UseCorrelationId<T>()
- Handle faults explicitly
  - Define fault events in sagas
  - Implement compensation logic
  - Log faults with context
- Configure retry policies appropriately
  - Different policies for different exception types
  - Use exponential backoff for transient failures
  - Set reasonable retry limits
- Use structured logging
  - Include correlation IDs in log scope
  - Log message start and completion
  - Include relevant business identifiers
- Test with MassTransit Test Harness
  - Use in-memory transport for tests
  - Verify message publishing and consumption
  - Test saga state transitions
- Separate message contracts from domain
  - Messages in MessagingModel project
  - Keep contracts simple POCOs
  - No business logic in messages
Don'ts
- Don't put business logic in consumers
  - Consumers should delegate to services
  - Keep orchestration separate from business rules
- Don't use IBus directly
  - Prefer IPublishEndpoint for publishing
  - Prefer ISendEndpointProvider for sending
- Don't ignore faults
  - Always handle Fault<T> in sagas
  - Log faults appropriately
  - Implement compensation
- Don't share domain entities as messages
  - Use DTOs in MessagingModel
  - Keep contracts independent of persistence
- Don't configure endpoints manually
  - Use ConfigureEndpoints(context) for auto-configuration
  - Let MassTransit handle routing
- Don't swallow exceptions
  - Let exceptions propagate for retry/fault handling
  - Only catch if you have a specific recovery strategy
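The list above prefers ISendEndpointProvider for sending commands, which this page does not otherwise demonstrate. A minimal sketch, assuming the kebab-case endpoint name of a consumer called `CreateMicroserviceAggregateRootCommandConsumer`; the sender class is hypothetical and the contract is a stand-in for the one in MessagingModel.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Stand-in for the contract defined in MessagingModel (shape is assumed).
public class CreateMicroserviceAggregateRootCommand
{
    public Guid ObjectId { get; init; }
}

// Hypothetical helper; not part of the template.
public class AggregateRootCommandSender
{
    private readonly ISendEndpointProvider sendEndpointProvider;

    public AggregateRootCommandSender(ISendEndpointProvider sendEndpointProvider)
        => this.sendEndpointProvider = sendEndpointProvider;

    public async Task SendCreateAsync(Guid objectId, CancellationToken token = default)
    {
        // "queue:" is MassTransit's short address form. The queue name is an
        // assumption based on the kebab-case endpoint name formatter.
        ISendEndpoint endpoint = await this.sendEndpointProvider.GetSendEndpoint(
            new Uri("queue:create-microservice-aggregate-root-command"));

        // Send targets exactly one queue, unlike Publish, which fans out to subscribers.
        await endpoint.Send(
            new CreateMicroserviceAggregateRootCommand { ObjectId = objectId },
            token);
    }
}
```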
Common Scenarios
Scenario 1: Simple Event Publishing
// Publish event after domain operation
public async Task CreateEntity(CreateEntityInput input)
{
var entity = new Entity { Id = input.Id };
repository.Insert(entity);
await unitOfWork.SaveChangesAsync();
// Publish event
await publishEndpoint.Publish(new EntityCreatedEvent
{
EntityId = entity.Id
});
}
Scenario 2: Saga Orchestration
// Saga handles multiple events to orchestrate workflow
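// Message types (OrderCreatedEvent, ProcessPaymentCommand, ...) are illustrative and assumed
// to implement CorrelatedBy<Guid> so that events correlate to the saga instance.
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
public State PaymentProcessing { get; private set; } = null!;
public State InventoryReserved { get; private set; } = null!;
public State Failed { get; private set; } = null!;
public Event<OrderCreatedEvent> OrderCreated { get; private set; } = null!;
public Event<PaymentProcessedEvent> PaymentProcessed { get; private set; } = null!;
public Event<PaymentFailedEvent> PaymentFailed { get; private set; } = null!;
public OrderProcessingSaga()
{
InstanceState(x => x.CurrentState);
Initially(
When(OrderCreated)
.Publish(context => new ProcessPaymentCommand { OrderId = context.Saga.CorrelationId })
.TransitionTo(PaymentProcessing));
During(PaymentProcessing,
When(PaymentProcessed)
.Publish(context => new ReserveInventoryCommand { OrderId = context.Saga.CorrelationId })
.TransitionTo(InventoryReserved),
When(PaymentFailed)
.Publish(context => new CancelOrderCommand { OrderId = context.Saga.CorrelationId })
.TransitionTo(Failed));
}
}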
Scenario 3: Request/Response Pattern
// Client requests data
var client = bus.CreateRequestClient<GetEntityQuery>();
var response = await client.GetResponse<GetEntityResponse>(
new GetEntityQuery { Id = entityId });
// Handler responds
public class GetEntityQueryConsumer : IConsumer<GetEntityQuery>
{
public async Task Consume(ConsumeContext<GetEntityQuery> context)
{
var entity = await repository.GetByIdAsync(context.Message.Id);
await context.RespondAsync(new GetEntityResponse { Entity = entity });
}
}
Configuration Reference
MassTransitOptions
| Option | Description | Default |
|---|---|---|
| MassTransitHost.StopTimeout | Time to wait for bus to stop | 00:00:30 |
| MassTransitHost.StartTimeout | Time to wait for bus to start | 00:01:00 |
| MassTransitHost.WaitUntilStarted | Wait for bus to start before continuing | true |
| RabbitMqTransport.RabbitMqReceiveEndpoint.PrefetchCount | Messages to prefetch | 16 |
| RabbitMqTransport.RabbitMqReceiveEndpoint.ConcurrentMessageLimit | Max concurrent messages | 10 |
Endpoint Naming
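Because the template calls SetKebabCaseEndpointNameFormatter(), receive endpoint names are kebab-cased. Names are derived from the consumer type with the Consumer suffix removed, so consumers named after the messages below get these endpoints: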
- CreateMicroserviceAggregateRootCommand → create-microservice-aggregate-root-command
- MicroserviceAggregateRootCreatedEvent → microservice-aggregate-root-created-event
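The mapping above can be approximated in a few lines. This is a rough sketch of the PascalCase-to-kebab-case conversion only, not MassTransit's actual formatter; the `KebabCaseSketch` class is hypothetical and it does not handle acronyms or suffix stripping.

```csharp
using System;
using System.Text;

// Hypothetical helper that approximates kebab-case naming; not MassTransit's implementation.
public static class KebabCaseSketch
{
    public static string ToKebabCase(string typeName)
    {
        var builder = new StringBuilder();

        for (int i = 0; i < typeName.Length; i++)
        {
            char current = typeName[i];

            // Insert a hyphen before every uppercase letter except the first character.
            if (char.IsUpper(current) && i > 0)
            {
                builder.Append('-');
            }

            builder.Append(char.ToLowerInvariant(current));
        }

        return builder.ToString();
    }
}
```

Calling `KebabCaseSketch.ToKebabCase("CreateMicroserviceAggregateRootCommand")` yields `create-microservice-aggregate-root-command`, matching the first example in the list.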
Summary
MassTransit in the ConnectSoft Microservice Template provides:
- ✅ Reliable Messaging: Retry, dead-letter queues, fault handling
- ✅ Saga Orchestration: Long-running process management
- ✅ Transport Flexibility: RabbitMQ, Azure Service Bus support
- ✅ Observability: OpenTelemetry and structured logging
- ✅ Testing: Comprehensive test harness
- ✅ Clean Architecture: Fits seamlessly with DDD and CQRS patterns
- ✅ Outbox Support: Guaranteed message delivery
By following these patterns and best practices, MassTransit becomes a powerful messaging infrastructure that enables scalable, reliable, and observable microservices.