NServiceBus in ConnectSoft Microservice Template¶
Purpose & Overview¶
NServiceBus is a mature, enterprise-grade service bus framework that provides reliable message-based communication and orchestration. In the ConnectSoft Microservice Template, NServiceBus is integrated as a pluggable alternative to MassTransit, offering advanced features like SQL-based persistence, ServiceControl integration, and built-in outbox support.
Why NServiceBus?¶
NServiceBus offers several key benefits for enterprise scenarios:
- Enterprise-Grade Reliability: SQL-based saga persistence with transaction support
- ServiceControl Integration: Built-in monitoring, heartbeats, and operational visibility
- Outbox Pattern: Built-in outbox support for guaranteed message delivery
- Advanced Recoverability: Sophisticated retry policies and error handling
- Interface-Based Contracts: Type-safe message contracts using
ICommandandIEvent - Unobtrusive Mode: Minimal coupling with NServiceBus assemblies in message contracts
- SQL Server Transport: Native SQL Server transport option for database-centric deployments
- Commercial Support: Available commercial support and training
NServiceBus vs MassTransit
NServiceBus is an alternative to MassTransit in the template. Choose NServiceBus when you need SQL-based persistence, ServiceControl monitoring, or commercial support. Choose MassTransit for open-source-only requirements or when preferring POCO-based message contracts.
Architecture Overview¶
NServiceBus integrates at the messaging layer of Clean Architecture:
Domain Services (DomainModel)
├── Processors (Commands/Writes)
└── Retrievers (Queries/Reads)
↓ (Publish/Subscribe via IMessageSession)
NServiceBus Endpoint
├── Transport (SQL Server/RabbitMQ/Azure Service Bus)
├── Handlers (Message Handlers)
├── Sagas (Long-running Orchestration)
├── Persistence (SQL-based Saga State)
└── Pipeline Behaviors (Custom Middleware)
↓
Message Broker / SQL Server
└── Queues/Subscriptions
Key Integration Points¶
| Layer | Component | Responsibility |
|---|---|---|
| MessagingModel | ICommand/IEvent |
Message contracts (interfaces) |
| DomainModel | Services | Send/publish via IMessageSession |
| ApplicationModel | NServiceBusExtensions | Endpoint configuration |
| FlowModel.NServiceBus | Sagas | Long-running process orchestration |
| PersistenceModel | SQL Persistence | Saga state and outbox storage |
Core Components¶
1. Service Registration¶
NServiceBus is registered via extension methods:
// Program.cs
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddOptionsConfiguration(builder.Configuration);
builder.Services.AddNServiceBus(builder.Configuration);
// Configure logging
builder.Host.ConfigureLogging((hostingContext, logging) =>
{
logging.AddNServiceBus();
});
var app = builder.Build();
// Start endpoint
app.ApplicationServices.StartNServiceBusEndpoint();
await app.RunAsync();
// Stop endpoint on shutdown
app.Lifetime.ApplicationStopping.Register(() =>
{
NServiceBusExtensions.StopNServiceBusEndpoint();
});
2. Endpoint Configuration¶
The AddNServiceBus() extension method configures all NServiceBus services:
// NServiceBusExtensions.cs
internal static IServiceCollection AddNServiceBus(
this IServiceCollection services,
IConfiguration configuration)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configuration);
NServiceBusOptions nServiceBusOptions = OptionsExtensions.NServiceBusOptions;
var endpointConfiguration = new EndpointConfiguration(nServiceBusOptions.EndpointName);
// Serialization
DefineNServiceBusSerialization(endpointConfiguration);
// Recoverability (retry policies)
DefineNServiceBusRecoverability(endpointConfiguration, nServiceBusOptions);
// Critical error handling
endpointConfiguration.DefineCriticalErrorAction(OnCriticalError);
endpointConfiguration.SendFailedMessagesTo(nServiceBusOptions.ErrorQueue);
endpointConfiguration.AuditProcessedMessagesTo(nServiceBusOptions.AuditQueue);
endpointConfiguration.AuditSagaStateChanges(serviceControlQueue: nServiceBusOptions.SagaAuditQueue);
// Heartbeat and metrics
DefineNServiceBusHeartbeat(endpointConfiguration, nServiceBusOptions);
DefineNServiceBusServiceControlMetrics(endpointConfiguration, nServiceBusOptions);
// Unobtrusive mode conventions
DefineUnobtrusiveModeConventions(endpointConfiguration);
// Transport and routing
DefineNServiceBusTransportAndRouting(endpointConfiguration, configuration, nServiceBusOptions);
// SQL Persistence
#if UseNServiceBusSQLPersistence
DefineNServiceBusSQLPersistence(nServiceBusOptions.EndpointName, endpointConfiguration, configuration);
#endif
// Outbox (optional - currently commented out)
// var outboxSettings = endpointConfiguration.EnableOutbox();
// Licensing
DefineNServiceBusLicensing(endpointConfiguration);
// Message validation
DefineNServiceBusMessageValidation(endpointConfiguration);
// OpenTelemetry
#if OpenTelemetry
endpointConfiguration.EnableOpenTelemetry();
#endif
// Enable installers (creates queues, tables, etc.)
endpointConfiguration.EnableInstallers();
// Register custom pipeline behavior
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
stepId: nameof(ExtendLoggerScopeWithMessageDataBehavior),
behavior: typeof(ExtendLoggerScopeWithMessageDataBehavior),
description: ExtendLoggerScopeWithMessageDataBehavior.ExtendLoggerScopeWithMessageDataBehaviorDescription);
// Configure dependency injection
var endpointWithExternallyManagedServiceProvider =
DefineDependencyInjectionExternallyManagedMode(services, endpointConfiguration);
return services;
}
3. Transport Configuration¶
SQL Server Transport¶
The template supports SQL Server transport:
private static void DefineNServiceBusTransportAndRouting(
EndpointConfiguration endpointConfiguration,
IConfiguration configuration,
NServiceBusOptions nServiceBusOptions)
{
#if UseNServiceBusSqlServerTransport
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
var connectionString = configuration.GetConnectionString(
NServiceBusSqlServerTransportConnectionStringKey);
// Create database and schema if not exists
SqlServerDatabaseHelper databaseHelper = new();
databaseHelper.CreateIfNotExists(connectionString);
databaseHelper.CreateSchema(connectionString, nServiceBusOptions.EndpointName);
databaseHelper.CreateSchema(connectionString, "nsb");
transport.ConnectionString(connectionString);
transport.DefaultSchema("nsb");
// Create required queues
using var connection = new SqlConnection(connectionString);
connection.Open();
NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.ErrorQueue);
transport.UseSchemaForQueue(nServiceBusOptions.ErrorQueue, "nsb");
NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.AuditQueue);
transport.UseSchemaForQueue(nServiceBusOptions.AuditQueue, "nsb");
NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.SagaAuditQueue);
transport.UseSchemaForQueue(nServiceBusOptions.SagaAuditQueue, "nsb");
// Configure transactions
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
// Configure subscriptions
var subscriptions = transport.SubscriptionSettings();
subscriptions.CacheSubscriptionInformationFor(TimeSpan.FromMinutes(1));
subscriptions.SubscriptionTableName(
tableName: "Subscriptions",
schemaName: "nsb");
#endif
// Configure routing
transport.Routing().RouteToEndpoint(
assembly: typeof(MicroserviceAggregateRootCreatedEvent).Assembly,
destination: nServiceBusOptions.EndpointName);
transport.Routing().RouteToEndpoint(
assembly: typeof(CreateMicroserviceAggregateRootCommand).Assembly,
destination: nServiceBusOptions.EndpointName);
}
4. SQL Persistence¶
NServiceBus uses SQL Server for saga persistence:
private static void DefineNServiceBusSQLPersistence(
string endpointName,
EndpointConfiguration endpointConfiguration,
IConfiguration configuration)
{
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var connectionString = configuration.GetConnectionString(
NServiceBusSQLPersistenceConnectionStringKey);
// Create database and schema if not exists
SqlServerDatabaseHelper databaseHelper = new();
databaseHelper.CreateIfNotExists(connectionString);
databaseHelper.CreateSchema(connectionString, endpointName);
databaseHelper.CreateSchema(connectionString, "nsb");
var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
dialect.Schema(endpointName);
persistence.ConnectionBuilder(() => new SqlConnection(connectionString));
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
}
5. Sending and Publishing Messages¶
Using IMessageSession¶
NServiceBus uses IMessageSession for sending and publishing:
public class MicroserviceAggregateRootsProcessor : IMicroserviceAggregateRootsProcessor
{
private readonly IMessageSession messageSession;
private readonly ILogger<MicroserviceAggregateRootsProcessor> logger;
public MicroserviceAggregateRootsProcessor(
IMessageSession messageSession,
ILogger<MicroserviceAggregateRootsProcessor> logger)
{
this.messageSession = messageSession;
this.logger = logger;
}
public async Task<IMicroserviceAggregateRoot> CreateMicroserviceAggregateRoot(
CreateMicroserviceAggregateRootInput input,
CancellationToken token = default)
{
// Execute domain logic
var entity = new MicroserviceAggregateRoot { ObjectId = input.ObjectId };
// ... save to repository ...
// Publish domain event
await this.messageSession.Publish(new MicroserviceAggregateRootCreatedEvent
{
ObjectId = entity.ObjectId
});
// Or send command
await this.messageSession.Send(new CreateMicroserviceAggregateRootCommand
{
ObjectId = entity.ObjectId
});
this.logger.LogInformation(
"Published MicroserviceAggregateRootCreatedEvent for {ObjectId}",
entity.ObjectId);
return entity;
}
}
Publishing Events¶
// Publish event (can have multiple subscribers)
await messageSession.Publish(new MicroserviceAggregateRootCreatedEvent
{
ObjectId = entity.ObjectId
});
Sending Commands¶
// Send command (single handler)
await messageSession.Send(new CreateMicroserviceAggregateRootCommand
{
ObjectId = input.ObjectId
});
6. Message Handlers¶
Message handlers implement IHandleMessages<T>:
public class CreateMicroserviceAggregateRootCommandHandler :
IHandleMessages<CreateMicroserviceAggregateRootCommand>
{
private readonly IMicroserviceAggregateRootsProcessor processor;
private readonly ILogger<CreateMicroserviceAggregateRootCommandHandler> logger;
public CreateMicroserviceAggregateRootCommandHandler(
IMicroserviceAggregateRootsProcessor processor,
ILogger<CreateMicroserviceAggregateRootCommandHandler> logger)
{
this.processor = processor;
this.logger = logger;
}
public async Task Handle(
CreateMicroserviceAggregateRootCommand message,
IMessageHandlerContext context)
{
this.logger.LogInformation(
"Handling CreateMicroserviceAggregateRootCommand for {ObjectId}",
message.ObjectId);
try
{
var input = new CreateMicroserviceAggregateRootInput
{
ObjectId = message.ObjectId
};
await this.processor.CreateMicroserviceAggregateRoot(input, context.CancellationToken);
// Optionally publish success event
await context.Publish(new MicroserviceAggregateRootCreatedEvent
{
ObjectId = message.ObjectId
});
this.logger.LogInformation(
"Successfully processed CreateMicroserviceAggregateRootCommand for {ObjectId}",
message.ObjectId);
}
catch (Exception ex)
{
this.logger.LogError(
ex,
"Error processing CreateMicroserviceAggregateRootCommand for {ObjectId}",
message.ObjectId);
throw; // Let NServiceBus recoverability handle retry/fault
}
}
}
7. Sagas¶
NServiceBus sagas orchestrate long-running business processes. See Saga Pattern for detailed information.
Saga Example¶
public class MicroserviceAggregateRootLifeCycleSaga :
Saga<MicroserviceAggregateRootSagaData>,
IAmStartedByMessages<MicroserviceAggregateRootCreatedEvent>
{
private readonly ILogger<MicroserviceAggregateRootLifeCycleSaga> logger;
public MicroserviceAggregateRootLifeCycleSaga(
ILogger<MicroserviceAggregateRootLifeCycleSaga> logger)
{
this.logger = logger;
}
public Task Handle(
MicroserviceAggregateRootCreatedEvent message,
IMessageHandlerContext context)
{
using (this.logger.BeginScope(
new Dictionary<string, object>(StringComparer.Ordinal)
{
["ApplicationFlowName"] = nameof(MicroserviceAggregateRootLifeCycleSaga) +
"/HandleMicroserviceAggregateRootCreatedEvent",
}))
{
Guid objectId = message.ObjectId;
this.logger.LogInformation(
"Handle MicroserviceAggregateRoot's created event for {ObjectId} started and saga created...",
objectId);
this.Data.ObjectId = message.ObjectId;
this.logger.LogInformation(
"Handle MicroserviceAggregateRoot's created event for {ObjectId} completed...",
objectId);
}
return Task.CompletedTask;
}
protected override void ConfigureHowToFindSaga(
SagaPropertyMapper<MicroserviceAggregateRootSagaData> mapper)
{
mapper.MapSaga(saga => saga.ObjectId)
.ToMessage<MicroserviceAggregateRootCreatedEvent>(msg => msg.ObjectId);
}
}
Saga Data¶
public class MicroserviceAggregateRootSagaData : ContainSagaData
{
public Guid ObjectId { get; set; }
// Additional saga state properties...
}
Unobtrusive Mode¶
NServiceBus uses unobtrusive mode to minimize coupling with NServiceBus assemblies:
private static void DefineUnobtrusiveModeConventions(EndpointConfiguration endpointConfiguration)
{
var conventions = endpointConfiguration.Conventions();
// Define commands
conventions.DefiningCommandsAs(type =>
string.Equals(type.Namespace, typeof(CreateMicroserviceAggregateRootCommand).Namespace,
StringComparison.Ordinal) &&
typeof(ICommand).IsAssignableFrom(type));
// Define events
conventions.DefiningEventsAs(type =>
string.Equals(type.Namespace, typeof(MicroserviceAggregateRootCreatedEvent).Namespace,
StringComparison.Ordinal) &&
typeof(IEvent).IsAssignableFrom(type));
// Define claim check properties (for large payloads)
conventions.DefiningClaimCheckPropertiesAs(property =>
property.Name.EndsWith("DataBus", StringComparison.Ordinal) ||
(typeof(ClaimCheckProperty<>).IsAssignableFrom(property.PropertyType) &&
typeof(ClaimCheckProperty<>) != property.PropertyType));
}
This allows message contracts to be defined without referencing NServiceBus assemblies:
// In MessagingModel project (no NServiceBus reference)
public class CreateMicroserviceAggregateRootCommand : ICommand
{
public Guid ObjectId { get; set; }
}
public class MicroserviceAggregateRootCreatedEvent : IEvent
{
public Guid ObjectId { get; set; }
}
Configuration¶
appsettings.json¶
{
"NServiceBus": {
"EndpointName": "ConnectSoft.MicroserviceTemplate",
"AuditQueue": "audit",
"ErrorQueue": "error",
"SagaAuditQueue": "saga-audit",
"Recoverability": {
"NumberOfImmediateRetries": 0,
"NumberOfDelayedRetries": 3,
"DelayIntervalInSeconds": 10
},
"Metrics": {
"ServiceControlMetricsAddress": "particular.monitoring",
"ServiceControlReportingInterval": 10
},
"Heartbeat": {
"ServiceControlHeartbeatAddress": "particular.servicecontrol",
"HeartbeatInterval": 10,
"HeartbeatTtl": 30
}
},
"ConnectionStrings": {
"ConnectSoft.MicroserviceTemplateNServiceBusTransportConnectionString":
"Server=localhost;Database=NServiceBusTransport;Integrated Security=true;",
"ConnectSoft.MicroserviceTemplateNServiceBusPersistenceConnectionString":
"Server=localhost;Database=NServiceBusPersistence;Integrated Security=true;"
}
}
Environment-Specific Configuration¶
// appsettings.Development.json
{
"NServiceBus": {
"Recoverability": {
"NumberOfImmediateRetries": 3,
"NumberOfDelayedRetries": 2
}
},
"ConnectionStrings": {
"ConnectSoft.MicroserviceTemplateNServiceBusTransportConnectionString":
"Server=localhost;Database=NServiceBusTransport_Dev;Integrated Security=true;"
}
}
// appsettings.Production.json
{
"NServiceBus": {
"Recoverability": {
"NumberOfImmediateRetries": 0,
"NumberOfDelayedRetries": 5,
"DelayIntervalInSeconds": 30
}
},
"ConnectionStrings": {
"ConnectSoft.MicroserviceTemplateNServiceBusTransportConnectionString":
"${NSB_TRANSPORT_CONNECTION_STRING}",
"ConnectSoft.MicroserviceTemplateNServiceBusPersistenceConnectionString":
"${NSB_PERSISTENCE_CONNECTION_STRING}"
}
}
Recoverability and Retry Policies¶
Immediate Retries¶
private static void DefineNServiceBusRecoverability(
EndpointConfiguration endpointConfiguration,
NServiceBusOptions nServiceBusOptions)
{
var recoverability = endpointConfiguration.Recoverability();
// Immediate retries (fail fast)
recoverability.Immediate(immediate =>
{
immediate.NumberOfRetries(nServiceBusOptions.Recoverability.NumberOfImmediateRetries);
});
// Delayed retries (with exponential backoff)
recoverability.Delayed(delayed =>
{
delayed.NumberOfRetries(nServiceBusOptions.Recoverability.NumberOfDelayedRetries);
if (nServiceBusOptions.Recoverability.NumberOfDelayedRetries != 0 &&
nServiceBusOptions.Recoverability.DelayIntervalInSeconds.HasValue)
{
delayed.TimeIncrease(TimeSpan.FromSeconds(
nServiceBusOptions.Recoverability.DelayIntervalInSeconds.Value));
}
});
}
Custom Retry Policies¶
recoverability.Immediate(immediate =>
{
immediate.NumberOfRetries(3);
});
recoverability.Delayed(delayed =>
{
delayed.NumberOfRetries(5);
delayed.TimeIncrease(TimeSpan.FromSeconds(10)); // Exponential backoff
});
// Configure unrecoverable exceptions (skip retries)
recoverability.AddUnrecoverableException<ValidationException>();
Error Queue¶
Failed messages (after all retries) are sent to the error queue:
Message Validation¶
NServiceBus supports DataAnnotations validation:
private static void DefineNServiceBusMessageValidation(EndpointConfiguration endpointConfiguration)
{
// Enable DataAnnotations message validation
endpointConfiguration.UseDataAnnotationsValidation(
outgoing: true, // Validate outgoing messages
incoming: true); // Validate incoming messages
}
Validation errors result in: - Validation exception thrown - Message handled by recoverability - Added to unrecoverable exceptions to avoid unnecessary retries
ServiceControl Integration¶
Heartbeats¶
Heartbeats enable endpoint health monitoring:
private static void DefineNServiceBusHeartbeat(
EndpointConfiguration endpointConfiguration,
NServiceBusOptions nServiceBusOptions)
{
if (nServiceBusOptions.Heartbeat != null)
{
endpointConfiguration.SendHeartbeatTo(
serviceControlQueue: nServiceBusOptions.Heartbeat.ServiceControlHeartbeatAddress,
frequency: TimeSpan.FromSeconds(nServiceBusOptions.Heartbeat.HeartbeatInterval),
timeToLive: TimeSpan.FromSeconds(nServiceBusOptions.Heartbeat.HeartbeatTtl));
}
}
Metrics¶
Metrics reporting enables performance monitoring:
private static void DefineNServiceBusServiceControlMetrics(
EndpointConfiguration endpointConfiguration,
NServiceBusOptions nServiceBusOptions)
{
if (nServiceBusOptions.Metrics != null)
{
var metrics = endpointConfiguration.EnableMetrics();
var machineName = $"{Dns.GetHostName()}.{IPGlobalProperties.GetIPGlobalProperties().DomainName}";
var instanceIdentifier = $"{nServiceBusOptions.EndpointName}@{machineName}";
metrics.SetServiceControlMetricsMessageTTBR(TimeSpan.FromHours(12));
metrics.SendMetricDataToServiceControl(
serviceControlMetricsAddress: nServiceBusOptions.Metrics.ServiceControlMetricsAddress,
interval: TimeSpan.FromSeconds(nServiceBusOptions.Metrics.ServiceControlReportingInterval),
instanceId: instanceIdentifier);
}
}
Auditing¶
All processed messages can be audited:
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.AuditSagaStateChanges(serviceControlQueue: "saga-audit");
Outbox Pattern¶
NServiceBus has built-in outbox support:
// Enable outbox (requires persistence)
var outboxSettings = endpointConfiguration.EnableOutbox();
// Outbox settings
outboxSettings.TimeToKeepDeduplicationData(TimeSpan.FromDays(7));
outboxSettings.DisableCleanup(); // Optional: for development
See Outbox Pattern for detailed information.
Observability¶
OpenTelemetry Integration¶
Traces include: - Message sending and receiving - Handler execution - Saga operations - Retry attempts
Structured Logging¶
Use ILogger<T> with correlation IDs:
public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
using (logger.BeginScope(new Dictionary<string, object>
{
["MessageId"] = context.MessageId,
["CorrelationId"] = context.MessageHeaders["NServiceBus.CorrelationId"],
["ObjectId"] = message.ObjectId
}))
{
logger.LogInformation("Processing message");
// Process message
logger.LogInformation("Message processed successfully");
}
}
Health Checks¶
NServiceBus integrates with ASP.NET Core health checks:
// HealthChecksExtensions.cs
#if UseNServiceBus
builder.AddNServiceBusHealthChecks(configuration);
#endif
Health checks verify: - Transport database connectivity - Persistence database connectivity - Endpoint health
Testing¶
Unit Testing with NServiceBus.Testing¶
[TestMethod]
public async Task MicroserviceAggregateRootLifeCycleSaga_Should_Be_Initialized_When_Event_Published()
{
// Arrange
var saga = new MicroserviceAggregateRootLifeCycleSaga(logger);
var sagaData = new MicroserviceAggregateRootSagaData();
var context = new TestableMessageHandlerContext();
var @event = new MicroserviceAggregateRootCreatedEvent
{
ObjectId = Guid.NewGuid()
};
// Act
await saga.Handle(@event, context);
// Assert
Assert.AreEqual(@event.ObjectId, sagaData.ObjectId);
Assert.IsTrue(context.PublishedMessages.Any(m =>
m.Message is MicroserviceAggregateRootCreatedEvent));
}
Testing Handlers¶
[TestMethod]
public async Task Handler_Should_Process_Command()
{
// Arrange
var processor = new Mock<IMicroserviceAggregateRootsProcessor>();
var handler = new CreateMicroserviceAggregateRootCommandHandler(
processor.Object,
logger);
var context = new TestableMessageHandlerContext();
var command = new CreateMicroserviceAggregateRootCommand
{
ObjectId = Guid.NewGuid()
};
// Act
await handler.Handle(command, context);
// Assert
processor.Verify(p => p.CreateMicroserviceAggregateRoot(
It.IsAny<CreateMicroserviceAggregateRootInput>(),
It.IsAny<CancellationToken>()),
Times.Once);
}
Pipeline Behaviors¶
Custom pipeline behaviors can extend message processing:
public class ExtendLoggerScopeWithMessageDataBehavior :
IBehavior<IMessageHandlerContext, IMessageHandlerContext>
{
public async Task Invoke(
IMessageHandlerContext context,
Func<IMessageHandlerContext, Task> next)
{
// Extend logger scope with message data
using (logger.BeginScope(new Dictionary<string, object>
{
["MessageId"] = context.MessageId,
["MessageType"] = context.MessageHeaders["NServiceBus.EnclosedMessageTypes"]
}))
{
await next(context);
}
}
}
Register behavior:
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
stepId: nameof(ExtendLoggerScopeWithMessageDataBehavior),
behavior: typeof(ExtendLoggerScopeWithMessageDataBehavior),
description: "Extend logger scope with message data");
Best Practices¶
Do's¶
- Use IMessageSession for sending/publishing
- Scoped per request
-
Better integration with DI
-
Keep handlers thin
- Delegate business logic to domain services
-
Handlers should only handle message concerns
-
Use unobtrusive mode
- Minimize coupling with NServiceBus assemblies
-
Use
ICommandandIEventinterfaces -
Configure retry policies appropriately
- Different policies for different exception types
- Use immediate retries for transient failures
-
Use delayed retries for longer recovery
-
Enable ServiceControl integration
- Heartbeats for endpoint monitoring
- Metrics for performance tracking
-
Auditing for compliance
-
Use SQL persistence for sagas
- Reliable saga state storage
- Supports transactions
-
Better for enterprise scenarios
-
Enable outbox for critical messages
- Guaranteed message delivery
-
Prevents message loss
-
Test with NServiceBus.Testing
- Unit test handlers and sagas
- Verify message publishing
Don'ts¶
- Don't put business logic in handlers
- Handlers should delegate to services
-
Keep orchestration separate
-
Don't ignore critical errors
- Configure critical error handlers
-
Log and alert appropriately
-
Don't share domain entities as messages
- Use interfaces in
MessagingModel -
Keep contracts independent
-
Don't disable installers in production
- Use installers for initial setup
-
Or create queues/tables manually
-
Don't ignore validation
- Enable message validation
- Catch validation errors early
Common Scenarios¶
Scenario 1: Simple Event Publishing¶
// Publish event after domain operation
public async Task CreateEntity(CreateEntityInput input)
{
var entity = new Entity { Id = input.Id };
repository.Insert(entity);
await unitOfWork.SaveChangesAsync();
// Publish event
await messageSession.Publish(new EntityCreatedEvent
{
EntityId = entity.Id
});
}
Scenario 2: Saga Orchestration¶
// Saga handles multiple messages to orchestrate workflow
public class OrderProcessingSaga : Saga<OrderProcessingSagaData>,
IAmStartedByMessages<OrderCreatedEvent>,
IHandleMessages<PaymentProcessedEvent>,
IHandleMessages<InventoryReservedEvent>
{
public async Task Handle(OrderCreatedEvent message, IMessageHandlerContext context)
{
Data.OrderId = message.OrderId;
// Send command to process payment
await context.Send(new ProcessPaymentCommand
{
OrderId = Data.OrderId,
Amount = message.TotalAmount
});
}
public async Task Handle(PaymentProcessedEvent message, IMessageHandlerContext context)
{
Data.PaymentId = message.PaymentId;
// Send command to reserve inventory
await context.Send(new ReserveInventoryCommand
{
OrderId = Data.OrderId
});
}
public async Task Handle(InventoryReservedEvent message, IMessageHandlerContext context)
{
// Complete saga
MarkAsComplete();
// Publish completion event
await context.Publish(new OrderCompletedEvent
{
OrderId = Data.OrderId
});
}
}
Scenario 3: Request/Response Pattern¶
// Client sends request and waits for response
var options = new SendOptions();
options.SetDestination("DestinationEndpoint");
var request = new GetEntityRequest { Id = entityId };
var response = await messageSession.Request<GetEntityResponse>(request, options);
// Handler responds
public class GetEntityRequestHandler : IHandleMessages<GetEntityRequest>
{
public async Task Handle(GetEntityRequest message, IMessageHandlerContext context)
{
var entity = await repository.GetByIdAsync(message.Id);
await context.Reply(new GetEntityResponse
{
Entity = entity
});
}
}
Summary¶
NServiceBus in the ConnectSoft Microservice Template provides:
- ✅ Enterprise Reliability: SQL-based persistence and transactions
- ✅ ServiceControl Integration: Monitoring, heartbeats, and metrics
- ✅ Outbox Support: Guaranteed message delivery
- ✅ Advanced Recoverability: Sophisticated retry policies
- ✅ Unobtrusive Mode: Minimal coupling with NServiceBus assemblies
- ✅ Observability: OpenTelemetry and structured logging
- ✅ Testing: Comprehensive testing support
- ✅ Clean Architecture: Fits seamlessly with DDD and CQRS patterns
By following these patterns and best practices, NServiceBus becomes a powerful messaging infrastructure that enables scalable, reliable, and observable microservices with enterprise-grade features.