
NServiceBus in ConnectSoft Microservice Template

Purpose & Overview

NServiceBus is a mature, enterprise-grade service bus framework that provides reliable message-based communication and orchestration. In the ConnectSoft Microservice Template, NServiceBus is integrated as a pluggable alternative to MassTransit, offering advanced features like SQL-based persistence, ServiceControl integration, and built-in outbox support.

Why NServiceBus?

NServiceBus offers several key benefits for enterprise scenarios:

  • Enterprise-Grade Reliability: SQL-based saga persistence with transaction support
  • ServiceControl Integration: Built-in monitoring, heartbeats, and operational visibility
  • Outbox Pattern: Built-in outbox support for guaranteed message delivery
  • Advanced Recoverability: Sophisticated retry policies and error handling
  • Interface-Based Contracts: Type-safe message contracts using ICommand and IEvent
  • Unobtrusive Mode: Minimal coupling with NServiceBus assemblies in message contracts
  • SQL Server Transport: Native SQL Server transport option for database-centric deployments
  • Commercial Support: Available commercial support and training

NServiceBus vs MassTransit

NServiceBus is an alternative to MassTransit in the template. Choose NServiceBus when you need SQL-based persistence, ServiceControl monitoring, or commercial support. Choose MassTransit for open-source-only requirements or when you prefer POCO-based message contracts.

Architecture Overview

NServiceBus integrates at the messaging layer of Clean Architecture:

Domain Services (DomainModel)
    ├── Processors (Commands/Writes)
    └── Retrievers (Queries/Reads)
    ↓ (Publish/Subscribe via IMessageSession)
NServiceBus Endpoint
    ├── Transport (SQL Server/RabbitMQ/Azure Service Bus)
    ├── Handlers (Message Handlers)
    ├── Sagas (Long-running Orchestration)
    ├── Persistence (SQL-based Saga State)
    └── Pipeline Behaviors (Custom Middleware)
Message Broker / SQL Server
    └── Queues/Subscriptions

Key Integration Points

Layer                  Component              Responsibility
MessagingModel         ICommand/IEvent        Message contracts (interfaces)
DomainModel            Services               Send/publish via IMessageSession
ApplicationModel       NServiceBusExtensions  Endpoint configuration
FlowModel.NServiceBus  Sagas                  Long-running process orchestration
PersistenceModel       SQL Persistence        Saga state and outbox storage

Core Components

1. Service Registration

NServiceBus is registered via extension methods:

// Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddOptionsConfiguration(builder.Configuration);
builder.Services.AddNServiceBus(builder.Configuration);

// Configure logging
builder.Host.ConfigureLogging((hostingContext, logging) =>
{
    logging.AddNServiceBus();
});

var app = builder.Build();

// Stop endpoint on shutdown. Register the callback before RunAsync():
// RunAsync() completes only after the host has stopped, so a callback
// registered after it would run too late to take part in graceful shutdown.
app.Lifetime.ApplicationStopping.Register(() =>
{
    NServiceBusExtensions.StopNServiceBusEndpoint();
});

// Start endpoint (WebApplication exposes the root provider as Services)
app.Services.StartNServiceBusEndpoint();

await app.RunAsync();

2. Endpoint Configuration

The AddNServiceBus() extension method configures all NServiceBus services:

// NServiceBusExtensions.cs
internal static IServiceCollection AddNServiceBus(
    this IServiceCollection services, 
    IConfiguration configuration)
{
    ArgumentNullException.ThrowIfNull(services);
    ArgumentNullException.ThrowIfNull(configuration);

    NServiceBusOptions nServiceBusOptions = OptionsExtensions.NServiceBusOptions;

    var endpointConfiguration = new EndpointConfiguration(nServiceBusOptions.EndpointName);

    // Serialization
    DefineNServiceBusSerialization(endpointConfiguration);

    // Recoverability (retry policies)
    DefineNServiceBusRecoverability(endpointConfiguration, nServiceBusOptions);

    // Critical error handling
    endpointConfiguration.DefineCriticalErrorAction(OnCriticalError);
    endpointConfiguration.SendFailedMessagesTo(nServiceBusOptions.ErrorQueue);
    endpointConfiguration.AuditProcessedMessagesTo(nServiceBusOptions.AuditQueue);
    endpointConfiguration.AuditSagaStateChanges(serviceControlQueue: nServiceBusOptions.SagaAuditQueue);

    // Heartbeat and metrics
    DefineNServiceBusHeartbeat(endpointConfiguration, nServiceBusOptions);
    DefineNServiceBusServiceControlMetrics(endpointConfiguration, nServiceBusOptions);

    // Unobtrusive mode conventions
    DefineUnobtrusiveModeConventions(endpointConfiguration);

    // Transport and routing
    DefineNServiceBusTransportAndRouting(endpointConfiguration, configuration, nServiceBusOptions);

    // SQL Persistence
#if UseNServiceBusSQLPersistence
    DefineNServiceBusSQLPersistence(nServiceBusOptions.EndpointName, endpointConfiguration, configuration);
#endif

    // Outbox (optional - currently commented out)
    // var outboxSettings = endpointConfiguration.EnableOutbox();

    // Licensing
    DefineNServiceBusLicensing(endpointConfiguration);

    // Message validation
    DefineNServiceBusMessageValidation(endpointConfiguration);

    // OpenTelemetry
#if OpenTelemetry
    endpointConfiguration.EnableOpenTelemetry();
#endif

    // Enable installers (creates queues, tables, etc.)
    endpointConfiguration.EnableInstallers();

    // Register custom pipeline behavior
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(
        stepId: nameof(ExtendLoggerScopeWithMessageDataBehavior),
        behavior: typeof(ExtendLoggerScopeWithMessageDataBehavior),
        description: ExtendLoggerScopeWithMessageDataBehavior.ExtendLoggerScopeWithMessageDataBehaviorDescription);

    // Configure dependency injection
    var endpointWithExternallyManagedServiceProvider = 
        DefineDependencyInjectionExternallyManagedMode(services, endpointConfiguration);

    return services;
}

3. Transport Configuration

SQL Server Transport

The template supports SQL Server transport:

private static void DefineNServiceBusTransportAndRouting(
    EndpointConfiguration endpointConfiguration, 
    IConfiguration configuration, 
    NServiceBusOptions nServiceBusOptions)
{
#if UseNServiceBusSqlServerTransport
    var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
    var connectionString = configuration.GetConnectionString(
        NServiceBusSqlServerTransportConnectionStringKey);

    // Create database and schema if not exists
    SqlServerDatabaseHelper databaseHelper = new();
    databaseHelper.CreateIfNotExists(connectionString);
    databaseHelper.CreateSchema(connectionString, nServiceBusOptions.EndpointName);
    databaseHelper.CreateSchema(connectionString, "nsb");

    transport.ConnectionString(connectionString);
    transport.DefaultSchema("nsb");

    // Create required queues
    using var connection = new SqlConnection(connectionString);
    connection.Open();

    NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.ErrorQueue);
    transport.UseSchemaForQueue(nServiceBusOptions.ErrorQueue, "nsb");

    NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.AuditQueue);
    transport.UseSchemaForQueue(nServiceBusOptions.AuditQueue, "nsb");

    NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.SagaAuditQueue);
    transport.UseSchemaForQueue(nServiceBusOptions.SagaAuditQueue, "nsb");

    // Configure transactions
    transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

    // Configure subscriptions
    var subscriptions = transport.SubscriptionSettings();
    subscriptions.CacheSubscriptionInformationFor(TimeSpan.FromMinutes(1));
    subscriptions.SubscriptionTableName(
        tableName: "Subscriptions",
        schemaName: "nsb");
#endif

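    // Configure routing. Note that "transport" is declared inside the #if block above,
    // so this part compiles only when UseNServiceBusSqlServerTransport is defined.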
    transport.Routing().RouteToEndpoint(
        assembly: typeof(MicroserviceAggregateRootCreatedEvent).Assembly,
        destination: nServiceBusOptions.EndpointName);
    transport.Routing().RouteToEndpoint(
        assembly: typeof(CreateMicroserviceAggregateRootCommand).Assembly,
        destination: nServiceBusOptions.EndpointName);
}

4. SQL Persistence

NServiceBus uses SQL Server for saga persistence:

private static void DefineNServiceBusSQLPersistence(
    string endpointName, 
    EndpointConfiguration endpointConfiguration, 
    IConfiguration configuration)
{
    var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
    var connectionString = configuration.GetConnectionString(
        NServiceBusSQLPersistenceConnectionStringKey);

    // Create database and schema if not exists
    SqlServerDatabaseHelper databaseHelper = new();
    databaseHelper.CreateIfNotExists(connectionString);
    databaseHelper.CreateSchema(connectionString, endpointName);
    databaseHelper.CreateSchema(connectionString, "nsb");

    var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
    dialect.Schema(endpointName);

    persistence.ConnectionBuilder(() => new SqlConnection(connectionString));

    var subscriptions = persistence.SubscriptionSettings();
    subscriptions.CacheFor(TimeSpan.FromMinutes(1));
}

5. Sending and Publishing Messages

Using IMessageSession

NServiceBus uses IMessageSession for sending and publishing:

public class MicroserviceAggregateRootsProcessor : IMicroserviceAggregateRootsProcessor
{
    private readonly IMessageSession messageSession;
    private readonly ILogger<MicroserviceAggregateRootsProcessor> logger;

    public MicroserviceAggregateRootsProcessor(
        IMessageSession messageSession,
        ILogger<MicroserviceAggregateRootsProcessor> logger)
    {
        this.messageSession = messageSession;
        this.logger = logger;
    }

    public async Task<IMicroserviceAggregateRoot> CreateMicroserviceAggregateRoot(
        CreateMicroserviceAggregateRootInput input,
        CancellationToken token = default)
    {
        // Execute domain logic
        var entity = new MicroserviceAggregateRoot { ObjectId = input.ObjectId };
        // ... save to repository ...

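        // Publish domain event
        await this.messageSession.Publish(
            new MicroserviceAggregateRootCreatedEvent
            {
                ObjectId = entity.ObjectId
            },
            token);

        // Commands are sent the same way with Send(). The command is not sent here:
        // CreateMicroserviceAggregateRootCommand is handled by calling this method,
        // so sending it from inside the method would loop.
        // await this.messageSession.Send(
        //     new CreateMicroserviceAggregateRootCommand { ObjectId = entity.ObjectId },
        //     token);

        this.logger.LogInformation(
            "Published MicroserviceAggregateRootCreatedEvent for {ObjectId}",
            entity.ObjectId);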

        return entity;
    }
}

Publishing Events

// Publish event (can have multiple subscribers)
await messageSession.Publish(new MicroserviceAggregateRootCreatedEvent
{
    ObjectId = entity.ObjectId
});

Sending Commands

// Send command (single handler)
await messageSession.Send(new CreateMicroserviceAggregateRootCommand
{
    ObjectId = input.ObjectId
});

6. Message Handlers

Message handlers implement IHandleMessages<T>:

public class CreateMicroserviceAggregateRootCommandHandler :
    IHandleMessages<CreateMicroserviceAggregateRootCommand>
{
    private readonly IMicroserviceAggregateRootsProcessor processor;
    private readonly ILogger<CreateMicroserviceAggregateRootCommandHandler> logger;

    public CreateMicroserviceAggregateRootCommandHandler(
        IMicroserviceAggregateRootsProcessor processor,
        ILogger<CreateMicroserviceAggregateRootCommandHandler> logger)
    {
        this.processor = processor;
        this.logger = logger;
    }

    public async Task Handle(
        CreateMicroserviceAggregateRootCommand message, 
        IMessageHandlerContext context)
    {
        this.logger.LogInformation(
            "Handling CreateMicroserviceAggregateRootCommand for {ObjectId}",
            message.ObjectId);

        try
        {
            var input = new CreateMicroserviceAggregateRootInput
            {
                ObjectId = message.ObjectId
            };

            await this.processor.CreateMicroserviceAggregateRoot(input, context.CancellationToken);

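            // Optionally publish success event. Skip this when the processor already
            // publishes it (as in the processor example above); otherwise subscribers
            // receive the event twice.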
            await context.Publish(new MicroserviceAggregateRootCreatedEvent
            {
                ObjectId = message.ObjectId
            });

            this.logger.LogInformation(
                "Successfully processed CreateMicroserviceAggregateRootCommand for {ObjectId}",
                message.ObjectId);
        }
        catch (Exception ex)
        {
            this.logger.LogError(
                ex,
                "Error processing CreateMicroserviceAggregateRootCommand for {ObjectId}",
                message.ObjectId);
            throw; // Let NServiceBus recoverability handle retry/fault
        }
    }
}

7. Sagas

NServiceBus sagas orchestrate long-running business processes. See Saga Pattern for detailed information.

Saga Example

public class MicroserviceAggregateRootLifeCycleSaga :
    Saga<MicroserviceAggregateRootSagaData>,
    IAmStartedByMessages<MicroserviceAggregateRootCreatedEvent>
{
    private readonly ILogger<MicroserviceAggregateRootLifeCycleSaga> logger;

    public MicroserviceAggregateRootLifeCycleSaga(
        ILogger<MicroserviceAggregateRootLifeCycleSaga> logger)
    {
        this.logger = logger;
    }

    public Task Handle(
        MicroserviceAggregateRootCreatedEvent message, 
        IMessageHandlerContext context)
    {
        using (this.logger.BeginScope(
            new Dictionary<string, object>(StringComparer.Ordinal)
            {
                ["ApplicationFlowName"] = nameof(MicroserviceAggregateRootLifeCycleSaga) + 
                    "/HandleMicroserviceAggregateRootCreatedEvent",
            }))
        {
            Guid objectId = message.ObjectId;
            this.logger.LogInformation(
                "Handle MicroserviceAggregateRoot's created event for {ObjectId} started and saga created...",
                objectId);

            this.Data.ObjectId = message.ObjectId;

            this.logger.LogInformation(
                "Handle MicroserviceAggregateRoot's created event for {ObjectId} completed...",
                objectId);
        }

        return Task.CompletedTask;
    }

    protected override void ConfigureHowToFindSaga(
        SagaPropertyMapper<MicroserviceAggregateRootSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.ObjectId)
            .ToMessage<MicroserviceAggregateRootCreatedEvent>(msg => msg.ObjectId);
    }
}

Saga Data

public class MicroserviceAggregateRootSagaData : ContainSagaData
{
    public Guid ObjectId { get; set; }
    // Additional saga state properties...
}

Unobtrusive Mode

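The template configures NServiceBus in unobtrusive mode, which identifies messages by convention instead of by NServiceBus marker types, to minimize coupling with NServiceBus assemblies: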

private static void DefineUnobtrusiveModeConventions(EndpointConfiguration endpointConfiguration)
{
    var conventions = endpointConfiguration.Conventions();

    // Define commands
    conventions.DefiningCommandsAs(type =>
        string.Equals(type.Namespace, typeof(CreateMicroserviceAggregateRootCommand).Namespace, 
            StringComparison.Ordinal) &&
        typeof(ICommand).IsAssignableFrom(type));

    // Define events
    conventions.DefiningEventsAs(type =>
        string.Equals(type.Namespace, typeof(MicroserviceAggregateRootCreatedEvent).Namespace, 
            StringComparison.Ordinal) &&
        typeof(IEvent).IsAssignableFrom(type));

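    // Define claim check properties (for large payloads).
    // A closed type such as ClaimCheckProperty<byte[]> is never assignable to the
    // open generic ClaimCheckProperty<>, so compare generic type definitions instead.
    conventions.DefiningClaimCheckPropertiesAs(property =>
        property.Name.EndsWith("DataBus", StringComparison.Ordinal) ||
        (property.PropertyType.IsGenericType &&
         property.PropertyType.GetGenericTypeDefinition() == typeof(ClaimCheckProperty<>)));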
}

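This allows message contracts to be defined without referencing NServiceBus assemblies, provided that ICommand and IEvent are marker interfaces declared in the contracts project itself rather than NServiceBus.ICommand and NServiceBus.IEvent: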

// In MessagingModel project (no NServiceBus reference)
public class CreateMicroserviceAggregateRootCommand : ICommand
{
    public Guid ObjectId { get; set; }
}

public class MicroserviceAggregateRootCreatedEvent : IEvent
{
    public Guid ObjectId { get; set; }
}
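The convention predicates are ordinary Func<Type, bool> checks, so they can be tried out without an endpoint. The following is a minimal sketch under stated assumptions: the Demo.Messaging and Demo.Other namespaces and all type names are hypothetical, and ICommand and IEvent are assumed to be the template's own marker interfaces.

```csharp
using System;

// Mirrors the command convention: the type must live in the same namespace
// as a known command AND implement the (assumed) local ICommand marker.
Func<Type, bool> isCommand = type =>
    string.Equals(
        type.Namespace,
        typeof(Demo.Messaging.CreateThingCommand).Namespace,
        StringComparison.Ordinal) &&
    typeof(Demo.Messaging.ICommand).IsAssignableFrom(type);

Console.WriteLine(isCommand(typeof(Demo.Messaging.CreateThingCommand))); // True
Console.WriteLine(isCommand(typeof(Demo.Messaging.ThingCreatedEvent)));  // False: not an ICommand
Console.WriteLine(isCommand(typeof(Demo.Other.ForeignCommand)));         // False: other namespace

namespace Demo.Messaging
{
    // Hypothetical marker interfaces owned by the contracts project.
    public interface ICommand { }
    public interface IEvent { }

    public class CreateThingCommand : ICommand { public Guid ObjectId { get; set; } }
    public class ThingCreatedEvent : IEvent { public Guid ObjectId { get; set; } }
}

namespace Demo.Other
{
    public class ForeignCommand : Demo.Messaging.ICommand { }
}
```

Because both conditions must hold, a type that implements the marker interface but lives in another namespace is not treated as a command by this endpoint.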

Configuration

appsettings.json

{
  "NServiceBus": {
    "EndpointName": "ConnectSoft.MicroserviceTemplate",
    "AuditQueue": "audit",
    "ErrorQueue": "error",
    "SagaAuditQueue": "saga-audit",
    "Recoverability": {
      "NumberOfImmediateRetries": 0,
      "NumberOfDelayedRetries": 3,
      "DelayIntervalInSeconds": 10
    },
    "Metrics": {
      "ServiceControlMetricsAddress": "particular.monitoring",
      "ServiceControlReportingInterval": 10
    },
    "Heartbeat": {
      "ServiceControlHeartbeatAddress": "particular.servicecontrol",
      "HeartbeatInterval": 10,
      "HeartbeatTtl": 30
    }
  },
  "ConnectionStrings": {
    "ConnectSoft.MicroserviceTemplateNServiceBusTransportConnectionString": 
      "Server=localhost;Database=NServiceBusTransport;Integrated Security=true;",
    "ConnectSoft.MicroserviceTemplateNServiceBusPersistenceConnectionString": 
      "Server=localhost;Database=NServiceBusPersistence;Integrated Security=true;"
  }
}
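The configuration code in this document reads values such as nServiceBusOptions.Recoverability.DelayIntervalInSeconds.HasValue and checks Heartbeat and Metrics for null. The sketch below shows one options shape that would be consistent with that usage. The class layout is inferred from those calls and is an assumption, not the template's actual source, and System.Text.Json stands in for the template's options binding so the example has no package dependencies.

```csharp
using System;
using System.Text.Json;

// The "NServiceBus" section from appsettings.json (raw string literal, C# 11+).
const string section = """
{
  "EndpointName": "ConnectSoft.MicroserviceTemplate",
  "AuditQueue": "audit",
  "ErrorQueue": "error",
  "SagaAuditQueue": "saga-audit",
  "Recoverability": { "NumberOfImmediateRetries": 0, "NumberOfDelayedRetries": 3, "DelayIntervalInSeconds": 10 },
  "Heartbeat": { "ServiceControlHeartbeatAddress": "particular.servicecontrol", "HeartbeatInterval": 10, "HeartbeatTtl": 30 }
}
""";

var options = JsonSerializer.Deserialize<NServiceBusOptions>(section)
    ?? throw new InvalidOperationException("Section is empty.");

Console.WriteLine(options.Recoverability.DelayIntervalInSeconds); // 10
Console.WriteLine(options.Metrics is null);                       // True: section omitted above

// Hypothetical options classes, inferred from how the template code reads them.
public sealed class NServiceBusOptions
{
    public string EndpointName { get; set; }
    public string AuditQueue { get; set; }
    public string ErrorQueue { get; set; }
    public string SagaAuditQueue { get; set; }
    public RecoverabilityOptions Recoverability { get; set; }
    public MetricsOptions Metrics { get; set; }     // null when the section is absent
    public HeartbeatOptions Heartbeat { get; set; } // null when the section is absent
}

public sealed class RecoverabilityOptions
{
    public int NumberOfImmediateRetries { get; set; }
    public int NumberOfDelayedRetries { get; set; }
    public int? DelayIntervalInSeconds { get; set; } // optional, hence nullable
}

public sealed class MetricsOptions
{
    public string ServiceControlMetricsAddress { get; set; }
    public int ServiceControlReportingInterval { get; set; }
}

public sealed class HeartbeatOptions
{
    public string ServiceControlHeartbeatAddress { get; set; }
    public int HeartbeatInterval { get; set; }
    public int HeartbeatTtl { get; set; }
}
```

Leaving out the Metrics or Heartbeat section yields a null property, which is why the corresponding configuration methods guard with a null check before enabling the feature.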

Environment-Specific Configuration

// appsettings.Development.json
{
  "NServiceBus": {
    "Recoverability": {
      "NumberOfImmediateRetries": 3,
      "NumberOfDelayedRetries": 2
    }
  },
  "ConnectionStrings": {
    "ConnectSoft.MicroserviceTemplateNServiceBusTransportConnectionString": 
      "Server=localhost;Database=NServiceBusTransport_Dev;Integrated Security=true;"
  }
}

// appsettings.Production.json
{
  "NServiceBus": {
    "Recoverability": {
      "NumberOfImmediateRetries": 0,
      "NumberOfDelayedRetries": 5,
      "DelayIntervalInSeconds": 30
    }
  },
  "ConnectionStrings": {
    "ConnectSoft.MicroserviceTemplateNServiceBusTransportConnectionString": 
      "${NSB_TRANSPORT_CONNECTION_STRING}",
    "ConnectSoft.MicroserviceTemplateNServiceBusPersistenceConnectionString": 
      "${NSB_PERSISTENCE_CONNECTION_STRING}"
  }
}
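Environment files override the base file key by key; they do not replace whole sections. The sketch below imitates that layering with plain dictionaries and the flattened "Section:Key" names that .NET configuration uses. It is an illustration of the merge rule only and assumes the default load order (appsettings.json first, then appsettings.{Environment}.json).

```csharp
using System;
using System.Collections.Generic;

// Values from appsettings.json
var baseSettings = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
{
    ["NServiceBus:Recoverability:NumberOfImmediateRetries"] = "0",
    ["NServiceBus:Recoverability:NumberOfDelayedRetries"] = "3",
    ["NServiceBus:Recoverability:DelayIntervalInSeconds"] = "10",
};

// Values from appsettings.Development.json
var development = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
{
    ["NServiceBus:Recoverability:NumberOfImmediateRetries"] = "3",
    ["NServiceBus:Recoverability:NumberOfDelayedRetries"] = "2",
};

// Later sources win for keys they define; all other keys fall through.
static Dictionary<string, string> Overlay(
    IReadOnlyDictionary<string, string> lower,
    IReadOnlyDictionary<string, string> upper)
{
    var merged = new Dictionary<string, string>(lower, StringComparer.OrdinalIgnoreCase);
    foreach (var pair in upper)
    {
        merged[pair.Key] = pair.Value;
    }

    return merged;
}

var effective = Overlay(baseSettings, development);
foreach (var pair in effective)
{
    Console.WriteLine($"{pair.Key} = {pair.Value}");
}
```

In Development the endpoint therefore runs with 3 immediate retries and 2 delayed retries, while DelayIntervalInSeconds keeps the base value of 10 because the Development file does not define it.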

Recoverability and Retry Policies

Immediate and Delayed Retries

private static void DefineNServiceBusRecoverability(
    EndpointConfiguration endpointConfiguration, 
    NServiceBusOptions nServiceBusOptions)
{
    var recoverability = endpointConfiguration.Recoverability();

    // Immediate retries (fail fast)
    recoverability.Immediate(immediate =>
    {
        immediate.NumberOfRetries(nServiceBusOptions.Recoverability.NumberOfImmediateRetries);
    });

    // Delayed retries (the delay grows by TimeIncrease with each delayed attempt)
    recoverability.Delayed(delayed =>
    {
        delayed.NumberOfRetries(nServiceBusOptions.Recoverability.NumberOfDelayedRetries);
        if (nServiceBusOptions.Recoverability.NumberOfDelayedRetries != 0 &&
            nServiceBusOptions.Recoverability.DelayIntervalInSeconds.HasValue)
        {
            delayed.TimeIncrease(TimeSpan.FromSeconds(
                nServiceBusOptions.Recoverability.DelayIntervalInSeconds.Value));
        }
    });
}
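To see what a given pair of settings means in practice, the schedule can be worked out by hand. The sketch below assumes the documented NServiceBus behavior that each delayed retry waits TimeIncrease multiplied by the retry number, and that every delayed retry round is followed by its own set of immediate retries. RetrySchedule is a hypothetical helper written for this example, not part of NServiceBus or the template.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Base appsettings.json values: 0 immediate retries, 3 delayed retries, 10 second increase.
var delays = RetrySchedule.DelayedRetryDelays(3, TimeSpan.FromSeconds(10));
Console.WriteLine(string.Join(", ", delays.Select(d => d.TotalSeconds + "s"))); // 10s, 20s, 30s
Console.WriteLine(RetrySchedule.MaxProcessingAttempts(0, 3));                   // 4

// Development values: 3 immediate retries, 2 delayed retries.
Console.WriteLine(RetrySchedule.MaxProcessingAttempts(3, 2));                   // 12

public static class RetrySchedule
{
    // Delay before the n-th delayed retry is n * timeIncrease (linear growth).
    public static IReadOnlyList<TimeSpan> DelayedRetryDelays(int numberOfRetries, TimeSpan timeIncrease) =>
        Enumerable.Range(1, numberOfRetries)
            .Select(n => TimeSpan.FromTicks(timeIncrease.Ticks * n))
            .ToList();

    // Upper bound on handler invocations before the message moves to the error queue.
    public static int MaxProcessingAttempts(int immediateRetries, int delayedRetries) =>
        (immediateRetries + 1) * (delayedRetries + 1);
}
```

With the base settings a failing message is attempted four times over roughly one minute before it is moved to the error queue.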

Custom Retry Policies

recoverability.Immediate(immediate =>
{
    immediate.NumberOfRetries(3);
});

recoverability.Delayed(delayed =>
{
    delayed.NumberOfRetries(5);
    delayed.TimeIncrease(TimeSpan.FromSeconds(10)); // Delay grows linearly: 10s, 20s, 30s, ...
});

// Configure unrecoverable exceptions (skip retries)
recoverability.AddUnrecoverableException<ValidationException>();

Error Queue

Failed messages (after all retries) are sent to the error queue:

endpointConfiguration.SendFailedMessagesTo("error");

Message Validation

The template validates messages with DataAnnotations attributes:

private static void DefineNServiceBusMessageValidation(EndpointConfiguration endpointConfiguration)
{
    // Enable DataAnnotations message validation
    endpointConfiguration.UseDataAnnotationsValidation(
        outgoing: true,  // Validate outgoing messages
        incoming: true); // Validate incoming messages
}

Validation errors result in:

  • Validation exception thrown
  • Message handled by recoverability
  • Added to unrecoverable exceptions to avoid unnecessary retries
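For readers unfamiliar with DataAnnotations, the sketch below shows the attribute-based checks from System.ComponentModel.DataAnnotations applied to a message class. RenameThingCommand and its properties are hypothetical, and the code calls the validator directly rather than through the NServiceBus pipeline, so it only illustrates the kind of rule that can be declared on a contract.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

var message = new RenameThingCommand { ObjectId = Guid.NewGuid(), NewName = "" };

var results = new List<ValidationResult>();
bool isValid = Validator.TryValidateObject(
    message,
    new ValidationContext(message),
    results,
    validateAllProperties: true);

Console.WriteLine(isValid); // False: NewName is required and empty strings are rejected
foreach (var result in results)
{
    Console.WriteLine(result.ErrorMessage);
}

// Hypothetical message contract with declarative validation rules.
public class RenameThingCommand
{
    public Guid ObjectId { get; set; }

    [Required]
    [StringLength(50)]
    public string NewName { get; set; }
}
```

A message that fails such a rule will fail the same way on every attempt, which is why treating validation failures as unrecoverable avoids pointless retries.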

ServiceControl Integration

Heartbeats

Heartbeats enable endpoint health monitoring:

private static void DefineNServiceBusHeartbeat(
    EndpointConfiguration endpointConfiguration, 
    NServiceBusOptions nServiceBusOptions)
{
    if (nServiceBusOptions.Heartbeat != null)
    {
        endpointConfiguration.SendHeartbeatTo(
            serviceControlQueue: nServiceBusOptions.Heartbeat.ServiceControlHeartbeatAddress,
            frequency: TimeSpan.FromSeconds(nServiceBusOptions.Heartbeat.HeartbeatInterval),
            timeToLive: TimeSpan.FromSeconds(nServiceBusOptions.Heartbeat.HeartbeatTtl));
    }
}

Metrics

Metrics reporting enables performance monitoring:

private static void DefineNServiceBusServiceControlMetrics(
    EndpointConfiguration endpointConfiguration, 
    NServiceBusOptions nServiceBusOptions)
{
    if (nServiceBusOptions.Metrics != null)
    {
        var metrics = endpointConfiguration.EnableMetrics();

        var machineName = $"{Dns.GetHostName()}.{IPGlobalProperties.GetIPGlobalProperties().DomainName}";
        var instanceIdentifier = $"{nServiceBusOptions.EndpointName}@{machineName}";

        metrics.SetServiceControlMetricsMessageTTBR(TimeSpan.FromHours(12));
        metrics.SendMetricDataToServiceControl(
            serviceControlMetricsAddress: nServiceBusOptions.Metrics.ServiceControlMetricsAddress,
            interval: TimeSpan.FromSeconds(nServiceBusOptions.Metrics.ServiceControlReportingInterval),
            instanceId: instanceIdentifier);
    }
}
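The instance identifier above concatenates the host name and the domain name unconditionally. On machines that are not joined to a domain the domain name may be empty, which would leave a trailing dot in the identifier. A minimal sketch of a guard follows; BuildInstanceId is a hypothetical helper, not part of the template.

```csharp
using System;
using System.Net;
using System.Net.NetworkInformation;

// Builds "endpoint@host.domain", or "endpoint@host" when no domain is available.
static string BuildInstanceId(string endpointName, string hostName, string domainName)
{
    var machineName = string.IsNullOrWhiteSpace(domainName)
        ? hostName
        : $"{hostName}.{domainName}";

    return $"{endpointName}@{machineName}";
}

Console.WriteLine(BuildInstanceId("Sales", "web01", "corp.example")); // Sales@web01.corp.example
Console.WriteLine(BuildInstanceId("Sales", "web01", ""));             // Sales@web01

// With the same machine values the template code reads:
Console.WriteLine(BuildInstanceId(
    "Sales",
    Dns.GetHostName(),
    IPGlobalProperties.GetIPGlobalProperties().DomainName));
```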

Auditing

All processed messages can be audited:

endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.AuditSagaStateChanges(serviceControlQueue: "saga-audit");

Outbox Pattern

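NServiceBus has built-in outbox support. In the template's AddNServiceBus() method the EnableOutbox() call is currently commented out, so enable it explicitly when you need it:

// Enable outbox (requires persistence)
var outboxSettings = endpointConfiguration.EnableOutbox();

// Outbox settings (SQL persistence names this setting KeepDeduplicationDataFor)
outboxSettings.KeepDeduplicationDataFor(TimeSpan.FromDays(7));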
outboxSettings.DisableCleanup(); // Optional: for development
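
The idea behind the outbox can be shown with an in-memory model. This is a conceptual sketch only, not how NServiceBus implements it: a dictionary stands in for the outbox table, a list stands in for the transport, and all names are hypothetical. It shows the two properties the pattern provides: a redelivered message does not run the handler again, and outgoing messages are dispatched once.

```csharp
using System;
using System.Collections.Generic;

var store = new Dictionary<string, OutboxRecord>(); // stands in for the outbox table
var broker = new List<string>();                    // stands in for the transport
int handlerRuns = 0;

void Process(string messageId)
{
    if (!store.TryGetValue(messageId, out var record))
    {
        // First delivery: run the handler and store its outgoing messages
        // together with the business data (one database transaction in practice).
        handlerRuns++;
        record = new OutboxRecord();
        record.Outgoing.Add($"ThingCreated for {messageId}");
        store[messageId] = record;
    }

    if (!record.Dispatched)
    {
        // Dispatch after the commit. If the process crashes before this point,
        // the redelivered message finds the stored record and dispatches then.
        broker.AddRange(record.Outgoing);
        record.Dispatched = true;
    }
}

Process("msg-1");
Process("msg-1"); // duplicate delivery of the same incoming message

Console.WriteLine(handlerRuns);  // 1
Console.WriteLine(broker.Count); // 1

public sealed class OutboxRecord
{
    public List<string> Outgoing { get; } = new();
    public bool Dispatched { get; set; }
}
```

The deduplication records are what the retention setting above controls: once a record is removed, a late duplicate of that message would be processed again.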

See Outbox Pattern for detailed information.

Observability

OpenTelemetry Integration

#if OpenTelemetry
endpointConfiguration.EnableOpenTelemetry();
#endif

Traces include:

  • Message sending and receiving
  • Handler execution
  • Saga operations
  • Retry attempts

Structured Logging

Use ILogger<T> with correlation IDs:

public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
    using (logger.BeginScope(new Dictionary<string, object>
    {
        ["MessageId"] = context.MessageId,
        ["CorrelationId"] = context.MessageHeaders["NServiceBus.CorrelationId"],
        ["ObjectId"] = message.ObjectId
    }))
    {
        logger.LogInformation("Processing message");
        // Process message
        logger.LogInformation("Message processed successfully");
    }
}

Health Checks

NServiceBus integrates with ASP.NET Core health checks:

// HealthChecksExtensions.cs
#if UseNServiceBus
builder.AddNServiceBusHealthChecks(configuration);
#endif

Health checks verify:

  • Transport database connectivity
  • Persistence database connectivity
  • Endpoint health

Testing

Unit Testing with NServiceBus.Testing

[TestMethod]
public async Task MicroserviceAggregateRootLifeCycleSaga_Should_Be_Initialized_When_Event_Published()
{
    // Arrange
    var logger = NullLogger<MicroserviceAggregateRootLifeCycleSaga>.Instance;
    var sagaData = new MicroserviceAggregateRootSagaData();

    // Outside the pipeline nothing populates saga.Data, so assign it explicitly
    var saga = new MicroserviceAggregateRootLifeCycleSaga(logger) { Data = sagaData };
    var context = new TestableMessageHandlerContext();

    var @event = new MicroserviceAggregateRootCreatedEvent
    {
        ObjectId = Guid.NewGuid()
    };

    // Act
    await saga.Handle(@event, context);

    // Assert
    Assert.AreEqual(@event.ObjectId, sagaData.ObjectId);

    // The saga shown earlier only stores state; it does not publish anything
    Assert.AreEqual(0, context.PublishedMessages.Length);
}

Testing Handlers

[TestMethod]
public async Task Handler_Should_Process_Command()
{
    // Arrange
    var processor = new Mock<IMicroserviceAggregateRootsProcessor>();
    var logger = NullLogger<CreateMicroserviceAggregateRootCommandHandler>.Instance;
    var handler = new CreateMicroserviceAggregateRootCommandHandler(
        processor.Object, 
        logger);

    var context = new TestableMessageHandlerContext();
    var command = new CreateMicroserviceAggregateRootCommand
    {
        ObjectId = Guid.NewGuid()
    };

    // Act
    await handler.Handle(command, context);

    // Assert
    processor.Verify(p => p.CreateMicroserviceAggregateRoot(
        It.IsAny<CreateMicroserviceAggregateRootInput>(), 
        It.IsAny<CancellationToken>()), 
        Times.Once);
}

Pipeline Behaviors

Custom pipeline behaviors can extend message processing:

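// Behaviors run on pipeline contexts such as IIncomingLogicalMessageContext;
// IMessageHandlerContext is the handler-facing API and is not a pipeline stage.
public class ExtendLoggerScopeWithMessageDataBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public const string ExtendLoggerScopeWithMessageDataBehaviorDescription =
        "Extend logger scope with message data";

    private readonly ILogger<ExtendLoggerScopeWithMessageDataBehavior> logger;

    public ExtendLoggerScopeWithMessageDataBehavior(
        ILogger<ExtendLoggerScopeWithMessageDataBehavior> logger)
    {
        this.logger = logger;
    }

    public override async Task Invoke(
        IIncomingLogicalMessageContext context, 
        Func<Task> next)
    {
        // Extend logger scope with message data
        using (this.logger.BeginScope(new Dictionary<string, object>
        {
            ["MessageId"] = context.MessageId,
            ["MessageType"] = context.MessageHeaders["NServiceBus.EnclosedMessageTypes"]
        }))
        {
            await next();
        }
    }
}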

Register behavior:

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
    stepId: nameof(ExtendLoggerScopeWithMessageDataBehavior),
    behavior: typeof(ExtendLoggerScopeWithMessageDataBehavior),
    description: "Extend logger scope with message data");

Best Practices

Do's

  1. Use IMessageSession for sending/publishing
     • Resolve it from DI in code that runs outside a message handler; inside a handler, use the IMessageHandlerContext passed to Handle()
     • Better integration with DI

  2. Keep handlers thin
     • Delegate business logic to domain services
     • Handlers should only handle message concerns

  3. Use unobtrusive mode
     • Minimize coupling with NServiceBus assemblies
     • Use ICommand and IEvent interfaces

  4. Configure retry policies appropriately
     • Different policies for different exception types
     • Use immediate retries for transient failures
     • Use delayed retries for longer recovery

  5. Enable ServiceControl integration
     • Heartbeats for endpoint monitoring
     • Metrics for performance tracking
     • Auditing for compliance

  6. Use SQL persistence for sagas
     • Reliable saga state storage
     • Supports transactions
     • Better for enterprise scenarios

  7. Enable outbox for critical messages
     • Guaranteed message delivery
     • Prevents message loss

  8. Test with NServiceBus.Testing
     • Unit test handlers and sagas
     • Verify message publishing

Don'ts

  1. Don't put business logic in handlers
     • Handlers should delegate to services
     • Keep orchestration separate

  2. Don't ignore critical errors
     • Configure critical error handlers
     • Log and alert appropriately

  3. Don't share domain entities as messages
     • Use interfaces in MessagingModel
     • Keep contracts independent

  4. Don't disable installers in production
     • Use installers for initial setup
     • Or create queues/tables manually

  5. Don't ignore validation
     • Enable message validation
     • Catch validation errors early

Common Scenarios

Scenario 1: Simple Event Publishing

// Publish event after domain operation
public async Task CreateEntity(CreateEntityInput input)
{
    var entity = new Entity { Id = input.Id };
    repository.Insert(entity);
    await unitOfWork.SaveChangesAsync();

    // Publish event
    await messageSession.Publish(new EntityCreatedEvent
    {
        EntityId = entity.Id
    });
}

Scenario 2: Saga Orchestration

// Saga handles multiple messages to orchestrate workflow
public class OrderProcessingSaga : Saga<OrderProcessingSagaData>,
    IAmStartedByMessages<OrderCreatedEvent>,
    IHandleMessages<PaymentProcessedEvent>,
    IHandleMessages<InventoryReservedEvent>
{
    public async Task Handle(OrderCreatedEvent message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;

        // Send command to process payment
        await context.Send(new ProcessPaymentCommand
        {
            OrderId = Data.OrderId,
            Amount = message.TotalAmount
        });
    }

    public async Task Handle(PaymentProcessedEvent message, IMessageHandlerContext context)
    {
        Data.PaymentId = message.PaymentId;

        // Send command to reserve inventory
        await context.Send(new ReserveInventoryCommand
        {
            OrderId = Data.OrderId
        });
    }

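    public async Task Handle(InventoryReservedEvent message, IMessageHandlerContext context)
    {
        // Complete saga
        MarkAsComplete();

        // Publish completion event
        await context.Publish(new OrderCompletedEvent
        {
            OrderId = Data.OrderId
        });
    }

    // Required override: tells NServiceBus how to correlate each message to a saga instance.
    // Assumes every event in this example carries the OrderId.
    protected override void ConfigureHowToFindSaga(
        SagaPropertyMapper<OrderProcessingSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<OrderCreatedEvent>(msg => msg.OrderId)
            .ToMessage<PaymentProcessedEvent>(msg => msg.OrderId)
            .ToMessage<InventoryReservedEvent>(msg => msg.OrderId);
    }
}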

Scenario 3: Request/Response Pattern

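// Client sends request and waits for response.
// Request<T>() comes from the NServiceBus.Callbacks package; the requesting endpoint
// must call EnableCallbacks() and MakeInstanceUniquelyAddressable(...) in its configuration.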
var options = new SendOptions();
options.SetDestination("DestinationEndpoint");
var request = new GetEntityRequest { Id = entityId };
var response = await messageSession.Request<GetEntityResponse>(request, options);

// Handler responds
public class GetEntityRequestHandler : IHandleMessages<GetEntityRequest>
{
    public async Task Handle(GetEntityRequest message, IMessageHandlerContext context)
    {
        var entity = await repository.GetByIdAsync(message.Id);

        await context.Reply(new GetEntityResponse
        {
            Entity = entity
        });
    }
}

Summary

NServiceBus in the ConnectSoft Microservice Template provides:

  • Enterprise Reliability: SQL-based persistence and transactions
  • ServiceControl Integration: Monitoring, heartbeats, and metrics
  • Outbox Support: Guaranteed message delivery
  • Advanced Recoverability: Sophisticated retry policies
  • Unobtrusive Mode: Minimal coupling with NServiceBus assemblies
  • Observability: OpenTelemetry and structured logging
  • Testing: Comprehensive testing support
  • Clean Architecture: Fits seamlessly with DDD and CQRS patterns

By following these patterns and best practices, NServiceBus becomes a powerful messaging infrastructure that enables scalable, reliable, and observable microservices with enterprise-grade features.