In the era of cloud-native applications, scalability, flexibility, and resilience are key to building distributed systems. MassTransit, a powerful .NET library, enables developers to easily implement event-driven architecture (EDA) and reactive systems. In this post, we’ll explore how MassTransit works, its benefits for building reactive and resilient systems, and why it’s a great fit for your .NET applications.
.NET is a free, cross-platform, and open-source developer platform from Microsoft. It allows developers to create applications for web, mobile, desktop, cloud, and IoT. Key features of .NET include:
For distributed systems, .NET offers:
MassTransit is a .NET-based library that simplifies the development of message-driven applications. It helps you connect components in distributed systems through messaging, making your applications more flexible, scalable, and resilient. MassTransit integrates with popular message brokers such as RabbitMQ, Kafka, and Azure Service Bus.
Why Use MassTransit?
Event-Driven Architecture (EDA) is a software design pattern where system components communicate by sending and receiving events. Events represent changes in state or trigger business processes.
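To make the idea concrete, here is a minimal in-process sketch of publish/subscribe in plain C#. It is not MassTransit and involves no broker; `InMemoryEventBus`, `Demo`, and the record-style `OrderCreated` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event type, for illustration only.
public record OrderCreated(Guid OrderId, DateTime CreatedAt);

// Hypothetical in-process event bus: producers call Publish,
// consumers register handlers with Subscribe. Neither side knows the other.
public sealed class InMemoryEventBus
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers = new();

    public void Subscribe<TEvent>(Action<TEvent> handler) where TEvent : class
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Action<object>>();
            _handlers[typeof(TEvent)] = list;
        }
        // Wrap the typed handler so handlers for all event types share one list type.
        list.Add(e => handler((TEvent)e));
    }

    public void Publish<TEvent>(TEvent @event) where TEvent : class
    {
        // Events with no subscribers are silently dropped.
        if (_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            foreach (var handler in list)
                handler(@event);
        }
    }
}

public static class Demo
{
    // Publishes one event to two independent subscribers and
    // returns how many handler invocations occurred (2).
    public static int Run()
    {
        var bus = new InMemoryEventBus();
        var invocations = 0;

        bus.Subscribe<OrderCreated>(e => { invocations++; Console.WriteLine($"Billing saw {e.OrderId}"); });
        bus.Subscribe<OrderCreated>(e => { invocations++; Console.WriteLine($"Shipping saw {e.OrderId}"); });

        bus.Publish(new OrderCreated(Guid.NewGuid(), DateTime.UtcNow));
        return invocations;
    }
}
```

A real broker adds what this sketch lacks: durable queues, delivery across processes, and asynchronous handling.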
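Key Components of EDA: event producers, which publish events when something happens; event consumers, which subscribe and react to them; and an event broker or channel (RabbitMQ in the example later in this post), which routes events from producers to consumers.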
EDA provides several advantages for modern distributed systems:
A reactive system is designed to be responsive, resilient, elastic, and message-driven. These characteristics make systems adaptable to real-time changes and varying loads.
MassTransit and Reactive Systems:
MassTransit supports the message-driven nature of reactive systems by enabling asynchronous communication between services. It also provides features like retry policies and circuit breakers to ensure that failures are handled gracefully, making systems resilient.
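As a rough sketch of what those resiliency features look like in configuration, the snippet below combines a circuit breaker with a retry policy on one receive endpoint. The `OrderCreated` and `OrderCreatedConsumer` types are redeclared here only so the snippet stands alone, `ResilienceSetup.AddResilientBus` is a hypothetical helper name, and the threshold values are illustrative rather than recommendations.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Stand-in message and consumer so the snippet compiles on its own.
public class OrderCreated
{
    public Guid OrderId { get; set; }
    public DateTime CreatedAt { get; set; }
}

public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    public Task Consume(ConsumeContext<OrderCreated> context)
    {
        Console.WriteLine($"Order Received: {context.Message.OrderId}");
        return Task.CompletedTask;
    }
}

public static class ResilienceSetup
{
    // Hypothetical helper: registers the bus with a circuit breaker and a retry policy.
    public static IServiceCollection AddResilientBus(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.AddConsumer<OrderCreatedConsumer>();

            x.UsingRabbitMq((context, cfg) =>
            {
                // Assumes a local broker with the default guest account.
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ReceiveEndpoint("order-created-event", e =>
                {
                    // Trip the breaker when 15% of at least 10 messages fail
                    // within one minute, then wait five minutes before retrying.
                    e.UseCircuitBreaker(cb =>
                    {
                        cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                        cb.TripThreshold = 15;
                        cb.ActiveThreshold = 10;
                        cb.ResetInterval = TimeSpan.FromMinutes(5);
                    });

                    // Retry a failed message three times, two seconds apart.
                    e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(2)));

                    e.ConfigureConsumer<OrderCreatedConsumer>(context);
                });
            });
        });

        return services;
    }
}
```

The retry policy absorbs transient failures of a single message, while the circuit breaker protects a struggling downstream dependency by pausing delivery when the failure rate stays high.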
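Reactive systems built with MassTransit can therefore exhibit all four traits: they are message-driven by design, resilient through retries and circuit breakers, and responsive and elastic because consumers process messages asynchronously and can be scaled out independently.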
Building Reactive and Resilient Systems with MassTransit in .NET - Code Sample on GitHub
Create a new solution to contain both the producer and consumer projects:
dotnet new sln -n MassTransitExample
Create the producer project:
dotnet new console -n ProducerApp
Create the consumer project:
dotnet new console -n ConsumerApp
Add both projects to the solution:
dotnet sln add ProducerApp/ProducerApp.csproj
dotnet sln add ConsumerApp/ConsumerApp.csproj
Install the MassTransit and MassTransit.RabbitMQ NuGet packages in both projects.
ProducerApp:
cd ProducerApp
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
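ConsumerApp (which also needs Microsoft.Extensions.Hosting for the generic host used in its Program.cs):
cd ../ConsumerApp
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
dotnet add package Microsoft.Extensions.Hosting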
You’ll need a shared project for the event message. Let’s create a class library with a simple class to represent the OrderCreated event.
From the solution root directory, create a class library called Shared, add it to the solution, and create a Messages.cs file inside it (the template's generated Class1.cs is not needed):
cd ..
dotnet new classlib -n Shared
dotnet sln add Shared/Shared.csproj
cd Shared
rm Class1.cs
touch Messages.cs
Add the following code to define the event in Messages.cs:
using System;

namespace Shared
{
    // Event published by ProducerApp and consumed by ConsumerApp.
    public class OrderCreated
    {
        public Guid OrderId { get; set; }
        public DateTime CreatedAt { get; set; }
    }
}
Add a reference to the shared project in both ProducerApp and ConsumerApp. Open ProducerApp/ProducerApp.csproj and add the shared project reference:
<ItemGroup>
  <ProjectReference Include="../Shared/Shared.csproj" />
</ItemGroup>
Repeat the same step for ConsumerApp/ConsumerApp.csproj.
Open the ProducerApp/Program.cs and update it to publish the OrderCreated event to RabbitMQ using MassTransit.
using MassTransit;
using Shared;

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("rabbitmq://localhost", h =>
    {
        h.Username("guest");
        h.Password("guest");
    });
});

await busControl.StartAsync();
try
{
    Console.WriteLine("Publishing order created events every 2 seconds...");
    while (true) // Infinite loop to keep publishing
    {
        // Create and publish an event (create a new order)
        var orderCreated = new OrderCreated
        {
            OrderId = Guid.NewGuid(),
            CreatedAt = DateTime.UtcNow
        };
        await busControl.Publish(orderCreated);
        Console.WriteLine($"Order created event published: {orderCreated.OrderId} at {orderCreated.CreatedAt}");

        // Wait for 2 seconds before publishing the next event
        await Task.Delay(TimeSpan.FromSeconds(2));
    }
}
finally
{
    await busControl.StopAsync();
}
This code sets up the RabbitMQ connection and publishes a new OrderCreated event to the broker every two seconds until the process is stopped.
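Because the loop above never exits on its own, the bus is only stopped if an exception escapes. One possible alternative, sketched here under assumptions, is to move the loop into a hosted BackgroundService so that Ctrl+C cancels it cleanly. This assumes MassTransit v8 or later (where AddMassTransit also registers the hosted service that starts the bus), the Microsoft.Extensions.Hosting package added to ProducerApp, and the Shared project from this post; `OrderPublisher` and `NewOrder` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shared;

// Hypothetical background publisher; the host cancels stoppingToken on shutdown.
public class OrderPublisher : BackgroundService
{
    private readonly IBus _bus;

    public OrderPublisher(IBus bus) => _bus = bus;

    // Builds a new event with a fresh ID and the current UTC time.
    public static OrderCreated NewOrder() =>
        new OrderCreated { OrderId = Guid.NewGuid(), CreatedAt = DateTime.UtcNow };

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var orderCreated = NewOrder();
            await _bus.Publish(orderCreated, stoppingToken);
            Console.WriteLine($"Order created event published: {orderCreated.OrderId}");

            // Throws OperationCanceledException on shutdown, which ends the loop.
            await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);
        }
    }
}

public static class Program
{
    public static async Task Main(string[] args)
    {
        await Host.CreateDefaultBuilder(args)
            .ConfigureServices(services =>
            {
                services.AddMassTransit(x =>
                {
                    x.UsingRabbitMq((context, cfg) =>
                    {
                        cfg.Host("localhost", "/", h =>
                        {
                            h.Username("guest");
                            h.Password("guest");
                        });
                    });
                });

                // Registered after MassTransit so the bus is started first.
                services.AddHostedService<OrderPublisher>();
            })
            .Build()
            .RunAsync();
    }
}
```

The trade-off is an extra package and a little ceremony in exchange for orderly startup and shutdown handled by the host.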
Open the ConsumerApp/Program.cs and update it to consume the OrderCreated event from RabbitMQ using MassTransit.
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shared;

public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                // MassTransit setup
                services.AddMassTransit(x =>
                {
                    // Register the consumer
                    x.AddConsumer<OrderCreatedConsumer>(config =>
                    {
                        // Consumer-level retry policy: 2 retries, 1 second apart.
                        // It runs inside the endpoint-level retry configured below.
                        config.UseMessageRetry(r => r.Interval(2, TimeSpan.FromSeconds(1)));
                    });

                    // Configure RabbitMQ
                    x.UsingRabbitMq((context, cfg) =>
                    {
                        cfg.Host("localhost", "/", h =>
                        {
                            h.Username("guest");
                            h.Password("guest");
                        });

                        // Register the consumer's endpoint
                        cfg.ReceiveEndpoint("order-created-event", e =>
                        {
                            // Endpoint-level retry: retry 5 times immediately
                            e.UseMessageRetry(r => r.Immediate(5));

                            // Pass the registration context so the consumer is resolved from the container
                            e.ConfigureConsumer<OrderCreatedConsumer>(context);
                        });
                    });
                });

                // Add the MassTransit hosted service to manage the bus lifecycle.
                // In MassTransit v8 and later, AddMassTransit registers it already,
                // so this call is obsolete there and can be removed.
                services.AddMassTransitHostedService(true);
            });
}

// Define your consumer class
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    public Task Consume(ConsumeContext<OrderCreated> context)
    {
        var message = context.Message;

        // Intentional error creation so that the Exchange and Error Queue are created automatically
        if (message.OrderId.ToString().StartsWith("5"))
        {
            throw new Exception("Order ID starts with 5");
        }

        Console.WriteLine($"Order Received: {message.OrderId} at {message.CreatedAt}");
        return Task.CompletedTask;
    }
}
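This code configures a consumer that listens for OrderCreated events on the order-created-event queue and prints the event details when received. Orders whose ID starts with "5" deliberately throw; once the retries are exhausted, MassTransit moves those messages to the endpoint's error queue (order-created-event_error).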
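When a consumer still fails after its retries, MassTransit also publishes a Fault<T> event that other consumers can subscribe to. The following is a minimal sketch of such a consumer for the deliberately failing orders; `OrderCreatedFaultConsumer` and its `Describe` helper are hypothetical names, and the snippet assumes the Shared project from this post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MassTransit;
using Shared;

// Hypothetical consumer that reacts to orders the main consumer could not process.
public class OrderCreatedFaultConsumer : IConsumer<Fault<OrderCreated>>
{
    // Formats a one-line summary of a failed order and its failure reasons.
    public static string Describe(Guid orderId, IEnumerable<string> reasons) =>
        $"Order {orderId} failed after retries: {string.Join("; ", reasons)}";

    public Task Consume(ConsumeContext<Fault<OrderCreated>> context)
    {
        // Fault<T>.Message is the original message; Exceptions describes what went wrong.
        var failedOrder = context.Message.Message;
        var reasons = context.Message.Exceptions.Select(ex => ex.Message);

        Console.WriteLine(Describe(failedOrder.OrderId, reasons));
        return Task.CompletedTask;
    }
}
```

To try it, register the consumer with `x.AddConsumer<OrderCreatedFaultConsumer>()` and configure it on its own receive endpoint (the queue name is up to you) with `e.ConfigureConsumer<OrderCreatedFaultConsumer>(context)`.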
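Ensure RabbitMQ is running (use Docker or install it locally). If using Docker, you can run the following, which also publishes port 15672 so the management UI bundled in the 3-management image is reachable from the host:
docker run -d --hostname rabbitmq --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management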
Run the ConsumerApp first:
cd ConsumerApp
dotnet run
The consumer will start listening for events and wait for OrderCreated messages.
Run the ProducerApp:
cd ../ProducerApp
dotnet run
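While the producer runs, it publishes an OrderCreated event every two seconds, which the consumer receives and processes.
You should see output similar to the following in the consumer’s console (the date format depends on your locale, and orders whose ID starts with "5" fail instead of printing):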
Order Received: f47ac10b-58cc-4372-a567-0e02b2c3d479 at 10/08/2024 14:25:00
To tune retry policies or add other resiliency features, modify the receive endpoint configuration.
For example, the endpoint-level retry policy in ConsumerApp/Program.cs looks like this:
cfg.ReceiveEndpoint("order-created-event", e =>
{
    // Retry policy
    e.UseMessageRetry(r => r.Immediate(5)); // Retry 5 times immediately

    // The consumer is registered with AddConsumer, so resolve it through the context
    e.ConfigureConsumer<OrderCreatedConsumer>(context);
});
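Immediate retries suit brief glitches but can hammer a dependency that needs time to recover. As one possible variation, the sketch below wraps an exponential back-off policy in an extension method; `RetryPolicies.UseBackoffRetry` is a hypothetical helper name, and the intervals and the ignored exception type are illustrative assumptions.

```csharp
using System;
using MassTransit;

public static class RetryPolicies
{
    // Hypothetical helper: applies exponential back-off to a receive endpoint.
    public static void UseBackoffRetry(this IReceiveEndpointConfigurator endpoint)
    {
        endpoint.UseMessageRetry(r =>
        {
            // Up to 5 retries, with delays growing from 1 second toward a 30-second cap.
            r.Exponential(5,
                TimeSpan.FromSeconds(1),   // minimum interval
                TimeSpan.FromSeconds(30),  // maximum interval
                TimeSpan.FromSeconds(2));  // interval delta

            // Do not retry errors that retrying cannot fix, such as invalid input.
            r.Ignore<ArgumentException>();
        });
    }
}
```

Inside the receive endpoint, calling `e.UseBackoffRetry();` would then take the place of `e.UseMessageRetry(r => r.Immediate(5));`.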
MassTransit and .NET offer a powerful combination for building scalable, resilient, and reactive systems based on Event-Driven Architecture. By decoupling components, handling failures gracefully, and ensuring that services can respond to real-time events, you can build applications that are prepared for modern distributed environments. Leveraging the patterns and features discussed here, you can create systems that not only scale with demand but also stay resilient in the face of challenges.