Building an Event Driven .NET Application: Integration Testing

Category

Distributed Systems

Introduction

The purpose of this series is to go through an in-depth walkthrough of setting up an event-driven architecture in a distributed .NET application.

In the last post, we looked at how we can set up a message bus using MassTransit. Now, let's see how we can test the producers and consumers that use it.

The source code for this example project is here on my GitHub. I used the current development version of Craftsman to scaffold out this project, but scaffolding for all of this will be available in the upcoming release of v0.13.

I'm going to use NUnit for this example, but you could definitely do this with xUnit as well.

It's worth noting that we'll be making integration tests in this post. These fit my app structure and give me a bit more robustness in testing closer to the full request flow that will be happening in production. If you want to write unit tests, check out the MassTransit docs which already cover this pretty well.

Setting Up Our In Memory Test Harness

The first thing to do is to set up our in memory test harness. We're going to use this in place of our message bus in our service registration from our last post to make our lives a bit easier. There is a downside to be aware of when using this method though.

To start, I'm going to establish my Test Fixture's OneTimeSetUp. This is what will run once before any of my tests. It's a bit more consolidated than normal, but that lets us focus on the test harness.

I'm also adding a OneTimeTearDown to clean up the harness after testing is done.

public class TestFixture
{
    private static IConfigurationRoot _configuration;
    private static IWebHostEnvironment _env;
    private static IServiceScopeFactory _scopeFactory;
    private static ServiceProvider _provider;
    private static InMemoryTestHarness _harness;

    [OneTimeSetUp]
    public async Task RunBeforeAnyTests()
    {
        var builder = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddInMemoryCollection(new Dictionary<string, string> { })
            .AddEnvironmentVariables();

        _configuration = builder.Build();
        _env = Mock.Of<IWebHostEnvironment>(e => e.EnvironmentName == LocalConfig.IntegrationTestingEnvName);

        var startup = new Startup(_configuration, _env);
        var services = new ServiceCollection();

        startup.ConfigureServices(services);

        services.AddMassTransitInMemoryTestHarness(cfg =>
        {
            // we'll add consumers here soon
        });
        
        _provider = services.BuildServiceProvider();
        _scopeFactory = _provider.GetService<IServiceScopeFactory>();
        
        _harness = _provider.GetRequiredService<InMemoryTestHarness>();
        await _harness.Start();
    }

    [OneTimeTearDown]
    public async Task RunAfterAnyTests()
    {
        await _harness.Stop();
    }
}
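
If you'd rather go the xUnit route, here's a rough sketch of how the same one-time setup and teardown might look. It assumes xUnit v2's IAsyncLifetime with a class fixture and the same MassTransit v7 in memory harness used above. IPingSent, HarnessFixture, and PingTests are made-up names for illustration, and the Startup wiring is left out so the sketch stands on its own.

```csharp
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;
using Xunit;

namespace XunitHarnessSketch
{
    // Hypothetical message contract, only here so the sketch compiles on its own.
    public interface IPingSent
    {
        string Text { get; }
    }

    // Plays the role of OneTimeSetUp/OneTimeTearDown: xUnit calls InitializeAsync
    // once before the tests in the class and DisposeAsync once after them.
    public class HarnessFixture : IAsyncLifetime
    {
        private ServiceProvider _provider;

        public InMemoryTestHarness Harness { get; private set; }

        public async Task InitializeAsync()
        {
            var services = new ServiceCollection();

            // In the real fixture, startup.ConfigureServices(services) would run first.
            services.AddMassTransitInMemoryTestHarness(cfg =>
            {
                // consumers would be registered here
            });

            _provider = services.BuildServiceProvider();
            Harness = _provider.GetRequiredService<InMemoryTestHarness>();
            await Harness.Start();
        }

        public async Task DisposeAsync()
        {
            await Harness.Stop();
            await _provider.DisposeAsync();
        }
    }

    public class PingTests : IClassFixture<HarnessFixture>
    {
        private readonly HarnessFixture _fixture;

        public PingTests(HarnessFixture fixture) => _fixture = fixture;

        [Fact]
        public async Task publish_is_recorded_by_harness()
        {
            // Publish through the harness bus, then ask the harness what it saw.
            await _fixture.Harness.Bus.Publish<IPingSent>(new { Text = "hello" });

            Assert.True(await _fixture.Harness.Published.Any<IPingSent>());
        }
    }
}
```

The fixture here is shared per test class. If you wanted a single harness across the whole test run, an xUnit collection fixture would be the closer match to NUnit's OneTimeSetUp, at the cost of a bit more ceremony.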

A Noteworthy Downside to the In Memory Harness ⚠️

While the in memory harness is lightening our DX load, it is also bypassing our actual message bus configuration that we have in our service registration, lowering the value of these tests to a degree.

For additional robustness, you could spin up RabbitMQ (RMQ) in Docker and feed that connection config to the RMQ service registration that we added in the last post. I have a post on how to do this with databases (and the example repo has an updated version with Fluent Docker) and the process would be similar here. This would add some complexity and overhead, but give you more confidence in your workflow, which is the whole point of integration tests.

I'll probably play around with this to see if the juice is worth the squeeze and do a follow-up post on my findings.

Helper Methods

I also want to extend our test fixture with some helper methods to make our lives a little easier. Alternatively, you could expose the InMemoryTestHarness publicly and use it in your tests directly.

I've added summaries above each with a description, but generally, these methods are taking in a message and confirming that some activity was done with it.

public class TestFixture
{
    private static IConfigurationRoot _configuration;
    private static IWebHostEnvironment _env;
    private static IServiceScopeFactory _scopeFactory;
    private static ServiceProvider _provider;
    private static InMemoryTestHarness _harness;

    [OneTimeSetUp]
    public async Task RunBeforeAnyTests()
    {
        // same setup as above
    }

    [OneTimeTearDown]
    public async Task RunAfterAnyTests()
    {
        await _harness.Stop();
    }

    /// <summary>
    /// Publishes a message to the bus.
    /// </summary>
    /// <param name="message">The message that should be published.</param>
    /// <typeparam name="TMessage">The type of the message that should be published.</typeparam>
    public static async Task PublishMessage<TMessage>(object message)
        where TMessage : class
    {
        await _harness.Bus.Publish<TMessage>(message);
    }
    
    /// <summary>
    /// Confirm that a message has been published for this harness.
    /// </summary>
    /// <typeparam name="TMessage">The message that should be published.</typeparam>
    /// <returns>A boolean of true if a message of the given type has been published.</returns>
    public static async Task<bool> IsPublished<TMessage>()
        where TMessage : class
    {
        return await _harness.Published.Any<TMessage>();
    }
    
    /// <summary>
    /// Confirm that a message has been consumed for this harness.
    /// </summary>
    /// <typeparam name="TMessage">The message that should be consumed.</typeparam>
    /// <returns>A boolean of true if a message of the given type has been consumed.</returns>
    public static async Task<bool> IsConsumed<TMessage>()
        where TMessage : class
    {
        return await _harness.Consumed.Any<TMessage>();
    }

    /// <summary>
    /// The desired consumer consumed the message.
    /// </summary>
    /// <typeparam name="TMessage">The message that should be consumed.</typeparam>
    /// <typeparam name="TConsumedBy">The consumer of the message.</typeparam>
    /// <returns>A boolean of true if a message of the given type has been consumed by the given consumer.</returns>
    public static async Task<bool> IsConsumed<TMessage, TConsumedBy>()
        where TMessage : class
        where TConsumedBy : class, IConsumer
    {
        var consumerHarness = _provider.GetRequiredService<IConsumerTestHarness<TConsumedBy>>();
        return await consumerHarness.Consumed.Any<TMessage>();
    }
    
    /// <summary>
    /// Confirm whether a fault for the given message type was published for this harness.
    /// </summary>
    /// <typeparam name="TMessage">The message that should be published.</typeparam>
    /// <returns>A boolean of true if there was a fault for a message of the given type when published.</returns>
    public static async Task<bool> IsFaultyPublished<TMessage>()
        where TMessage : class
    {
        return await _harness.Published.Any<Fault<TMessage>>();
    }
    
    /// <summary>
    /// Confirm whether a fault for the given message type was consumed for this harness.
    /// </summary>
    /// <typeparam name="TMessage">The message that should be consumed.</typeparam>
    /// <returns>A boolean of true if there was a fault for a message of the given type when consumed.</returns>
    public static async Task<bool> IsFaultyConsumed<TMessage>()
        where TMessage : class
    {
        return await _harness.Consumed.Any<Fault<TMessage>>();
    }
}

Testing Our Producers

The Producer

Let's say I have a MediatR command that creates a recipe and publishes a message using IPublishEndpoint that a recipe was added.

Since we're using IPublishEndpoint, MassTransit's in memory test harness will automatically register it as a scoped service for us, so we can jump right into a test.

public static class AddRecipe
{
    public class AddRecipeCommand : IRequest<RecipeDto>
    {
        public RecipeForCreationDto RecipeToAdd { get; set; }

        public AddRecipeCommand(RecipeForCreationDto recipeToAdd)
        {
            RecipeToAdd = recipeToAdd;
        }
    }

    public class Handler : IRequestHandler<AddRecipeCommand, RecipeDto>
    {
        private readonly RecipesDbContext _db;
        private readonly IMapper _mapper;
        private readonly IPublishEndpoint _publishEndpoint;

        public Handler(RecipesDbContext db, IMapper mapper, IPublishEndpoint publishEndpoint)
        {
            _mapper = mapper;
            _publishEndpoint = publishEndpoint;
            _db = db;
        }

        public async Task<RecipeDto> Handle(AddRecipeCommand request, CancellationToken cancellationToken)
        {
            var recipe = Recipe.Create(request.RecipeToAdd);
            _db.Recipes.Add(recipe);

            await _db.SaveChangesAsync(cancellationToken);

            await _publishEndpoint.Publish<IRecipeAdded>(new
            {
                RecipeId = recipe.Id
            });
            
            return await _db.Recipes
                .AsNoTracking()
                .ProjectTo<RecipeDto>(_mapper.ConfigurationProvider)
                .FirstOrDefaultAsync(r => r.Id == recipe.Id, cancellationToken);
        }
    }
}

The Producer Test

So normally, I might have a test like this for an add command:

public class AddRecipeCommandTests : TestBase
{
    [Test]
    public async Task can_add_new_recipe_to_db()
    {
        // Arrange
        var fakeRecipeOne = new FakeRecipeForCreationDto().Generate();

        // Act
        var command = new AddRecipe.AddRecipeCommand(fakeRecipeOne);
        var recipeReturned = await SendAsync(command);
        var recipeCreated = await ExecuteDbContextAsync(db => db.Recipes.SingleOrDefaultAsync());
        
        // Assert
        recipeReturned.Should().BeEquivalentTo(fakeRecipeOne, options =>
            options.ExcludingMissingMembers());
        recipeCreated.Should().BeEquivalentTo(fakeRecipeOne, options =>
            options.ExcludingMissingMembers());
    }
}

Now all I need to do is add assertions that use two of our helper methods, IsPublished and IsFaultyPublished.

public class AddRecipeCommandTests : TestBase
{
    [Test]
    public async Task can_add_new_recipe_to_db()
    {
        // Arrange
        var fakeRecipeOne = new FakeRecipeForCreationDto().Generate();

        // Act
        var command = new AddRecipe.AddRecipeCommand(fakeRecipeOne);
        var recipeReturned = await SendAsync(command);
        var recipeCreated = await ExecuteDbContextAsync(db => db.Recipes.SingleOrDefaultAsync());
        
        // Assert
        (await IsPublished<IRecipeAdded>()).Should().Be(true);
        (await IsFaultyPublished<IRecipeAdded>()).Should().Be(false);
        recipeReturned.Should().BeEquivalentTo(fakeRecipeOne, options =>
            options.ExcludingMissingMembers());
        recipeCreated.Should().BeEquivalentTo(fakeRecipeOne, options =>
            options.ExcludingMissingMembers());
    }
}

Testing Our Consumers

The Consumer

Let's say I have this consumer set up where IRecipeAdded is the message that we are consuming.

In this context, the content of the message and what happens in the consumption doesn't matter. Depending on what that work is, it can (and likely should) also be tested, but let's focus on the consumption testing.

public class AddToBook : IConsumer<IRecipeAdded>
{
    private readonly IMapper _mapper;
    private readonly RecipesDbContext _db;

    public AddToBook(RecipesDbContext db, IMapper mapper)
    {
        _mapper = mapper;
        _db = db;
    }

    public Task Consume(ConsumeContext<IRecipeAdded> context)
    {
        // do work here. could be a db call and/or
        // publishing another message from context

        return Task.CompletedTask;
    }
}

Registering the Consumer in Our In Memory Harness

As mentioned earlier, since we are leveraging an in memory bus here instead of the normal service registration, we need to register our consumers in the in memory harness. There are two lines that we need to add:

services.AddMassTransitInMemoryTestHarness(cfg =>
{
    cfg.AddConsumer<AddToBook>();
    cfg.AddConsumerTestHarness<AddToBook>();
});

The names pretty much speak for themselves: AddConsumer registers the consumer for configuration, and AddConsumerTestHarness creates a test harness for that consumer in the harness container. You might have noticed a lookup for this in the IsConsumed<TMessage, TConsumedBy>() helper method above.
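
To see those two registrations working without the rest of the app, here's a minimal standalone sketch. It assumes MassTransit v7 (where the in memory harness extensions live) and NUnit. IPingSent and PingConsumer are made-up stand-ins for IRecipeAdded and AddToBook, so treat this as an illustration rather than what the example repo does.

```csharp
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;

namespace HarnessSketch
{
    // Hypothetical message contract for this sketch.
    public interface IPingSent
    {
        string Text { get; }
    }

    // Hypothetical consumer that does no work; we only care that it receives the message.
    public class PingConsumer : IConsumer<IPingSent>
    {
        public Task Consume(ConsumeContext<IPingSent> context) => Task.CompletedTask;
    }

    public class PingConsumerTests
    {
        [Test]
        public async Task harness_routes_message_to_consumer()
        {
            var services = new ServiceCollection();
            services.AddMassTransitInMemoryTestHarness(cfg =>
            {
                cfg.AddConsumer<PingConsumer>();            // wires the consumer to the bus
                cfg.AddConsumerTestHarness<PingConsumer>(); // records what that consumer consumed
            });

            var provider = services.BuildServiceProvider();
            var harness = provider.GetRequiredService<InMemoryTestHarness>();
            await harness.Start();
            try
            {
                await harness.Bus.Publish<IPingSent>(new { Text = "hello" });

                // the bus as a whole consumed the message
                Assert.That(await harness.Consumed.Any<IPingSent>(), Is.True);

                // this specific consumer consumed the message
                var consumerHarness = provider.GetRequiredService<IConsumerTestHarness<PingConsumer>>();
                Assert.That(await consumerHarness.Consumed.Any<IPingSent>(), Is.True);
            }
            finally
            {
                await harness.Stop();
                await provider.DisposeAsync();
            }
        }
    }
}
```

Without the AddConsumerTestHarness line, the IConsumerTestHarness<PingConsumer> lookup would have nothing to resolve, which is why both lines go in together.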

The Consumer Test

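Now that we have our setup done, let's actually write our test. There are two things we want to check for: that the endpoint consumed the message, and that our desired consumer was the one that consumed it.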

public class AddToBookTests
{
    [Test]
    public async Task addtobook_can_consume_IRecipeAdded_message()
    {
        // Arrange
        var message = new Mock<IRecipeAdded>();

        // Act
        await PublishMessage<IRecipeAdded>(message.Object); // publish the mocked message, not the Mock wrapper

        // Assert
        // did the endpoint consume the message
        (await IsConsumed<IRecipeAdded>()).Should().Be(true);
        
        // the desired consumer consumed the message
        (await IsConsumed<IRecipeAdded,AddToBook>()).Should().Be(true);
    }
}

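Additionally, if our consumer was also publishing a message, we could add tests for that into the mix as well. In a real test you'd assert on whatever message type the consumer publishes. IRecipeAdded is only standing in for that type below, and since the test itself publishes IRecipeAdded, that particular check would pass either way: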

public class AddToBookTests
{
    [Test]
    public async Task addtobook_can_consume_IRecipeAdded_message()
    {
        // Arrange
        var message = new Mock<IRecipeAdded>();

        // Act
        await PublishMessage<IRecipeAdded>(message.Object); // publish the mocked message, not the Mock wrapper

        // Assert
        (await IsConsumed<IRecipeAdded>()).Should().Be(true);
        (await IsConsumed<IRecipeAdded,AddToBook>()).Should().Be(true);
				
        (await IsPublished<IRecipeAdded>()).Should().Be(true);
        (await IsFaultyPublished<IRecipeAdded>()).Should().Be(false);
    }
}
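
To make the downstream case a bit more concrete, here's a hedged sketch where the consumer publishes a second message and the test asserts on that second type. IRecipeAddedToBook is a hypothetical message I'm making up for illustration, and IRecipeAdded and AddToBook are redefined in simplified form (with an assumed Guid RecipeId) so the sketch stands on its own. It assumes MassTransit v7 and NUnit.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Testing;
using Microsoft.Extensions.DependencyInjection;
using NUnit.Framework;

namespace DownstreamPublishSketch
{
    // Simplified stand-in for the post's message; the Guid id type is an assumption.
    public interface IRecipeAdded
    {
        Guid RecipeId { get; }
    }

    // Hypothetical downstream message published by the consumer.
    public interface IRecipeAddedToBook
    {
        Guid RecipeId { get; }
    }

    // Simplified stand-in for the post's consumer that publishes a follow-up message.
    public class AddToBook : IConsumer<IRecipeAdded>
    {
        public async Task Consume(ConsumeContext<IRecipeAdded> context)
        {
            await context.Publish<IRecipeAddedToBook>(new { context.Message.RecipeId });
        }
    }

    public class AddToBookPublishTests
    {
        [Test]
        public async Task addtobook_publishes_downstream_message()
        {
            var services = new ServiceCollection();
            services.AddMassTransitInMemoryTestHarness(cfg =>
            {
                cfg.AddConsumer<AddToBook>();
                cfg.AddConsumerTestHarness<AddToBook>();
            });

            var provider = services.BuildServiceProvider();
            var harness = provider.GetRequiredService<InMemoryTestHarness>();
            await harness.Start();
            try
            {
                await harness.Bus.Publish<IRecipeAdded>(new { RecipeId = Guid.NewGuid() });

                // the consumer picked up the original message
                var consumerHarness = provider.GetRequiredService<IConsumerTestHarness<AddToBook>>();
                Assert.That(await consumerHarness.Consumed.Any<IRecipeAdded>(), Is.True);

                // the consumer published its own follow-up message
                Assert.That(await harness.Published.Any<IRecipeAddedToBook>(), Is.True);

                // no fault was published for the original message
                Assert.That(await harness.Published.Any<Fault<IRecipeAdded>>(), Is.False);
            }
            finally
            {
                await harness.Stop();
                await provider.DisposeAsync();
            }
        }
    }
}
```

Asserting on a different type than the one the test publishes is what gives the check its value here, since the harness records the test's own publish too.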

Conclusion

Setting up integration tests for your Producers and Consumers in MassTransit is pretty straightforward once you get your boilerplate in place for the in memory harness, but I do want to investigate spinning up an actual RMQ message broker in Docker so I can test against my actual bus configuration as well.

Regardless, I hope this was helpful! As always, I'd love to hear from you on Twitter or Discord if you have any questions, feedback, or general thoughts!