Building an Event Driven .NET Application: Setting Up MassTransit and RabbitMQ

Category

Distributed Systems

Published on
Authors

Series

  • Building an Event Driven .NET Application: The Fundamentals
  • Building an Event Driven .NET Application: Setting Up MassTransit and RabbitMQ
  • Building an Event Driven .NET Application: Testing Pub/Subs (TBD)
  • Building an Event Driven .NET Application: Error Handling (TBD)
  • Building an Event Driven .NET Application: Open Telemetry Tracing (TBD)

Introduction

The purpose of this series is to go through and in-depth walkthrough of setting up an event driven architecture in a distributed .NET application.

This is the second post in the Building an Event Driven .NET Application series. In this post, we’ll see how we can set up our MassTransit bus to send message to and receive messages from RabbitMQ. In the last post, we covered the core concepts and now we get to look at some code.

⚠ Unless you're already very familiar with the EDA flow, I would strongly recommend going through the core concepts post. It won't take very long 🙂

Setting Up RabbitMQ 🐰

We're going to start out by setting up our message broker (this is our post office where we send all of our messages for distribution). In this example, we're going to use RabbitMQ.

Running RabbitMQ Locally Using Docker

While RabbitMQ has a great docker page, we're actually going to use the MassTransit Image instead to make our lives a bit easier (MassTransit is the message bus we're going to be using).

docker run -p 15672:15672 -p 5672:5672 masstransit/rabbitmq

Once you have the container up and running, you can go to localhost:15672 and log in with the user guest and password guest.

Using a Deployed Instance of RabbitMQ

There are obviously a myriad of options to use when deploying RabbitMQ, but I want to give a special shout out to CloudAMQP. They offer RMQ as a Service and focus entirely on doing this very well. They also have great docs and even have a free-tier for devs to work in and get a feel for things, so it's perfect for this use case.

Understanding the RabbitMQ User Interface

The MassTransit docker image we used for RabbitMQ leverages the management plugin to give you a use interface (this what you hit if you went to localhost:15672 or your CloudAMQP deployment). It isn't the best UI you've ever seen, but it has some pretty good features for managing your RMQ setup.

CloudAMQP has a great write up on the different aspects of the UI and I'd definitely recommend checking it out.

Publishing and Subscribing to Messages Using MassTransit in .NET Web APIs

So now that we have our message broker set up, let's send it some messages! We could roll our own code for this, but there are several really good libraries out there (MassTransit, NServiceBus, Brighter, Rebus) that handle publishing and subscribing to messages for use already. No need to reinvent the wheel.

In this example, we're going to use MassTransit. It's a battle tested OSS library in the .NET space with a great community, helpful maintainer, and thorough documentation.

We're going to go over a few different use cases to cover what the different exchanges would look like.

Use Case 1: Setting Up a Direct Exchange

For this example, lets say that we have a reporting application and we want to send out a report over either email or fax. To do this, we'll send a message to either the email queue or the fax queue depending on which routing key is sent along with the message. Once the message reaches the queue, an email or fax service can get the message from the queue and send the report.

Project Setup

I'm using a simple web api generated from Craftsman. I'm using a build in the middle of v0.10.0 so there isn't a great way to build it on demand at the moment, but you can jump tot he example repo below.

You can build something similar using this template in 0.9.4+ and should have even more flexibility with 0.10.x+ to do a lot of this for you.

DomainName: WeSendReportsCompany
BoundedContexts:
- SolutionName: Reporting
  Port: 1200
  DbContext:
   ContextName: ReportingDbContext 
   DatabaseName: ReportingDbContext 
   Provider: SqlServer
  Entities:
  - Name: ReportRequest
    Properties:
    - Name: ReportId
      IsPrimaryKey: true
      Type: guid
      CanFilter: true
      CanSort: true
    - Name: Provider
      Type: string
      CanFilter: true
      CanSort: true
    - Name: Target
      Type: string
      CanFilter: true
      CanSort: true
  SwaggerConfig:
    Title: Reporting Swagger Doc
    Description: Swagger doc for reporting
    SwaggerEndpointName: "v1"
    AddSwaggerComments: true

If you want to look at the repo directly, you can see:

Summary

So if you wanted to visualize what we're going to build, you'd see something like the below. And it probably isn't quite what you'd expect at first.

direct exchange

Let's see what this looks like.

  1. A user submits a report in their application, which makes an API call to our .NET Core backend. Our backend will create a message marked with a routing key of email and an exchange type of direct to be sent to our message broker.
  2. Our message gets sent to the reporting exchange called send-report. This reporting exchange has a direct binding to two other intermediary exchanges, one for email and one for fax (more on the intermediary exchange in a minute).
  3. When the reporting exchange receives the message, it will check the binding keys between each of the intermediary exchanges to see where it should route the message. In this case, the message has a routing key of email and will be routed to the exchange that is bound with a matching email key.
  4. The intermediary exchange is set as a fanout with a single binding to a queue with the same name; in this case, send-email. Once this exchange gets the message from the primary exchange, it will route that message to the email queue to be consumed by the email service.

Now you might be asking, what the heck is up with these intermediary exchanges? Up to this point, we've only ever discussed having one exchange that connects directly to a queue with some kind of binding. In this case, we've added the extra exchange to facilitate a pattern called wire tapping to allow us to more easily debug our process in production. This pattern is built into MassTransit so you don't have to do anything special, but you do need to be aware of how it's setting things up for you. For more info on the wire tap pattern and how it can be used, check out this video by Chris Patterson, the creator of MassTransit.

Creating the Message

This is the branch for message setup

First, lets add a class library to our solution (if you have a project built with Craftsman 0.10.x+, this project will get added with your bus automatically). I'd recommend creating a separate class library project to use as your source of truth for your messages. This is because MassTransit uses the full type name, including the namespace, for message contracts. When creating the same message type in two separate projects, the namespaces must match or the message will not be consumed. By adding the message to a single class library, it should make your life a bit easier, especially when working with distributed projects.

Okay, so what do these messages look like? Well, they're just simple plain old class objects (POCO), though it is strongly recommended to use interfaces for message contracts. In this example, I'm going to make an ISendReportRequest object that will capture the ReportId, the Provider that we want to use to send the message (email or fax), and the Target where we want to send the message. This would probably be a bit more involved (e.g. enum or something of that nature for Provider), but i'm going to leave it as a string for simplicity sake. Generally you want to keep your messages as small as you can, but the amount of messages going back and forth can actually affect performance more than the message size, so don't be afraid to put a bit more in your messages if it will reduce the amount of calls you are making.

public interface ISendReportRequest
{
    Guid ReportId { get; set; }
    string Provider { get; set; } // "email" or "fax"
    string Target { get; set; }
}

That's it! Let's publish our message.

Registering MassTransit in Our Producer

This is the branch for producer setup

Let's start by adding MassTransit to the project we're going to publish from. In this case, I'm going to add it to a .NET 5 web api, but this process would be the same for any .NET Core app.

To start, let's go to the csproj of our publishing project and add the following Nuget packages so we can use MassTransit:

<PackageReference Include="MassTransit" Version="7.1.8" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.1.8" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.1.8" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.1.8" />

Then, let's register MassTransit in our Startup.cs services and use RabbitMQ:

using MassTransit;
using Messages;
using RabbitMQ.Client;
///
services.AddMassTransit(mt =>
{
    mt.UsingRabbitMq((context, cfg) =>
    {
      	cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.Message<ISendReportRequest>(e => e.SetEntityName("report-requests")); // name of the primary exchange
        cfg.Publish<ISendReportRequest>(e => e.ExchangeType = ExchangeType.Direct); // primary exchange type
	      cfg.Send<ISendReportRequest>(e =>
        {
            e.UseRoutingKeyFormatter(context => context.Message.Provider.ToString()); // route by provider (email or fax)
        });
    });
});
services.AddMassTransitHostedService();
  • AddMassTransit is the main service registration for MassTransit
  • UsingRabbitMq will let us configure our RabbitMQ connection
    • The cfg.Host portion is actually optional for local development if you are running RMQ locally using the MassTransit image, but you can add whatever server information is appropriate if you're using something like CloudAMQP or a non dev environment. Obviously if you are using this in production, please be mindful of how you are storing this information.
  • Message<ISendReportRequest>(e => e.SetEntityName("report-requests")) is telling MassTransit that we want to send messages of type ISendReportRequest to an exchange called report-requests.
  • Publish<ISendReportRequest>(e => e.ExchangeType = ExchangeType.Direct) is telling MassTransit that we want messages of type ISendReportRequest to be published using a direct exchange type.
  • Send<ISendReportRequest> is setting up the routing configuration we want to use when we sent a message of type ISendReportRequest. In this case, we are using UseRoutingKeyFormatter to tell MassTransit that we want to use the provider as the routing key (e.g. email or fax)
  • AddMassTransitHostedService will add the service bus and some additional bells and whistles like health check integration that we won't worry about for now.

Publishing Our Message

Next, we can publish our message! The exact location that you put this depends on the way you've organized your project, but the principle is the same. In this example, I have a MediatR RequestHandler (technically an in-memory message bus dealing with commands instead of events), but you could put it in a repository or other method if that's how your project is set up. Regardless of where you're adding it, it's pretty straight forward.

As our starting point, lets say that I have a SubmitReportRequestCommand command that adds an order request to the database:

public class Handler : IRequestHandler<SubmitReportRequestCommand, ReportRequestDto>
{
    private readonly ReportingDbContext _db;
    private readonly IMapper _mapper;

    public Handler(ReportingDbContext db, IMapper mapper)
    {
        _mapper = mapper;
        _db = db;
    }

    public async Task<ReportRequestDto> Handle(SubmitReportRequestCommand request, CancellationToken cancellationToken)
    {
        if (await _db.ReportRequests.AnyAsync(r => r.ReportId == request.RequestToAdd.ReportId))
        {
            throw new ConflictException("Reporting Request already exists with this primary key.");
        }

        var reportRequest = _mapper.Map<ReportRequest>(request.ReportRequestToAdd);
        _db.ReportRequests.Add(reportRequest);
        var saveSuccessful = await _db.SaveChangesAsync() > 0;

        if (saveSuccessful)
        {
            return await _db.ReportRequests
                .ProjectTo<ReportRequestDto>(_mapper.ConfigurationProvider)
                .FirstOrDefaultAsync(r => r.ReportId == reportRequest.ReportId);
        }
        else
        {
            throw new Exception("Unable to save the new record. Please check the logs for more information.");
        }
    }
}

First, inject an IPublishEndpoint to your command, repository, etc. This is what MassTransit will use to transport the message to the message broker.

private readonly ReportingDbContext _db;
private readonly IMapper _mapper;
private readonly IPublishEndpoint _publishEndpoint;

public Handler(ReportingDbContext db, IMapper mapper, IPublishEndpoint publishEndpoint)
{
    _mapper = mapper;
    _db = db;
    _publishEndpoint = publishEndpoint;
}

Then we can create our message and publish it using the publishing endpoint. Here, I am just using an anonymous object, but you could use automapper or whatever makes sense for you here as well.

var message = new
{
    ReportId = reportRequest.ReportRequestId,
    Provider = reportRequest.Provider.ToString(),
    Target = reportRequest.Target.ToString()
};
await _publishEndpoint.Publish<ISendReportRequest>(message);

So bringing that all together, you have something like this.

public class Handler : IRequestHandler<SubmitReportRequestCommand, ReportRequestDto>
{
    private readonly ReportingDbContext _db;
    private readonly IMapper _mapper;
    private readonly IPublishEndpoint _publishEndpoint;

    public Handler(ReportingDbContext db, IMapper mapper, IPublishEndpoint publishEndpoint)
    {
        _mapper = mapper;
        _db = db;
        _publishEndpoint = publishEndpoint;
    }

    public async Task<ReportRequestDto> Handle(SubmitReportRequestCommand request, CancellationToken cancellationToken)
    {
        if (await _db.ReportRequests.AnyAsync(r => r.ReportId == request.RequestToAdd.ReportId))
        {
            throw new ConflictException("Reporting Request already exists with this primary key.");
        }

        var reportRequest = _mapper.Map<ReportRequest>(request.ReportRequestToAdd);
        _db.ReportRequests.Add(reportRequest);
        var saveSuccessful = await _db.SaveChangesAsync() > 0;

        if (saveSuccessful)
        {
            var message = new
            {
              ReportId = reportRequest.ReportRequestId,
              Provider = reportRequest.Provider.ToString(),
              Target = reportRequest.Target.ToString()
            };
            await _publishEndpoint.Publish<ISendReportRequest>(message);

            return await _db.ReportRequests
                .ProjectTo<ReportRequestDto>(_mapper.ConfigurationProvider)
                .FirstOrDefaultAsync(r => r.ReportId == reportRequest.ReportId);
        }
        else
        {
            throw new Exception("Unable to save the new record. Please check the logs for more information.");
        }
    }
}

And that's it! If we were to run our API and call our endpoint that creates new order requests, we should see a new request in RabbitMQ!

Registering MassTransit in our Consumer

This is the branch for consumer setup

Now lets set up our consumer. This is going to be a worker service that doesn't actually implement any logic, but can consume the messages for us.

Just like before, let's start by going to the csproj of our worker project and add the following Nuget packages so we can use MassTransit:

<PackageReference Include="MassTransit" Version="7.1.8" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.1.8" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.1.8" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.1.8" />

Then, let's register MassTransit in our Program.cs services (no startup because we're using a worker, but could do that if you were in a web api and it would work the same) and use RabbitMQ:

using MassTransit;
using Messages;
using RabbitMQ.Client;
///
services.AddMassTransit(mt =>
{
    mt.UsingRabbitMq((context, cfg) =>
    {
      	cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

      	cfg.ReceiveEndpoint("email-reports", re =>
        {
            // turns off default fanout settings
            re.ConfigureConsumeTopology = false;

            // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
            re.SetQuorumQueue();

            // enables a lazy queue for more stable cluster with better predictive performance.
            // Please note that you should disable lazy queues if you require really high performance, if the queues are always short, or if you have set a max-length policy.
            re.SetQueueArgument("declare", "lazy");

            re.Consumer<EmailConsumer>();
            re.Bind("report-requests", e =>
            {
                e.RoutingKey = "email";
                e.ExchangeType = ExchangeType.Direct;
            });
        });
      	cfg.ReceiveEndpoint("fax-reports", re =>
        {
            re.ConfigureConsumeTopology = false;
            re.Consumer<FaxConsumer>();
            re.Bind("report-requests", e =>
            {
                e.RoutingKey = "fax";
                e.ExchangeType = ExchangeType.Direct;
            });
        });
    });
});
services.AddMassTransitHostedService();
  • AddMassTransit, UsingRabbitMq, cfg.Host, and AddMassTransitHostedService all behave the same as in the previous service registration
  • cfg.ReceiveEndpoint("email-reports", re => is creating an intermediary fanout exchange named email-reports as well as a queue of the same name that will be bound to it.
  • re.ConfigureConsumeTopology = false; turns off the default fanout settings
  • re.SetQuorumQueue(); sets the queue to be a quorum queue, a best practice made available starting in RMQ 3.8. For more details see this writeup by CloudAMQP.
  • re.SetQueueArgument("declare", "lazy"); sets the queue to be lazy per CloudAMQP best practices. If you need really high performance, always have short queues, or have a max length policy you should remove this setting.
  • re.Consumer<EmailConsumer>(); subscribes the EmailConsumer to the email-reports queue.
  • re.Bind("report-requests", e => binds the report-requests exchange to the intermediary exchange.
  • e.RoutingKey = "email"; sets the binding between the primary and intermediary exchange to a particular routing key
  • e.ExchangeType = ExchangeType.Direct; sets the primary exchange type. In this case, direct so messages have to have a routing key that matches the binding key to be routed through.

Consuming the message

Finally, let's make the actual consumers! These are just going to be classes that inherit an IConsumer of whatever the message is, in our case IConsumer<ISendReportRequest>.

So the email consumer might look like this:

public class EmailConsumer : IConsumer<ISendReportRequest>
{
    public Task Consume(ConsumeContext<ISendReportRequest> context)
    {
        // do work to send email here

        return Task.CompletedTask;
    }
}

And the fax consumer might look like this:

public class FaxConsumer : IConsumer<ISendReportRequest>
{
    public Task Consume(ConsumeContext<ISendReportRequest> context)
    {
        // do work to send fax here

        return Task.CompletedTask;
    }
}

Now if you ran the solution with multiple startup projects (web api and worker), you should be able to create a new request and messages with a provider of email or fax to hit a breakpoint on either of the consumers!

Use Case 2: Setting Up a Topic Exchange

Next, let's see how we can set up a topic exchange. As a reminder, a topic exchange type is similar to the direct type, but lets us check for a partial match between the routing key and the binding key. You can use a * (asterisk) as a wildcard for a single word and a # (hash) can substitute for zero or more words.

Source Code

We'll be modifying our direct exchange project. Here is the branch with the topic exchange updates.

Summary

topic exchange

This example will be similar to the direct example above, only now, we will be routing our message based on whether it is a public or private message. Again, let's start by visualizing the process.

  1. A user submits a report in their application, which makes an API call to our .NET Core backend. Our backend will create a message marked with a routing key of private.email and an exchange type of topic to be sent to our message broker.
  2. Our message gets sent to the reporting exchange called send-report. This reporting exchange now has a topic binding to two other intermediary exchanges, one for private messages and one for public messages. This time, it doesn't matter what the provider is because we have a * in our binding; only the public or private designation will affect the routing of these messages.
  3. When the reporting exchange receives the message, it will check the binding keys between each of the intermediary exchanges to see where it should route the message. In this case, the message has a routing key of private.email and will be routed to the exchange that is bound with the partial match of private.*
  4. The intermediary exchange is set as a fanout with a single binding to a queue with the same name; in this case, private-reports. Once this exchange gets the message from the primary exchange, it will route that message to the email queue to be consumed by the email service.

Updating Our Message

First, I want to update our message (and associated DTO and entity) to include a new IsPublic property. This will determine whether or not a request is for public or private consumption and, in turn, how we should route the message.

public interface ISendReportRequest
{
    Guid ReportId { get; set; }
    string Provider { get; set; } // "email" or "fax" or "cloud"
    string Target { get; set; }
    public bool IsPublic { get; set; }
}

Setting Up Our Producer

Then we're going to make some slight tweaks to our service registration.

services.AddMassTransit(mt =>
{
    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.Message<ISendReportRequest>(e => e.SetEntityName("send-report")); // name of the primary exchange
        cfg.Publish<ISendReportRequest>(e => e.ExchangeType = ExchangeType.Topic); // primary exchange type
        cfg.Send<ISendReportRequest>(e =>
        {
            e.UseRoutingKeyFormatter(context =>
            {
                var sharedState = context.Message.IsPublic ? "public" : "private";
                return $"{sharedState}.{context.Message.Provider}";
            });
        });
    });
});
services.AddMassTransitHostedService();

Let's break this down. The big change here is that we're creating a chained routing key.

  • Just like before, cfg.Message<ISendReportRequest>(e => e.SetEntityName("send-report")); will create our primary exchange with a name of send-report
  • cfg.Publish<ISendReportRequest>(e => e.ExchangeType = ExchangeType.Topic); is setting our exchange type just like before, only now we're using a type of topic
  • e.UseRoutingKeyFormatter(context => is still here to set build our message's routing key, but now we're creating a sharedState variable to figure out if the message public or private, and then appending the provider (where we want to send the report) on the end like so $"{sharedState}.{context.Message.Provider}". The provider isn't coming in to play with the routing at all as our binding will have wildcard for the provider, but we can still pass the info along to be used however is appropriate.

Updating Our Consumer

Next, let's update our consumer to create modified endpoints for this new need. We're going to make one private endpoint (private-reports) and a public endpoint (public-reports).

services.AddMassTransit(mt =>
{
    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ReceiveEndpoint("private-reports", re =>
        {
            // turns off default fanout settings
            re.ConfigureConsumeTopology = false;

            // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
            re.SetQuorumQueue();

            // enables a lazy queue for more stable cluster with better predictive performance.
            // Please note that you should disable lazy queues if you require really high performance, if the queues are always short, or if you have set a max-length policy.
            re.SetQueueArgument("declare", "lazy");

            re.Consumer<EmailConsumer>();
            re.Consumer<FaxConsumer>();
            re.Bind("send-report", e =>
            {
                e.RoutingKey = "private.*";
                e.ExchangeType = ExchangeType.Topic;
            });
        });
        cfg.ReceiveEndpoint("public-reports", re =>
        {
            // turns off default fanout settings
            re.ConfigureConsumeTopology = false;

            // a replicated queue to provide high availability and data safety. available in RMQ 3.8+
            re.SetQuorumQueue();

            // enables a lazy queue for more stable cluster with better predictive performance.
            // Please note that you should disable lazy queues if you require really high performance, if the queues are always short, or if you have set a max-length policy.
            re.SetQueueArgument("declare", "lazy");

            re.Consumer<CloudConsumer>();
            re.Bind("send-report", e =>
            {
                e.RoutingKey = "public.*";
                e.ExchangeType = ExchangeType.Topic;
            });
        });
    });
});
services.AddMassTransitHostedService();

Let's break it down:

  • Like before, cfg.ReceiveEndpoint("private-reports", re => is creating our intermediary exchange and queue called private-reports.
  • re.ConfigureConsumeTopology = false; turns off the default fanout settings
  • re.SetQuorumQueue(); sets the queue to be a quorum queue, a best practice made available starting in RMQ 3.8. For more details see this writeup by CloudAMQP.
  • re.SetQueueArgument("declare", "lazy"); sets the queue to be lazy per CloudAMQP best practices. If you need really high performance, always have short queues, or have a max length policy. You should remove this setting.
  • Then we're using re.Consumer<EmailConsumer>(); and re.Consumer<FaxConsumer>(); to bind both the email and the fax consumers to this endpoint. So private messages will be sent to both email and fax. This doesn't really make sense in this case as we're only sending one target and provider so the other couldn't be used, but you get the point 🙂
  • Same as before re.Bind("send-report", e => will create a binding from our intermediary exchange to our primary exchange with a binding of private.*, so any incoming message that starts with private will be sent here.
  • And again, we set the primary exchange type to topic with e.ExchangeType = ExchangeType.Topic;

The cloud consumer that was added here is just to show another consumer hitting the private queue, but it looks the same as the other two:

public class CloudConsumer : IConsumer<ISendReportRequest>
{
    public Task Consume(ConsumeContext<ISendReportRequest> context)
    {
        // do work to send cloud here

        return Task.CompletedTask;
    }
}

RabbitMQ Best Practices

If you'd like to read on some of the best practices to follow around RabbitMQ, check out this post by CloudAMQP.

Using Craftsman to Add MassTransit Into Your Projects in Minutes

In the upcoming v0.10.x release of Craftsman, you will have several commands available to you like add:bus, add:message, register:consumer, and register:producer to scaffold out your events in minutes. This eventing functionality will continue to be improved in future releases to include scaffolded tests, logging, open telemetry tracing, and more.

Coming in the Next Post

In the next post of the series, we’ll look at how we can implement testing for our events.

Regardless, I hope this was helpful! I'd love to hear your thoughts and insights on Twitter @pdevito3.