MassTransit in ConnectSoft Microservice Template

Purpose & Overview

MassTransit is the default messaging and orchestration engine integrated into the ConnectSoft Microservice Template. It provides a robust, scalable, and developer-friendly abstraction over distributed message-based communication, enabling event-driven architectures, long-running process orchestration, and reliable message delivery.

Why MassTransit?

MassTransit offers several key benefits for the template:

  • Open Source (MIT License): Free to use without commercial licensing concerns
  • Pub/Sub and Request/Response: Supports both messaging patterns
  • Saga Orchestration: Built-in support for long-running business processes
  • Transport Abstraction: Works with RabbitMQ, Azure Service Bus, Amazon SQS, and Kafka
  • Built-in Reliability: Message retries, dead-letter queues, and fault handling
  • Dependency Injection: Seamless integration with ASP.NET Core DI
  • Testing Support: Comprehensive test harness for unit and integration testing
  • Observability: First-class OpenTelemetry and structured logging support
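  • Outbox Pattern: Supports a transactional outbox, so messages are stored atomically with database changes and delivered at least once (pair it with idempotent consumers for effectively-once processing)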

MassTransit vs Alternatives

MassTransit was chosen over alternatives like NServiceBus due to its non-commercial license, active community, and flexible customization model that aligns well with ConnectSoft's DDD, AI, and Event-Sourced ecosystem.

Architecture Overview

MassTransit integrates at the messaging layer of Clean Architecture:

Domain Services (DomainModel)
    ├── Processors (Commands/Writes)
    └── Retrievers (Queries/Reads)
    ↓ (Publish/Subscribe)
MassTransit Bus
    ├── Transport (RabbitMQ/Azure Service Bus)
    ├── Consumers (Message Handlers)
    ├── Sagas (Long-running Orchestration)
    └── Middleware (Retry, Outbox, Tracing)
Message Broker
    └── Queues/Exchanges/Topics

Key Integration Points

| Layer | Component | Responsibility |
|---|---|---|
| MessagingModel | Commands/Events | Message contracts (DTOs) |
| DomainModel | Services | Publish events via IPublishEndpoint |
| ApplicationModel | MassTransitExtensions | Bus configuration and registration |
| FlowModel.MassTransit | Sagas | Long-running process orchestration |
| PersistenceModel | Outbox/Saga State | Message persistence and saga state |
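
The contract types used throughout this page are referenced but never shown. A minimal sketch of what they might look like in the MessagingModel project follows. It assumes each message carries only the ObjectId used in the later examples; the namespace and the choice of sealed classes with init-only properties are assumptions, not the template's actual definitions.

```csharp
using System;

// Hypothetical namespace; the template's real namespace is not shown on this page.
namespace MessagingModel
{
    // Command: asks the service to create an aggregate root.
    public sealed class CreateMicroserviceAggregateRootCommand
    {
        public Guid ObjectId { get; init; }
    }

    // Event: announces that an aggregate root was created.
    public sealed class MicroserviceAggregateRootCreatedEvent
    {
        public Guid ObjectId { get; init; }
    }
}
```

Keeping contracts as plain data types with no behavior lets producers and consumers share them without pulling in domain or persistence code.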

Core Components

1. Service Registration

MassTransit is registered via a single extension method:

// Program.cs or Startup.cs
builder.Services.AddMicroserviceMassTransit();

2. Configuration Extension

The AddMicroserviceMassTransit() extension method configures all MassTransit services:

// MassTransitExtensions.cs
internal static IServiceCollection AddMicroserviceMassTransit(this IServiceCollection services)
{
    ArgumentNullException.ThrowIfNull(services);

    MassTransitOptions massTransitOptions = OptionsExtensions.MassTransitOptions;

    // Configure host options
    AddAndConfigureMassTransitHostOptions(services, massTransitOptions);

    // Configure correlation ID mapping
    GlobalTopology.Send.UseCorrelationId<MicroserviceAggregateRootCreatedEvent>(x => x.ObjectId);

    services.AddMassTransit(configurator =>
    {
        // Use kebab-case for endpoint names
        configurator.SetKebabCaseEndpointNameFormatter();

        // Register saga state machine
        var sagaRegistrationConfigurator = configurator
            .AddSagaStateMachine<MicroserviceAggregateRootLifeCycleSaga, MicroserviceAggregateRootLifeCycleSagaState>();

#if UseMassTransitMongoDBPersistence
        // Configure MongoDB saga persistence
        sagaRegistrationConfigurator.MongoDbRepository(repo =>
        {
            repo.Connection = massTransitOptions.MongoDbPersistence.Connection;
            repo.DatabaseName = massTransitOptions.MongoDbPersistence.DatabaseName;
            repo.CollectionName = nameof(MicroserviceAggregateRootLifeCycleSaga);
        });
#endif

        // Configure transport
#if UseMassTransitRabbitMQTransport
        ConfigureUsingRabbitMq(configurator, massTransitOptions);
#endif

#if UseMassTransitAzureServiceBusTransport
        ConfigureUsingAzureServiceBus(configurator, massTransitOptions);
#endif
    });

    // Register event bus adapter
    services.AddSingleton<IEventBus, MassTransitAdapter>();
    services.ActivateSingleton<IEventBus>();

    return services;
}
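
The body of AddAndConfigureMassTransitHostOptions is not shown above. A plausible sketch, assuming it maps the MassTransitHost settings onto MassTransit's own MassTransitHostOptions type: the class and method names below are hypothetical, and the hard-coded values simply mirror the appsettings.json defaults listed later on this page.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public static class MassTransitHostOptionsSketch
{
    // Hypothetical stand-in for AddAndConfigureMassTransitHostOptions.
    public static IServiceCollection AddHostOptionsSketch(this IServiceCollection services)
    {
        services.AddOptions<MassTransitHostOptions>()
            .Configure(options =>
            {
                // Block host startup until the bus has connected to the broker
                options.WaitUntilStarted = true;

                // Upper bounds for starting and stopping the bus
                options.StartTimeout = TimeSpan.FromMinutes(1);
                options.StopTimeout = TimeSpan.FromSeconds(30);

                // Time allowed for in-flight consumers to finish during shutdown
                options.ConsumerStopTimeout = TimeSpan.FromSeconds(30);
            });

        return services;
    }
}
```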

3. Transport Configuration

RabbitMQ Transport

private static void ConfigureUsingRabbitMq(
    IBusRegistrationConfigurator configurator, 
    MassTransitOptions massTransitOptions)
{
    configurator.UsingRabbitMq((context, config) =>
    {
        // Configure RabbitMQ host
        config.Host(
            massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.Host,
            massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.Port,
            massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.VirtualHost,
            host =>
            {
                host.Username(massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.UserName);
                host.Password(massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.Password);

                if (massTransitOptions.RabbitMqTransport.RabbitMqTransportHost.UseSsl)
                {
                    host.UseSsl(ssl =>
                    {
                        ssl.Protocol = SslProtocols.Tls12;
                    });
                }

                host.PublisherConfirmation = massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.PublisherConfirmation;
                host.Heartbeat(massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.Heartbeat);
                host.RequestedChannelMax(massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.RequestedChannelMax);
                host.RequestedConnectionTimeout(massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.RequestedConnectionTimeout);
                host.ContinuationTimeout(massTransitOptions.RabbitMqTransport.RabbitMqHostBehaviour.ContinuationTimeout);
            });

        // Configure endpoint settings
        config.PrefetchCount = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.PrefetchCount;
        config.PurgeOnStartup = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.PurgeOnStartup;
        config.AutoDelete = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.AutoDelete;
        config.Durable = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.Durable;
        config.ConcurrentMessageLimit = massTransitOptions.RabbitMqTransport.RabbitMqReceiveEndpoint.ConcurrentMessageLimit;

        // Configure retry policy
        config.UseMessageRetry(retryConfig =>
        {
            retryConfig.Interval(
                retryCount: 3,
                interval: TimeSpan.FromSeconds(5));
        });

        // Auto-configure endpoints for consumers and sagas
        config.ConfigureEndpoints(context);
    });
}

Azure Service Bus Transport

private static void ConfigureUsingAzureServiceBus(
    IBusRegistrationConfigurator configurator, 
    MassTransitOptions massTransitOptions)
{
    configurator.UsingAzureServiceBus((context, config) =>
    {
        if (!string.IsNullOrWhiteSpace(massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.ConnectionString))
        {
            config.Host(massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.ConnectionString, host =>
            {
                host.RetryLimit = massTransitOptions.AzureServiceBusTransport.RetryLimit;
                host.RetryMaxBackoff = TimeSpan.FromSeconds(
                    massTransitOptions.AzureServiceBusTransport.RetryMaxBackoffSeconds ?? 30);
                host.RetryMinBackoff = TimeSpan.FromSeconds(
                    massTransitOptions.AzureServiceBusTransport.RetryMinBackoffSeconds ?? 5);
            });
        }
        else
        {
            config.Host(massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.FullyQualifiedNamespace, host =>
            {
                if (massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.UseManagedIdentity)
                {
                    if (!string.IsNullOrWhiteSpace(
                        massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.ClientId))
                    {
                        host.TokenCredential = new Azure.Identity.DefaultAzureCredential(
                            new Azure.Identity.DefaultAzureCredentialOptions
                            {
                                ManagedIdentityClientId = 
                                    massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.ClientId,
                            });
                    }
                    else
                    {
                        host.TokenCredential = new Azure.Identity.DefaultAzureCredential();
                    }
                }

                if (massTransitOptions.AzureServiceBusTransport.AzureServiceBusHost.UseWebSockets)
                {
                    host.TransportType = Azure.Messaging.ServiceBus.ServiceBusTransportType.AmqpWebSockets;
                }
                else
                {
                    host.TransportType = Azure.Messaging.ServiceBus.ServiceBusTransportType.AmqpTcp;
                }

                host.RetryLimit = massTransitOptions.AzureServiceBusTransport.RetryLimit;
                host.RetryMaxBackoff = TimeSpan.FromSeconds(
                    massTransitOptions.AzureServiceBusTransport.RetryMaxBackoffSeconds ?? 30);
                host.RetryMinBackoff = TimeSpan.FromSeconds(
                    massTransitOptions.AzureServiceBusTransport.RetryMinBackoffSeconds ?? 5);
            });
        }

        // Configure receive endpoint
        config.ReceiveEndpoint(
            queueName: "microservice-masstransit",
            configureEndpoint: endpointConfig =>
            {
                endpointConfig.PrefetchCount = 
                    massTransitOptions.AzureServiceBusTransport.AzureServiceBusReceiveEndpoint.PrefetchCount;
                endpointConfig.ConcurrentMessageLimit = 
                    massTransitOptions.AzureServiceBusTransport.AzureServiceBusReceiveEndpoint.ConcurrentMessageLimit;
            });

        config.PrefetchCount = 
            massTransitOptions.AzureServiceBusTransport.AzureServiceBusReceiveEndpoint.PrefetchCount;
        config.ConcurrentMessageLimit =
            massTransitOptions.AzureServiceBusTransport.AzureServiceBusReceiveEndpoint.ConcurrentMessageLimit;

        config.ConfigureEndpoints(context);
    });
}

4. Publishing Messages

Using IPublishEndpoint

The recommended way to publish messages is via IPublishEndpoint:

public class MicroserviceAggregateRootsProcessor : IMicroserviceAggregateRootsProcessor
{
    private readonly IPublishEndpoint publishEndpoint;
    private readonly ILogger<MicroserviceAggregateRootsProcessor> logger;

    public MicroserviceAggregateRootsProcessor(
        IPublishEndpoint publishEndpoint,
        ILogger<MicroserviceAggregateRootsProcessor> logger)
    {
        this.publishEndpoint = publishEndpoint;
        this.logger = logger;
    }

    public async Task<IMicroserviceAggregateRoot> CreateMicroserviceAggregateRoot(
        CreateMicroserviceAggregateRootInput input,
        CancellationToken token = default)
    {
        // Execute domain logic
        var entity = new MicroserviceAggregateRoot { ObjectId = input.ObjectId };
        // ... save to repository ...

        // Publish domain event
        await this.publishEndpoint.Publish(new MicroserviceAggregateRootCreatedEvent
        {
            ObjectId = entity.ObjectId
        }, token);

        this.logger.LogInformation(
            "Published MicroserviceAggregateRootCreatedEvent for {ObjectId}", 
            entity.ObjectId);

        return entity;
    }
}

IPublishEndpoint vs IBus

Always use IPublishEndpoint instead of IBus for publishing. IPublishEndpoint is scoped per request and more testable.
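
Publishing covers events. For commands addressed to a single endpoint, the equivalent scoped abstraction is ISendEndpointProvider. A minimal sketch follows; the sender class is hypothetical, the command type is redeclared only to keep the example self-contained, and the queue name assumes the kebab-case endpoint derived from a consumer named CreateMicroserviceAggregateRootCommandConsumer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Illustrative contract; the real one lives in the MessagingModel project.
public sealed class CreateMicroserviceAggregateRootCommand
{
    public Guid ObjectId { get; init; }
}

// Hypothetical helper that sends a command to one specific queue.
public sealed class AggregateRootCommandSender
{
    private readonly ISendEndpointProvider sendEndpointProvider;

    public AggregateRootCommandSender(ISendEndpointProvider sendEndpointProvider)
    {
        this.sendEndpointProvider = sendEndpointProvider;
    }

    public async Task SendCreateAsync(Guid objectId, CancellationToken token = default)
    {
        // "queue:<name>" is MassTransit's short address form, resolved against the bus host.
        ISendEndpoint endpoint = await this.sendEndpointProvider.GetSendEndpoint(
            new Uri("queue:create-microservice-aggregate-root-command"));

        await endpoint.Send(
            new CreateMicroserviceAggregateRootCommand { ObjectId = objectId },
            token);
    }
}
```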

Publishing from Domain Services

// Domain service can publish events after business operations
public async Task ProcessOrder(Order order)
{
    // Business logic
    await repository.SaveAsync(order);

    // Publish event (using scoped IPublishEndpoint)
    await publishEndpoint.Publish(new OrderCreatedEvent
    {
        OrderId = order.Id,
        CustomerId = order.CustomerId,
        TotalAmount = order.TotalAmount
    });
}

5. Consuming Messages

Consumers implement IConsumer<T> and handle incoming messages:

// Example consumer (if implemented in template)
public class CreateMicroserviceAggregateRootCommandConsumer : 
    IConsumer<CreateMicroserviceAggregateRootCommand>
{
    private readonly IMicroserviceAggregateRootsProcessor processor;
    private readonly ILogger<CreateMicroserviceAggregateRootCommandConsumer> logger;

    public CreateMicroserviceAggregateRootCommandConsumer(
        IMicroserviceAggregateRootsProcessor processor,
        ILogger<CreateMicroserviceAggregateRootCommandConsumer> logger)
    {
        this.processor = processor;
        this.logger = logger;
    }

    public async Task Consume(ConsumeContext<CreateMicroserviceAggregateRootCommand> context)
    {
        this.logger.LogInformation(
            "Consuming CreateMicroserviceAggregateRootCommand for {ObjectId}", 
            context.Message.ObjectId);

        try
        {
            var input = new CreateMicroserviceAggregateRootInput
            {
                ObjectId = context.Message.ObjectId
            };

            await this.processor.CreateMicroserviceAggregateRoot(input, context.CancellationToken);

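            // Optionally publish a success event. Skip this if the processor already
            // publishes it (as in section 4), otherwise the event is published twice.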
            await context.Publish(new MicroserviceAggregateRootCreatedEvent
            {
                ObjectId = context.Message.ObjectId
            }, context.CancellationToken);

            this.logger.LogInformation(
                "Successfully processed CreateMicroserviceAggregateRootCommand for {ObjectId}",
                context.Message.ObjectId);
        }
        catch (Exception ex)
        {
            this.logger.LogError(
                ex,
                "Error processing CreateMicroserviceAggregateRootCommand for {ObjectId}",
                context.Message.ObjectId);
            throw; // Let MassTransit handle retry/fault
        }
    }
}

Consumer Registration

Consumers are automatically registered when using namespace scanning:

// Auto-register all consumers in namespace
configurator.AddConsumersFromNamespaceContaining<CreateMicroserviceAggregateRootCommandConsumer>();

// Or register manually
configurator.AddConsumer<CreateMicroserviceAggregateRootCommandConsumer>();

6. Sagas

MassTransit sagas orchestrate long-running business processes. See Saga Pattern for detailed information.

Saga State Machine Example

public class MicroserviceAggregateRootLifeCycleSaga :
    MassTransitStateMachine<MicroserviceAggregateRootLifeCycleSagaState>
{
    public State? MicroserviceAggregateRootCreatedEventState { get; private set; }
    public State? MicroserviceAggregateRootCreatedFaultedState { get; private set; }

    public Event<MicroserviceAggregateRootCreatedEvent>? MicroserviceAggregateRootCreatedEvent { get; private set; }
    public Event<Fault<MicroserviceAggregateRootCreatedEvent>>? MicroserviceAggregateRootCreatedFaultEvent { get; private set; }

    public MicroserviceAggregateRootLifeCycleSaga(
        ILogger<MicroserviceAggregateRootLifeCycleSaga> logger,
        TimeProvider timeProvider)
    {
        this.InstanceState(x => x.CurrentState);

        // Define events and correlation
        this.Event(() => this.MicroserviceAggregateRootCreatedEvent, e =>
        {
            e.CorrelateBy<Guid>(state => state.ObjectId, context => context.Message.ObjectId);
            e.InsertOnInitial = true;
            e.SetSagaFactory(ctx => new MicroserviceAggregateRootLifeCycleSagaState
            {
                CorrelationId = ctx.Message.ObjectId,
                ObjectId = ctx.Message.ObjectId
            });
        });

        this.Event(() => this.MicroserviceAggregateRootCreatedFaultEvent, configurator =>
        {
            configurator.CorrelateById(ctx => ctx.InitiatorId ?? ctx.Message.Message.ObjectId);
        });

        // Define state machine behavior
        this.Initially(
            When(this.MicroserviceAggregateRootCreatedEvent)
                .Then(context =>
                {
                    logger.LogInformation(
                        "Saga started for ObjectId={ObjectId}",
                        context.Message.ObjectId);
                    context.Saga.StartedAt = timeProvider.GetUtcNow();
                })
                .TransitionTo(this.MicroserviceAggregateRootCreatedEventState));

        // Define fault compensation
        this.DuringAny(
            When(this.MicroserviceAggregateRootCreatedFaultEvent)
                .Then(context => this.ProcessMicroserviceAggregateRootCreatedEventFaulted(context))
                .TransitionTo(this.MicroserviceAggregateRootCreatedFaultedState));
    }

    private void ProcessMicroserviceAggregateRootCreatedEventFaulted(
        BehaviorContext<MicroserviceAggregateRootLifeCycleSagaState, 
            Fault<MicroserviceAggregateRootCreatedEvent>> context)
    {
        // Log fault and handle compensation
        // ...
    }
}
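
The saga state class is referenced above but not shown. A minimal sketch, assuming only the members the state machine touches (CurrentState, ObjectId, StartedAt). The Version property is included because MassTransit's MongoDB saga repository relies on ISagaVersion for optimistic concurrency; the exact shape of the template's class is an assumption.

```csharp
using System;
using MassTransit;

// Sketch of the saga instance persisted between events.
public class MicroserviceAggregateRootLifeCycleSagaState :
    SagaStateMachineInstance,
    ISagaVersion
{
    // Required by SagaStateMachineInstance; identifies the saga instance.
    public Guid CorrelationId { get; set; }

    // Backing field for InstanceState(x => x.CurrentState); stores the state name.
    public string CurrentState { get; set; } = null!;

    // Business identifier used by CorrelateBy<Guid> in the state machine.
    public Guid ObjectId { get; set; }

    // Set when the saga starts (timeProvider.GetUtcNow() returns a DateTimeOffset).
    public DateTimeOffset StartedAt { get; set; }

    // Required by ISagaVersion; used for optimistic concurrency checks.
    public int Version { get; set; }
}
```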

Configuration

appsettings.json

{
  "MassTransit": {
    "MassTransitHost": {
      "StopTimeout": "00:00:30",
      "StartTimeout": "00:01:00",
      "WaitUntilStarted": true,
      "ConsumerStopTimeout": "00:00:30"
    },
    "RabbitMqTransport": {
      "RabbitMqTransportHost": {
        "Host": "localhost",
        "Port": 5672,
        "VirtualHost": "/",
        "UserName": "guest",
        "Password": "guest",
        "UseSsl": false
      },
      "RabbitMqHostBehaviour": {
        "PublisherConfirmation": true,
        "Heartbeat": "00:00:30",
        "RequestedChannelMax": 0,
        "RequestedConnectionTimeout": "00:01:00",
        "ContinuationTimeout": "00:00:20"
      },
      "RabbitMqReceiveEndpoint": {
        "PrefetchCount": 16,
        "PurgeOnStartup": false,
        "AutoDelete": false,
        "Durable": true,
        "ConcurrentMessageLimit": 10
      }
    },
    "MongoDbPersistence": {
      "Connection": "mongodb://localhost:27017",
      "DatabaseName": "SagaStateDb"
    }
  }
}

Environment-Specific Configuration

// appsettings.Development.json
{
  "MassTransit": {
    "RabbitMqTransport": {
      "RabbitMqTransportHost": {
        "Host": "localhost",
        "UserName": "dev_user",
        "Password": "dev_pass"
      }
    },
    "MongoDbPersistence": {
      "Connection": "mongodb://localhost:27017",
      "DatabaseName": "DevSagaStateDb"
    }
  }
}

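// appsettings.Production.json
// Note: .NET configuration does not expand ${...} placeholders by itself;
// they must be substituted at deployment time or overridden by another configuration source.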
{
  "MassTransit": {
    "RabbitMqTransport": {
      "RabbitMqTransportHost": {
        "Host": "prod-rabbitmq.internal",
        "UserName": "${RABBITMQ_USERNAME}",
        "Password": "${RABBITMQ_PASSWORD}"
      }
    },
    "MongoDbPersistence": {
      "Connection": "${MONGODB_CONNECTION_STRING}",
      "DatabaseName": "ProdSagaStateDb"
    }
  }
}
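
As an alternative to placeholder substitution, .NET configuration lets environment variables override JSON values when the key segments are joined with double underscores. The sketch below demonstrates the mapping with an in-memory value standing in for appsettings.json; the variable name follows from the section names above, and the password values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

const string Key = "MassTransit:RabbitMqTransport:RabbitMqTransportHost:Password";

// Simulate what a container orchestrator or deployment pipeline would inject.
Environment.SetEnvironmentVariable(
    "MassTransit__RabbitMqTransport__RabbitMqTransportHost__Password",
    "from-environment");

IConfiguration configuration = new ConfigurationBuilder()
    // Stand-in for the value that would come from appsettings.json
    .AddInMemoryCollection(new Dictionary<string, string?> { [Key] = "guest" })
    // Added last, so environment variables take precedence over earlier sources
    .AddEnvironmentVariables()
    .Build();

// Resolves to the environment value because later providers override earlier ones.
Console.WriteLine(configuration[Key]);
```

This keeps secrets out of the JSON files entirely, which is usually preferable for production credentials.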

Message Patterns

Publish/Subscribe (Events)

Events are published and can have multiple subscribers:

// Publish event
await publishEndpoint.Publish(new MicroserviceAggregateRootCreatedEvent
{
    ObjectId = entity.ObjectId
});

// Multiple consumers can handle the same event
// - Saga receives event for orchestration
// - Audit consumer logs the event
// - Notification consumer sends alerts

Request/Response

For request/response patterns:

// Client side
var client = bus.CreateRequestClient<GetEntityQuery>(TimeSpan.FromSeconds(30));
var response = await client.GetResponse<GetEntityResponse>(
    new GetEntityQuery { Id = entityId });

// Handler side
public class GetEntityQueryConsumer : IConsumer<GetEntityQuery>
{
    public async Task Consume(ConsumeContext<GetEntityQuery> context)
    {
        var entity = await repository.GetByIdAsync(context.Message.Id);

        await context.RespondAsync(new GetEntityResponse
        {
            Entity = entity
        });
    }
}
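
Since this page recommends avoiding IBus, the request client can also be injected rather than created from the bus. A minimal sketch, assuming the bus is registered through AddMassTransit so that IRequestClient<T> can be resolved from the container; the EntityLookup class and the shapes of the two message types are hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Placeholder contracts, declared only to keep the example self-contained.
public sealed class GetEntityQuery
{
    public Guid Id { get; init; }
}

public sealed class GetEntityResponse
{
    public object? Entity { get; init; }
}

// Hypothetical service that wraps the request/response exchange.
public sealed class EntityLookup
{
    private readonly IRequestClient<GetEntityQuery> client;

    public EntityLookup(IRequestClient<GetEntityQuery> client)
    {
        this.client = client;
    }

    public async Task<GetEntityResponse> GetAsync(Guid id, CancellationToken token = default)
    {
        // GetResponse completes when the consumer calls RespondAsync,
        // or throws if the request times out or the consumer faults.
        Response<GetEntityResponse> response =
            await this.client.GetResponse<GetEntityResponse>(
                new GetEntityQuery { Id = id },
                token);

        return response.Message;
    }
}
```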

When to Use Each Pattern

  • Publish/Subscribe: For domain events, notifications, multiple interested parties
  • Request/Response: For queries, synchronous operations requiring immediate response

Retry and Error Handling

Retry Configuration

config.UseMessageRetry(retryConfig =>
{
    // Fixed-interval retries: 3 attempts, 5 seconds apart
    retryConfig.Interval(
        retryCount: 3,
        interval: TimeSpan.FromSeconds(5));

    // Alternatively, use exponential backoff instead of Interval
    // (choose one policy per retry configurator):
    // retryConfig.Exponential(
    //     retryLimit: 5,
    //     minInterval: TimeSpan.FromSeconds(1),
    //     maxInterval: TimeSpan.FromSeconds(30),
    //     intervalDelta: TimeSpan.FromSeconds(2));
});

// Filter exceptions: retry everything except ValidationException
config.UseMessageRetry(retryConfig =>
{
    retryConfig.Ignore<ValidationException>();
    retryConfig.Interval(3, TimeSpan.FromSeconds(5));
});

Delayed Redelivery

For longer delays between attempts:

config.UseDelayedRedelivery(redeliveryConfig =>
{
    redeliveryConfig.Intervals(
        TimeSpan.FromSeconds(10),
        TimeSpan.FromSeconds(30),
        TimeSpan.FromMinutes(1));
});
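
Retry and delayed redelivery are typically combined. The sketch below shows one way to do so, assuming redelivery is configured before retry so that the in-process retries run first and redelivery only takes over once they are exhausted. The helper class and method names are hypothetical, and on RabbitMQ delayed redelivery is assumed to need broker-side delay support such as the delayed message exchange plugin.

```csharp
using System;
using MassTransit;

public static class ResilienceSketch
{
    // Hypothetical helper; call it inside UsingRabbitMq/UsingAzureServiceBus
    // before ConfigureEndpoints(context).
    public static void ConfigureRetryAndRedelivery(IBusFactoryConfigurator config)
    {
        // Outer layer: put the message back on the broker with increasing delays.
        config.UseDelayedRedelivery(redelivery => redelivery.Intervals(
            TimeSpan.FromSeconds(10),
            TimeSpan.FromSeconds(30),
            TimeSpan.FromMinutes(1)));

        // Inner layer: quick in-process retries for transient failures.
        config.UseMessageRetry(retry => retry.Interval(3, TimeSpan.FromSeconds(5)));
    }
}
```

With this arrangement each redelivered message goes through the inner retry policy again, so the total number of attempts is the product of both policies.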

Dead Letter Queues

After all retries are exhausted, messages are moved to error queues:

  • RabbitMQ: queue-name_error
  • Azure Service Bus: Dead-letter queue (automatically created)

Messages in error queues contain:

  • Original message
  • Exception details
  • Retry count
  • Timestamp

Fault Handling

Sagas can handle faults explicitly:

// Define fault event
public Event<Fault<CreateCommand>>? CreateCommandFaulted { get; private set; }

// Handle fault in saga
DuringAny(
    When(CreateCommandFaulted)
        .Then(context =>
        {
            logger.LogError(
                "Command failed: {Error}",
                context.Message.Exceptions.First().Message);
            // Perform compensation
        })
        .TransitionTo(FailedState));

Observability

OpenTelemetry Integration

MassTransit automatically creates spans for message operations:

// In OpenTelemetryExtensions.cs
services.AddOpenTelemetry()
    .WithTracing(tracerProviderBuilder =>
    {
        tracerProviderBuilder
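            // MassTransit 8+ emits activities from the "MassTransit" ActivitySource
            .AddSource("MassTransit")
            .AddSource("MassTransit.Saga")
            // Extension from the OpenTelemetry contrib instrumentation package;
            // only needed for older MassTransit versions
            .AddMassTransitInstrumentation();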
    });

Traces include:

  • Message publishing
  • Message consumption
  • Saga transitions
  • Retry attempts
  • Fault handling

Structured Logging

Use ILogger<T> with correlation IDs:

public async Task Consume(ConsumeContext<MyMessage> context)
{
    using (logger.BeginScope(new Dictionary<string, object>
    {
        ["MessageId"] = context.MessageId,
        ["CorrelationId"] = context.CorrelationId,
        ["ObjectId"] = context.Message.ObjectId
    }))
    {
        logger.LogInformation("Processing message");
        // Process message
        logger.LogInformation("Message processed successfully");
    }
}

Health Checks

MassTransit integrates with ASP.NET Core health checks:

// HealthChecksExtensions.cs
builder.AddMassTransitHealthChecks(services);

// Health check endpoint
// GET /health → Includes MassTransit bus status
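
The internals of AddMassTransitHealthChecks are specific to the template and not shown here. As a sketch of how the endpoint side might be wired, assuming MassTransit's bus health check carries its default "ready" tag, a readiness endpoint can filter on that tag. The /health/ready path is an assumption.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

// Registers the health check infrastructure; MassTransit adds its bus check
// when AddMassTransit is called.
builder.Services.AddHealthChecks();

var app = builder.Build();

// Readiness: only report checks tagged "ready", which includes the bus status.
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
    Predicate = check => check.Tags.Contains("ready"),
});

app.Run();
```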

Testing

Unit Testing with Test Harness

MassTransit provides a test harness for in-memory testing:

[TestMethod]
public async Task MicroserviceAggregateRootLifeCycleSaga_Should_Be_Initialized_When_Event_Published()
{
    // Arrange
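    var services = new ServiceCollection();
    // The saga constructor takes ILogger<T> and TimeProvider, so both must be resolvable
    services.AddLogging();
    services.AddSingleton(TimeProvider.System);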
    services.AddMassTransitTestHarness(configurator =>
    {
        configurator.AddSagaStateMachine<
            MicroserviceAggregateRootLifeCycleSaga,
            MicroserviceAggregateRootLifeCycleSagaState>();
    });

    var provider = services.BuildServiceProvider();
    var harness = provider.GetRequiredService<ITestHarness>();

    await harness.Start();

    var @event = new MicroserviceAggregateRootCreatedEvent
    {
        ObjectId = Guid.NewGuid()
    };

    // Act
    await harness.Bus.Publish(@event);

    // Assert
    Assert.IsTrue(await harness.Published.Any<MicroserviceAggregateRootCreatedEvent>());
    Assert.IsTrue(await harness.Consumed.Any<MicroserviceAggregateRootCreatedEvent>());

    var sagaHarness = harness.GetSagaStateMachineHarness<
        MicroserviceAggregateRootLifeCycleSaga,
        MicroserviceAggregateRootLifeCycleSagaState>();

    Assert.IsTrue(await sagaHarness.Created.Any());

    var instance = sagaHarness.Created.ContainsInState(
        correlationId: @event.ObjectId,
        machine: sagaHarness.StateMachine,
        state: sagaHarness.StateMachine.MicroserviceAggregateRootCreatedEventState);

    Assert.IsNotNull(instance);
    Assert.AreEqual(@event.ObjectId, instance.ObjectId);
    Assert.AreEqual(@event.ObjectId, instance.CorrelationId);
}

Testing Consumers

[TestMethod]
public async Task Consumer_Should_Process_Command()
{
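    var services = new ServiceCollection();
    services.AddLogging();
    // The consumer depends on IMicroserviceAggregateRootsProcessor, so register a test double.
    // FakeMicroserviceAggregateRootsProcessor is a hypothetical stub, not part of the template.
    services.AddSingleton<IMicroserviceAggregateRootsProcessor, FakeMicroserviceAggregateRootsProcessor>();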
    services.AddMassTransitTestHarness(configurator =>
    {
        configurator.AddConsumer<CreateMicroserviceAggregateRootCommandConsumer>();
    });

    var provider = services.BuildServiceProvider();
    var harness = provider.GetRequiredService<ITestHarness>();

    await harness.Start();

    var command = new CreateMicroserviceAggregateRootCommand
    {
        ObjectId = Guid.NewGuid()
    };

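    // Resolve the consumer's receive endpoint from the container-based harness
    var endpoint = await harness.GetConsumerEndpoint<CreateMicroserviceAggregateRootCommandConsumer>();
    await endpoint.Send(command);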

    Assert.IsTrue(await harness.Consumed.Any<CreateMicroserviceAggregateRootCommand>());
    Assert.IsTrue(await harness.Published.Any<MicroserviceAggregateRootCreatedEvent>());
}

Integration with Domain Services

Publishing from Processors

public class MicroserviceAggregateRootsProcessor : IMicroserviceAggregateRootsProcessor
{
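    private readonly IPublishEndpoint publishEndpoint;
    private readonly IMicroserviceAggregateRootsRepository repository;
    private readonly IUnitOfWork unitOfWork; // assumed unit-of-work abstraction

    // Constructor injection omitted for brevity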

    public async Task<IMicroserviceAggregateRoot> CreateMicroserviceAggregateRoot(
        CreateMicroserviceAggregateRootInput input,
        CancellationToken token)
    {
        // Domain logic
        var entity = new MicroserviceAggregateRoot { ObjectId = input.ObjectId };
        repository.Insert(entity);
        await unitOfWork.SaveChangesAsync(token);

        // Publish domain event
        await this.publishEndpoint.Publish(new MicroserviceAggregateRootCreatedEvent
        {
            ObjectId = entity.ObjectId
        }, token);

        return entity;
    }
}

Using Outbox Pattern

For guaranteed delivery, use the outbox pattern:

// Enable outbox in configuration
services.AddMassTransit(x =>
{
    x.AddEntityFrameworkOutbox<MyDbContext>(o =>
    {
        o.QueryDelay = TimeSpan.FromSeconds(10);
        o.UseSqlServer();

        // Route messages published or sent outside a consumer through the outbox
        o.UseBusOutbox();
    });
});

See Outbox Pattern for detailed information.
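
The Entity Framework outbox also needs its tables mapped in the DbContext. A minimal sketch, assuming MyDbContext is an ordinary EF Core context and the MassTransit.EntityFrameworkCore package is referenced; the template's actual persistence setup may differ.

```csharp
using MassTransit;
using Microsoft.EntityFrameworkCore;

public class MyDbContext : DbContext
{
    public MyDbContext(DbContextOptions<MyDbContext> options)
        : base(options)
    {
    }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);

        // Tracks consumed messages so redelivered duplicates can be detected
        modelBuilder.AddInboxStateEntity();

        // Stores outgoing messages in the same transaction as business data
        modelBuilder.AddOutboxMessageEntity();

        // Tracks delivery progress of stored outbox messages
        modelBuilder.AddOutboxStateEntity();
    }
}
```

A migration is then needed so these tables exist before the bus starts.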

Best Practices

Do's

  1. Use IPublishEndpoint for publishing
     • Scoped per request
     • More testable than IBus
     • Better integration with DI

  2. Keep consumers thin
     • Delegate business logic to domain services
     • Consumers should only handle message concerns

  3. Use correlation IDs
     • Include CorrelationId in all messages
     • Use for tracing across services
     • Enable with GlobalTopology.Send.UseCorrelationId<T>()

  4. Handle faults explicitly
     • Define fault events in sagas
     • Implement compensation logic
     • Log faults with context

  5. Configure retry policies appropriately
     • Different policies for different exception types
     • Use exponential backoff for transient failures
     • Set reasonable retry limits

  6. Use structured logging
     • Include correlation IDs in log scope
     • Log message start and completion
     • Include relevant business identifiers

  7. Test with MassTransit Test Harness
     • Use in-memory transport for tests
     • Verify message publishing and consumption
     • Test saga state transitions

  8. Separate message contracts from domain
     • Messages in MessagingModel project
     • Keep contracts simple POCOs
     • No business logic in messages

Don'ts

  1. Don't put business logic in consumers
     • Consumers should delegate to services
     • Keep orchestration separate from business rules

  2. Don't use IBus directly
     • Prefer IPublishEndpoint for publishing
     • Prefer ISendEndpointProvider for sending

  3. Don't ignore faults
     • Always handle Fault<T> in sagas
     • Log faults appropriately
     • Implement compensation

  4. Don't share domain entities as messages
     • Use DTOs in MessagingModel
     • Keep contracts independent of persistence

  5. Don't configure endpoints manually
     • Use ConfigureEndpoints(context) for auto-configuration
     • Let MassTransit handle routing

  6. Don't swallow exceptions
     • Let exceptions propagate for retry/fault handling
     • Only catch if you have a specific recovery strategy

Common Scenarios

Scenario 1: Simple Event Publishing

// Publish event after domain operation
public async Task CreateEntity(CreateEntityInput input)
{
    var entity = new Entity { Id = input.Id };
    repository.Insert(entity);
    await unitOfWork.SaveChangesAsync();

    // Publish event
    await publishEndpoint.Publish(new EntityCreatedEvent
    {
        EntityId = entity.Id
    });
}

Scenario 2: Saga Orchestration

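// Saga handles multiple events to orchestrate a workflow. Message types and their OrderId
// property are illustrative; event correlation (Event(() => ..., x => ...)) is omitted for brevity.
public class OrderProcessingSaga : MassTransitStateMachine<OrderProcessingSagaState>
{
    public State PaymentProcessing { get; private set; } = null!;
    public State InventoryReserved { get; private set; } = null!;
    public State Failed { get; private set; } = null!;

    public Event<OrderCreated> OrderCreated { get; private set; } = null!;
    public Event<PaymentProcessed> PaymentProcessed { get; private set; } = null!;
    public Event<PaymentFailed> PaymentFailed { get; private set; } = null!;

    public OrderProcessingSaga()
    {
        InstanceState(x => x.CurrentState);

        Initially(
            When(OrderCreated)
                .Publish(context => new ProcessPaymentCommand { OrderId = context.Message.OrderId })
                .TransitionTo(PaymentProcessing));

        During(PaymentProcessing,
            When(PaymentProcessed)
                .Publish(context => new ReserveInventoryCommand { OrderId = context.Message.OrderId })
                .TransitionTo(InventoryReserved),
            When(PaymentFailed)
                .Publish(context => new CancelOrderCommand { OrderId = context.Message.OrderId })
                .TransitionTo(Failed));
    }
}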

Scenario 3: Request/Response Pattern

// Client requests data
var client = bus.CreateRequestClient<GetEntityQuery>();
var response = await client.GetResponse<GetEntityResponse>(
    new GetEntityQuery { Id = entityId });

// Handler responds
public class GetEntityQueryConsumer : IConsumer<GetEntityQuery>
{
    public async Task Consume(ConsumeContext<GetEntityQuery> context)
    {
        var entity = await repository.GetByIdAsync(context.Message.Id);
        await context.RespondAsync(new GetEntityResponse { Entity = entity });
    }
}

Configuration Reference

MassTransitOptions

| Option | Description | Default |
|---|---|---|
| MassTransitHost.StopTimeout | Time to wait for bus to stop | 00:00:30 |
| MassTransitHost.StartTimeout | Time to wait for bus to start | 00:01:00 |
| MassTransitHost.WaitUntilStarted | Wait for bus to start before continuing | true |
| RabbitMqTransport.RabbitMqReceiveEndpoint.PrefetchCount | Messages to prefetch | 16 |
| RabbitMqTransport.RabbitMqReceiveEndpoint.ConcurrentMessageLimit | Max concurrent messages | 10 |

Endpoint Naming

MassTransit uses kebab-case for endpoint names:

  • CreateMicroserviceAggregateRootCommand → create-microservice-aggregate-root-command
  • MicroserviceAggregateRootCreatedEvent → microservice-aggregate-root-created-event
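
To make the naming rule concrete, here is a rough approximation of the conversion in plain C#. It is not MassTransit's implementation (the real formatter is configured by SetKebabCaseEndpointNameFormatter); the ToKebabCase helper is hypothetical and only covers simple PascalCase names. It also reflects that receive endpoint names are derived from the consumer type with the Consumer suffix removed.

```csharp
using System;
using System.Text.RegularExpressions;

// Approximates kebab-case endpoint naming for simple PascalCase type names.
static string ToKebabCase(string typeName, string suffixToStrip = "Consumer")
{
    // Drop the conventional suffix, if present
    string name =
        typeName.Length > suffixToStrip.Length
        && typeName.EndsWith(suffixToStrip, StringComparison.Ordinal)
            ? typeName[..^suffixToStrip.Length]
            : typeName;

    // Insert a hyphen before every capital letter except the first, then lowercase
    return Regex.Replace(name, "(?<!^)([A-Z])", "-$1").ToLowerInvariant();
}

Console.WriteLine(ToKebabCase("CreateMicroserviceAggregateRootCommandConsumer"));
Console.WriteLine(ToKebabCase("MicroserviceAggregateRootCreatedEvent"));
```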

Summary

MassTransit in the ConnectSoft Microservice Template provides:

  • Reliable Messaging: Retry, dead-letter queues, fault handling
  • Saga Orchestration: Long-running process management
  • Transport Flexibility: RabbitMQ, Azure Service Bus support
  • Observability: OpenTelemetry and structured logging
  • Testing: Comprehensive test harness
  • Clean Architecture: Fits seamlessly with DDD and CQRS patterns
  • Outbox Support: Guaranteed message delivery

By following these patterns and best practices, MassTransit becomes a powerful messaging infrastructure that enables scalable, reliable, and observable microservices.