Skip to content

NServiceBus in ConnectSoft Base Template

Purpose & Overview

NServiceBus is a mature, enterprise-grade service bus framework that provides reliable message-based communication and orchestration. In the ConnectSoft Base Template, NServiceBus is integrated as a pluggable alternative to MassTransit, offering advanced features like SQL-based persistence, ServiceControl integration, and built-in outbox support.

Target Framework

The Base Template targets .NET 10. For Base Template-specific registration, configuration, and pipeline order, see the Base Template docs/NServiceBus.md in the ConnectSoft.BaseTemplate repository.

Why NServiceBus?

NServiceBus offers several key benefits for enterprise scenarios:

  • Enterprise-Grade Reliability: SQL-based saga persistence with transaction support
  • ServiceControl Integration: Built-in monitoring, heartbeats, and operational visibility
  • Outbox Pattern: Built-in outbox support for guaranteed message delivery
  • Advanced Recoverability: Sophisticated retry policies and error handling
  • Interface-Based Contracts: Type-safe message contracts using ICommand and IEvent
  • Unobtrusive Mode: Minimal coupling with NServiceBus assemblies in message contracts
  • SQL Server Transport: Native SQL Server transport option for database-centric deployments
  • Commercial Support: Available commercial support and training

NServiceBus vs MassTransit

NServiceBus is an alternative to MassTransit in the template. Choose NServiceBus when you need SQL-based persistence, ServiceControl monitoring, or commercial support. Choose MassTransit for open-source-only requirements or when preferring POCO-based message contracts.

Architecture Overview

NServiceBus integrates at the messaging layer of Clean Architecture:

Domain Services (DomainModel)
    ├── Processors (Commands/Writes)
    └── Retrievers (Queries/Reads)
    ↓ (Publish/Subscribe via IMessageSession)
NServiceBus Endpoint
    ├── Transport (SQL Server/RabbitMQ/Azure Service Bus)
    ├── Handlers (Message Handlers)
    ├── Sagas (Long-running Orchestration)
    ├── Persistence (SQL-based Saga State)
    └── Pipeline Behaviors (Custom Middleware)
Message Broker / SQL Server
    └── Queues/Subscriptions

Key Integration Points

Layer Component Responsibility
MessagingModel ICommand/IEvent Message contracts (interfaces)
DomainModel Services Send/publish via IMessageSession
ApplicationModel NServiceBusExtensions Endpoint configuration
FlowModel.NServiceBus Sagas Long-running process orchestration
PersistenceModel SQL Persistence Saga state and outbox storage

Core Components

1. Service Registration

NServiceBus is registered via extension methods:

// Program.cs
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMicroserviceOptions(builder.Configuration);
builder.Services.AddMicroserviceNServiceBus(builder.Configuration);

// Configure logging (UseNServiceBusLogging)
builder.Host.ConfigureLogging((hostingContext, logging) =>
{
    logging.AddNServiceBus();
});

var app = builder.Build();

// Start endpoint
app.ApplicationServices.StartNServiceBusEndpoint();

await app.RunAsync();

// Stop endpoint on shutdown
app.Lifetime.ApplicationStopping.Register(() =>
{
    NServiceBusExtensions.StopNServiceBusEndpoint();
});

2. Endpoint Configuration

The AddMicroserviceNServiceBus() extension method configures all NServiceBus services. Options are obtained from OptionsExtensions.NServiceBusOptions (registered in AddMicroserviceOptions):

// NServiceBusExtensions.cs
internal static IServiceCollection AddMicroserviceNServiceBus(
    this IServiceCollection services, 
    IConfiguration configuration)
{
    ArgumentNullException.ThrowIfNull(services);
    ArgumentNullException.ThrowIfNull(configuration);

    var nServiceBusOptions = OptionsExtensions.NServiceBusOptions;

    var endpointConfiguration = new EndpointConfiguration(nServiceBusOptions.EndpointName);

    // Serialization
    DefineNServiceBusSerialization(endpointConfiguration);

    // Recoverability (retry policies)
    DefineNServiceBusRecoverability(endpointConfiguration, nServiceBusOptions);

    // Critical error handling
    endpointConfiguration.DefineCriticalErrorAction(OnCriticalError);
    endpointConfiguration.SendFailedMessagesTo(nServiceBusOptions.ErrorQueue);
    endpointConfiguration.AuditProcessedMessagesTo(nServiceBusOptions.AuditQueue);
    endpointConfiguration.AuditSagaStateChanges(serviceControlQueue: nServiceBusOptions.SagaAuditQueue);

    // Heartbeat and metrics
    DefineNServiceBusHeartbeat(endpointConfiguration, nServiceBusOptions);
    DefineNServiceBusServiceControlMetrics(endpointConfiguration, nServiceBusOptions);

    // Unobtrusive mode conventions
    DefineUnobtrusiveModeConventions(endpointConfiguration);

    // Transport and routing (uses NServiceBusOptions from OptionsExtensions)
    DefineNServiceBusTransportAndRouting(endpointConfiguration, nServiceBusOptions);

    // SQL Persistence
    if (nServiceBusOptions.PersistenceType == NServiceBusPersistenceType.SqlServer)
        DefineNServiceBusSQLPersistence(nServiceBusOptions.EndpointName, endpointConfiguration, nServiceBusOptions);

    // Outbox (optional - currently commented out)
    // var outboxSettings = endpointConfiguration.EnableOutbox();

    // Licensing
    DefineNServiceBusLicensing(endpointConfiguration);

    // Message validation (NServiceBus.Community.DataAnnotations)
    DefineNServiceBusMessageValidation(endpointConfiguration);

    // OpenTelemetry is enabled by default in NServiceBus 10.x

    // Enable installers (creates queues, tables, etc.)
    endpointConfiguration.EnableInstallers();

    // Register custom pipeline behavior
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(
        stepId: nameof(ExtendLoggerScopeWithMessageDataBehavior),
        behavior: typeof(ExtendLoggerScopeWithMessageDataBehavior),
        description: ExtendLoggerScopeWithMessageDataBehavior.ExtendLoggerScopeWithMessageDataBehaviorDescription);

    // Configure dependency injection
    var endpointWithExternallyManagedServiceProvider = 
        DefineDependencyInjectionExternallyManagedMode(services, endpointConfiguration);

    return services;
}

3. Transport Configuration

SQL Server Transport

The template supports SQL Server transport:

private static void DefineNServiceBusTransportAndRouting(
    EndpointConfiguration endpointConfiguration, 
    IConfiguration configuration, 
    NServiceBusOptions nServiceBusOptions)
{
#if UseNServiceBusSqlServerTransport
    var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
    var connectionString = configuration.GetConnectionString(
        NServiceBusSqlServerTransportConnectionStringKey);

    // Create database and schema if not exists
    SqlServerDatabaseHelper databaseHelper = new();
    databaseHelper.CreateIfNotExists(connectionString);
    databaseHelper.CreateSchema(connectionString, nServiceBusOptions.EndpointName);
    databaseHelper.CreateSchema(connectionString, "nsb");

    transport.ConnectionString(connectionString);
    transport.DefaultSchema("nsb");

    // Create required queues
    using var connection = new SqlConnection(connectionString);
    connection.Open();

    NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.ErrorQueue);
    transport.UseSchemaForQueue(nServiceBusOptions.ErrorQueue, "nsb");

    NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.AuditQueue);
    transport.UseSchemaForQueue(nServiceBusOptions.AuditQueue, "nsb");

    NServiceBusQueueCreationUtils.CreateQueue(connection, "nsb", nServiceBusOptions.SagaAuditQueue);
    transport.UseSchemaForQueue(nServiceBusOptions.SagaAuditQueue, "nsb");

    // Configure transactions
    transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

    // Configure subscriptions
    var subscriptions = transport.SubscriptionSettings();
    subscriptions.CacheSubscriptionInformationFor(TimeSpan.FromMinutes(1));
    subscriptions.SubscriptionTableName(
        tableName: "Subscriptions",
        schemaName: "nsb");
#endif

    // Configure routing
    transport.Routing().RouteToEndpoint(
        assembly: typeof(BaseAggregateRootCreatedEvent).Assembly,
        destination: nServiceBusOptions.EndpointName);
    transport.Routing().RouteToEndpoint(
        assembly: typeof(CreateBaseAggregateRootCommand).Assembly,
        destination: nServiceBusOptions.EndpointName);
}

4. SQL Persistence

NServiceBus uses SQL Server for saga persistence:

private static void DefineNServiceBusSQLPersistence(
    string endpointName, 
    EndpointConfiguration endpointConfiguration, 
    IConfiguration configuration)
{
    var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
    var connectionString = configuration.GetConnectionString(
        NServiceBusSQLPersistenceConnectionStringKey);

    // Create database and schema if not exists
    SqlServerDatabaseHelper databaseHelper = new();
    databaseHelper.CreateIfNotExists(connectionString);
    databaseHelper.CreateSchema(connectionString, endpointName);
    databaseHelper.CreateSchema(connectionString, "nsb");

    var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
    dialect.Schema(endpointName);

    persistence.ConnectionBuilder(() => new SqlConnection(connectionString));

    var subscriptions = persistence.SubscriptionSettings();
    subscriptions.CacheFor(TimeSpan.FromMinutes(1));
}

5. Sending and Publishing Messages

Using IMessageSession

NServiceBus uses IMessageSession for sending and publishing:

public class BaseAggregateRootsProcessor : IBaseAggregateRootsProcessor
{
    private readonly IMessageSession messageSession;
    private readonly ILogger<BaseAggregateRootsProcessor> logger;

    public BaseAggregateRootsProcessor(
        IMessageSession messageSession,
        ILogger<BaseAggregateRootsProcessor> logger)
    {
        this.messageSession = messageSession;
        this.logger = logger;
    }

    public async Task<IBaseAggregateRoot> CreateBaseAggregateRoot(
        CreateBaseAggregateRootInput input,
        CancellationToken token = default)
    {
        // Execute domain logic
        var entity = new BaseAggregateRoot { ObjectId = input.ObjectId };
        // ... save to repository ...

        // Publish domain event
        await this.messageSession.Publish(new BaseAggregateRootCreatedEvent
        {
            ObjectId = entity.ObjectId
        });

        // Or send command
        await this.messageSession.Send(new CreateBaseAggregateRootCommand
        {
            ObjectId = entity.ObjectId
        });

        this.logger.LogInformation(
            "Published BaseAggregateRootCreatedEvent for {ObjectId}",
            entity.ObjectId);

        return entity;
    }
}

Publishing Events

// Publish event (can have multiple subscribers)
await messageSession.Publish(new BaseAggregateRootCreatedEvent
{
    ObjectId = entity.ObjectId
});

Sending Commands

// Send command (single handler)
await messageSession.Send(new CreateBaseAggregateRootCommand
{
    ObjectId = input.ObjectId
});

6. Message Handlers

Message handlers implement IHandleMessages<T>:

public class CreateBaseAggregateRootCommandHandler :
    IHandleMessages<CreateBaseAggregateRootCommand>
{
    private readonly IBaseAggregateRootsProcessor processor;
    private readonly ILogger<CreateBaseAggregateRootCommandHandler> logger;

    public CreateBaseAggregateRootCommandHandler(
        IBaseAggregateRootsProcessor processor,
        ILogger<CreateBaseAggregateRootCommandHandler> logger)
    {
        this.processor = processor;
        this.logger = logger;
    }

    public async Task Handle(
        CreateBaseAggregateRootCommand message, 
        IMessageHandlerContext context)
    {
        this.logger.LogInformation(
            "Handling CreateBaseAggregateRootCommand for {ObjectId}",
            message.ObjectId);

        try
        {
            var input = new CreateBaseAggregateRootInput
            {
                ObjectId = message.ObjectId
            };

            await this.processor.CreateBaseAggregateRoot(input, context.CancellationToken);

            // Optionally publish success event
            await context.Publish(new BaseAggregateRootCreatedEvent
            {
                ObjectId = message.ObjectId
            });

            this.logger.LogInformation(
                "Successfully processed CreateBaseAggregateRootCommand for {ObjectId}",
                message.ObjectId);
        }
        catch (Exception ex)
        {
            this.logger.LogError(
                ex,
                "Error processing CreateBaseAggregateRootCommand for {ObjectId}",
                message.ObjectId);
            throw; // Let NServiceBus recoverability handle retry/fault
        }
    }
}

7. Sagas

NServiceBus sagas orchestrate long-running business processes. See Saga Pattern for detailed information.

Saga Example

public class BaseAggregateRootLifeCycleSaga :
    Saga<BaseAggregateRootSagaData>,
    IAmStartedByMessages<BaseAggregateRootCreatedEvent>
{
    private readonly ILogger<BaseAggregateRootLifeCycleSaga> logger;

    public BaseAggregateRootLifeCycleSaga(
        ILogger<BaseAggregateRootLifeCycleSaga> logger)
    {
        this.logger = logger;
    }

    public Task Handle(
        BaseAggregateRootCreatedEvent message, 
        IMessageHandlerContext context)
    {
        using (this.logger.BeginScope(
            new Dictionary<string, object>(StringComparer.Ordinal)
            {
                ["ApplicationFlowName"] = nameof(BaseAggregateRootLifeCycleSaga) + 
                    "/HandleBaseAggregateRootCreatedEvent",
            }))
        {
            Guid objectId = message.ObjectId;
            this.logger.LogInformation(
                "Handle BaseAggregateRoot's created event for {ObjectId} started and saga created...",
                objectId);

            this.Data.ObjectId = message.ObjectId;

            this.logger.LogInformation(
                "Handle BaseAggregateRoot's created event for {ObjectId} completed...",
                objectId);
        }

        return Task.CompletedTask;
    }

    protected override void ConfigureHowToFindSaga(
        SagaPropertyMapper<BaseAggregateRootSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.ObjectId)
            .ToMessage<BaseAggregateRootCreatedEvent>(msg => msg.ObjectId);
    }
}

Saga Data

public class BaseAggregateRootSagaData : ContainSagaData
{
    public Guid ObjectId { get; set; }
    // Additional saga state properties...
}

Unobtrusive Mode

NServiceBus uses unobtrusive mode to minimize coupling with NServiceBus assemblies:

private static void DefineUnobtrusiveModeConventions(EndpointConfiguration endpointConfiguration)
{
    var conventions = endpointConfiguration.Conventions();

    // Define commands
    conventions.DefiningCommandsAs(type =>
        string.Equals(type.Namespace, typeof(CreateBaseAggregateRootCommand).Namespace, 
            StringComparison.Ordinal) &&
        typeof(ICommand).IsAssignableFrom(type));

    // Define events
    conventions.DefiningEventsAs(type =>
        string.Equals(type.Namespace, typeof(BaseAggregateRootCreatedEvent).Namespace, 
            StringComparison.Ordinal) &&
        typeof(IEvent).IsAssignableFrom(type));

    // Define claim check properties (for large payloads)
    conventions.DefiningClaimCheckPropertiesAs(property =>
        property.Name.EndsWith("DataBus", StringComparison.Ordinal) ||
        (typeof(ClaimCheckProperty<>).IsAssignableFrom(property.PropertyType) &&
         typeof(ClaimCheckProperty<>) != property.PropertyType));
}

This allows message contracts to be defined without referencing NServiceBus assemblies:

// In MessagingModel project (no NServiceBus reference)
public class CreateBaseAggregateRootCommand : ICommand
{
    public Guid ObjectId { get; set; }
}

public class BaseAggregateRootCreatedEvent : IEvent
{
    public Guid ObjectId { get; set; }
}

Configuration

appsettings.json

{
  "NServiceBus": {
    "EndpointName": "ConnectSoft.BaseTemplate",
    "AuditQueue": "audit",
    "ErrorQueue": "error",
    "SagaAuditQueue": "saga-audit",
    "Recoverability": {
      "NumberOfImmediateRetries": 0,
      "NumberOfDelayedRetries": 3,
      "DelayIntervalInSeconds": 10
    },
    "Metrics": {
      "ServiceControlMetricsAddress": "particular.monitoring",
      "ServiceControlReportingInterval": 10
    },
    "Heartbeat": {
      "ServiceControlHeartbeatAddress": "particular.servicecontrol",
      "HeartbeatInterval": 10,
      "HeartbeatTtl": 30
    }
  },
  "ConnectionStrings": {
    "ConnectSoft.BaseTemplateNServiceBusTransportConnectionString": 
      "Server=localhost;Database=NServiceBusTransport;Integrated Security=true;",
    "ConnectSoft.BaseTemplateNServiceBusPersistenceConnectionString": 
      "Server=localhost;Database=NServiceBusPersistence;Integrated Security=true;"
  }
}

Environment-Specific Configuration

// appsettings.Development.json
{
  "NServiceBus": {
    "Recoverability": {
      "NumberOfImmediateRetries": 3,
      "NumberOfDelayedRetries": 2
    }
  },
  "ConnectionStrings": {
    "ConnectSoft.BaseTemplateNServiceBusTransportConnectionString": 
      "Server=localhost;Database=NServiceBusTransport_Dev;Integrated Security=true;"
  }
}

// appsettings.Production.json
{
  "NServiceBus": {
    "Recoverability": {
      "NumberOfImmediateRetries": 0,
      "NumberOfDelayedRetries": 5,
      "DelayIntervalInSeconds": 30
    }
  },
  "ConnectionStrings": {
    "ConnectSoft.BaseTemplateNServiceBusTransportConnectionString": 
      "${NSB_TRANSPORT_CONNECTION_STRING}",
    "ConnectSoft.BaseTemplateNServiceBusPersistenceConnectionString": 
      "${NSB_PERSISTENCE_CONNECTION_STRING}"
  }
}

Recoverability and Retry Policies

Immediate Retries

private static void DefineNServiceBusRecoverability(
    EndpointConfiguration endpointConfiguration, 
    NServiceBusOptions nServiceBusOptions)
{
    var recoverability = endpointConfiguration.Recoverability();

    // Immediate retries (fail fast)
    recoverability.Immediate(immediate =>
    {
        immediate.NumberOfRetries(nServiceBusOptions.Recoverability.NumberOfImmediateRetries);
    });

    // Delayed retries (with exponential backoff)
    recoverability.Delayed(delayed =>
    {
        delayed.NumberOfRetries(nServiceBusOptions.Recoverability.NumberOfDelayedRetries);
        if (nServiceBusOptions.Recoverability.NumberOfDelayedRetries != 0 &&
            nServiceBusOptions.Recoverability.DelayIntervalInSeconds.HasValue)
        {
            delayed.TimeIncrease(TimeSpan.FromSeconds(
                nServiceBusOptions.Recoverability.DelayIntervalInSeconds.Value));
        }
    });
}

Custom Retry Policies

recoverability.Immediate(immediate =>
{
    immediate.NumberOfRetries(3);
});

recoverability.Delayed(delayed =>
{
    delayed.NumberOfRetries(5);
    delayed.TimeIncrease(TimeSpan.FromSeconds(10)); // Exponential backoff
});

// Configure unrecoverable exceptions (skip retries)
recoverability.AddUnrecoverableException<ValidationException>();

Error Queue

Failed messages (after all retries) are sent to the error queue:

endpointConfiguration.SendFailedMessagesTo("error");

Message Validation

The Base Template uses NServiceBus.Community.DataAnnotations for message validation. The deprecated NServiceBus.DataAnnotations package has been replaced by this community-maintained alternative.

private static void DefineNServiceBusMessageValidation(EndpointConfiguration endpointConfiguration)
{
    endpointConfiguration.UseDataAnnotationsValidation(
        outgoing: true,  // Validate outgoing messages (throws MessageValidationException at sender)
        incoming: true); // Validate incoming messages (handled by recoverability)
}

Validation errors result in: - MessageValidationException thrown (outgoing) or message handled by recoverability (incoming) - Added to unrecoverable exceptions to avoid unnecessary retries - FaultContractTransformerServerFilter maps MessageValidationException to ValidationFault for gRPC fault contracts

For FluentValidation, see NServiceBus.Community.FluentValidation.

ServiceControl Integration

Heartbeats

Heartbeats enable endpoint health monitoring:

private static void DefineNServiceBusHeartbeat(
    EndpointConfiguration endpointConfiguration, 
    NServiceBusOptions nServiceBusOptions)
{
    if (nServiceBusOptions.Heartbeat != null)
    {
        endpointConfiguration.SendHeartbeatTo(
            serviceControlQueue: nServiceBusOptions.Heartbeat.ServiceControlHeartbeatAddress,
            frequency: TimeSpan.FromSeconds(nServiceBusOptions.Heartbeat.HeartbeatInterval),
            timeToLive: TimeSpan.FromSeconds(nServiceBusOptions.Heartbeat.HeartbeatTtl));
    }
}

Metrics

Metrics reporting enables performance monitoring:

private static void DefineNServiceBusServiceControlMetrics(
    EndpointConfiguration endpointConfiguration, 
    NServiceBusOptions nServiceBusOptions)
{
    if (nServiceBusOptions.Metrics != null)
    {
        var metrics = endpointConfiguration.EnableMetrics();

        var machineName = $"{Dns.GetHostName()}.{IPGlobalProperties.GetIPGlobalProperties().DomainName}";
        var instanceIdentifier = $"{nServiceBusOptions.EndpointName}@{machineName}";

        metrics.SetServiceControlMetricsMessageTTBR(TimeSpan.FromHours(12));
        metrics.SendMetricDataToServiceControl(
            serviceControlMetricsAddress: nServiceBusOptions.Metrics.ServiceControlMetricsAddress,
            interval: TimeSpan.FromSeconds(nServiceBusOptions.Metrics.ServiceControlReportingInterval),
            instanceId: instanceIdentifier);
    }
}

Auditing

All processed messages can be audited:

endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.AuditSagaStateChanges(serviceControlQueue: "saga-audit");

Outbox Pattern

NServiceBus has built-in outbox support. In the ConnectSoft Base Template, outbox is enabled via the EnableOutbox option in NServiceBus configuration (NServiceBus:EnableOutbox). The outbox uses the same persistence as saga persistence (e.g. SQL Server); no separate outbox persistence type is configured.

  • Table creation: In development, NServiceBus installers (e.g. EnableInstallers()) can create persistence tables including the outbox table at startup. For production deployments, run the provided SQL scripts in the solution (NServiceBusSqlScripts/MsSqlServer/Outbox_Create.sql, Outbox_Drop.sql) with the same @schema and @tablePrefix as your endpoint.
// Enable outbox (requires persistence; in template, when EnableOutbox is true and PersistenceType is SqlServer)
var outboxSettings = endpointConfiguration.EnableOutbox();
outboxSettings.TimeToKeepDeduplicationData(TimeSpan.FromDays(7));

See Outbox Pattern for detailed information.

Observability

OpenTelemetry Integration

#if OpenTelemetry
endpointConfiguration.EnableOpenTelemetry();
#endif

Traces include: - Message sending and receiving - Handler execution - Saga operations - Retry attempts

Structured Logging

Use ILogger<T> with correlation IDs:

public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
    using (logger.BeginScope(new Dictionary<string, object>
    {
        ["MessageId"] = context.MessageId,
        ["CorrelationId"] = context.MessageHeaders["NServiceBus.CorrelationId"],
        ["ObjectId"] = message.ObjectId
    }))
    {
        logger.LogInformation("Processing message");
        // Process message
        logger.LogInformation("Message processed successfully");
    }
}

Health Checks

NServiceBus integrates with ASP.NET Core health checks:

// HealthChecksExtensions.cs
#if UseNServiceBus
builder.AddNServiceBusHealthChecks(configuration);
#endif

Health checks verify: - Transport database connectivity - Persistence database connectivity - Endpoint health

Testing

Unit Testing with NServiceBus.Testing

[TestMethod]
public async Task BaseAggregateRootLifeCycleSaga_Should_Be_Initialized_When_Event_Published()
{
    // Arrange
    var saga = new BaseAggregateRootLifeCycleSaga(logger);
    var sagaData = new BaseAggregateRootSagaData();
    var context = new TestableMessageHandlerContext();

    var @event = new BaseAggregateRootCreatedEvent
    {
        ObjectId = Guid.NewGuid()
    };

    // Act
    await saga.Handle(@event, context);

    // Assert
    Assert.AreEqual(@event.ObjectId, sagaData.ObjectId);
    Assert.IsTrue(context.PublishedMessages.Any(m => 
        m.Message is BaseAggregateRootCreatedEvent));
}

Testing Handlers

[TestMethod]
public async Task Handler_Should_Process_Command()
{
    // Arrange
    var processor = new Mock<IBaseAggregateRootsProcessor>();
    var handler = new CreateBaseAggregateRootCommandHandler(
        processor.Object, 
        logger);

    var context = new TestableMessageHandlerContext();
    var command = new CreateBaseAggregateRootCommand
    {
        ObjectId = Guid.NewGuid()
    };

    // Act
    await handler.Handle(command, context);

    // Assert
    processor.Verify(p => p.CreateBaseAggregateRoot(
        It.IsAny<CreateBaseAggregateRootInput>(), 
        It.IsAny<CancellationToken>()), 
        Times.Once);
}

Pipeline Behaviors

Custom pipeline behaviors can extend message processing:

public class ExtendLoggerScopeWithMessageDataBehavior :
    IBehavior<IMessageHandlerContext, IMessageHandlerContext>
{
    public async Task Invoke(
        IMessageHandlerContext context, 
        Func<IMessageHandlerContext, Task> next)
    {
        // Extend logger scope with message data
        using (logger.BeginScope(new Dictionary<string, object>
        {
            ["MessageId"] = context.MessageId,
            ["MessageType"] = context.MessageHeaders["NServiceBus.EnclosedMessageTypes"]
        }))
        {
            await next(context);
        }
    }
}

Register behavior:

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
    stepId: nameof(ExtendLoggerScopeWithMessageDataBehavior),
    behavior: typeof(ExtendLoggerScopeWithMessageDataBehavior),
    description: "Extend logger scope with message data");

Best Practices

Do's

  1. Use IMessageSession for sending/publishing
  2. Scoped per request
  3. Better integration with DI

  4. Keep handlers thin

  5. Delegate business logic to domain services
  6. Handlers should only handle message concerns

  7. Use unobtrusive mode

  8. Minimize coupling with NServiceBus assemblies
  9. Use ICommand and IEvent interfaces

  10. Configure retry policies appropriately

  11. Different policies for different exception types
  12. Use immediate retries for transient failures
  13. Use delayed retries for longer recovery

  14. Enable ServiceControl integration

  15. Heartbeats for endpoint monitoring
  16. Metrics for performance tracking
  17. Auditing for compliance

  18. Use SQL persistence for sagas

  19. Reliable saga state storage
  20. Supports transactions
  21. Better for enterprise scenarios

  22. Enable outbox for critical messages

  23. Guaranteed message delivery
  24. Prevents message loss

  25. Test with NServiceBus.Testing

  26. Unit test handlers and sagas
  27. Verify message publishing

Don'ts

  1. Don't put business logic in handlers
  2. Handlers should delegate to services
  3. Keep orchestration separate

  4. Don't ignore critical errors

  5. Configure critical error handlers
  6. Log and alert appropriately

  7. Don't share domain entities as messages

  8. Use interfaces in MessagingModel
  9. Keep contracts independent

  10. Don't disable installers in production

  11. Use installers for initial setup
  12. Or create queues/tables manually

  13. Don't ignore validation

  14. Enable message validation
  15. Catch validation errors early

Common Scenarios

Scenario 1: Simple Event Publishing

// Publish event after domain operation
public async Task CreateEntity(CreateEntityInput input)
{
    var entity = new Entity { Id = input.Id };
    repository.Insert(entity);
    await unitOfWork.SaveChangesAsync();

    // Publish event
    await messageSession.Publish(new EntityCreatedEvent
    {
        EntityId = entity.Id
    });
}

Scenario 2: Saga Orchestration

// Saga handles multiple messages to orchestrate workflow
public class OrderProcessingSaga : Saga<OrderProcessingSagaData>,
    IAmStartedByMessages<OrderCreatedEvent>,
    IHandleMessages<PaymentProcessedEvent>,
    IHandleMessages<InventoryReservedEvent>
{
    public async Task Handle(OrderCreatedEvent message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;

        // Send command to process payment
        await context.Send(new ProcessPaymentCommand
        {
            OrderId = Data.OrderId,
            Amount = message.TotalAmount
        });
    }

    public async Task Handle(PaymentProcessedEvent message, IMessageHandlerContext context)
    {
        Data.PaymentId = message.PaymentId;

        // Send command to reserve inventory
        await context.Send(new ReserveInventoryCommand
        {
            OrderId = Data.OrderId
        });
    }

    public async Task Handle(InventoryReservedEvent message, IMessageHandlerContext context)
    {
        // Complete saga
        MarkAsComplete();

        // Publish completion event
        await context.Publish(new OrderCompletedEvent
        {
            OrderId = Data.OrderId
        });
    }
}

Scenario 3: Request/Response Pattern

// Client sends request and waits for response
var options = new SendOptions();
options.SetDestination("DestinationEndpoint");
var request = new GetEntityRequest { Id = entityId };
var response = await messageSession.Request<GetEntityResponse>(request, options);

// Handler responds
public class GetEntityRequestHandler : IHandleMessages<GetEntityRequest>
{
    public async Task Handle(GetEntityRequest message, IMessageHandlerContext context)
    {
        var entity = await repository.GetByIdAsync(message.Id);

        await context.Reply(new GetEntityResponse
        {
            Entity = entity
        });
    }
}

Summary

NServiceBus in the ConnectSoft Base Template provides:

  • Enterprise Reliability: SQL-based persistence and transactions
  • ServiceControl Integration: Monitoring, heartbeats, and metrics
  • Outbox Support: Guaranteed message delivery
  • Advanced Recoverability: Sophisticated retry policies
  • Unobtrusive Mode: Minimal coupling with NServiceBus assemblies
  • Observability: OpenTelemetry and structured logging
  • Testing: Comprehensive testing support
  • Clean Architecture: Fits seamlessly with DDD and CQRS patterns

By following these patterns and best practices, NServiceBus becomes a powerful messaging infrastructure that enables scalable, reliable, and observable microservices with enterprise-grade features.