This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

EnableSoftware/Enable.Extensions.Queuing

Enable.Extensions.Queuing

Messaging queue abstractions for building testable applications.

Queues provide asynchronous message communication. They allow applications to easily scale out workloads to multiple processors, improve the resiliency of applications to sudden peaks in workloads and naturally lead to loosely coupled components or applications.

Enable.Extensions.Queuing currently provides four queue implementations: in-memory, RabbitMQ, Azure Service Bus and Azure Storage.

In addition to these packages, the Enable.Extensions.Queuing.Abstractions package is available. It contains the basic abstractions that the implementations listed above build upon. Use Enable.Extensions.Queuing.Abstractions to implement your own queue provider.

Package name NuGet version
Enable.Extensions.Queuing.Abstractions NuGet
Enable.Extensions.Queuing.InMemory NuGet
Enable.Extensions.Queuing.RabbitMQ NuGet
Enable.Extensions.Queuing.AzureServiceBus NuGet
Enable.Extensions.Queuing.AzureStorage NuGet

Queues vs. message buses

There are two types of messages, and therefore two types of messaging queues, that we may want to use, depending on the intent of the publisher of a message. The first type is a command or job that must be processed. The second is an event that notifies listeners of something that has already happened. In the former case, we need an actor to action the command, and we want exactly one actor to do so, potentially reporting back the outcome of the processing. In the latter case, we are simply notifying any listeners of something that has already happened, and we have no expectation of how that event is handled.

Enable.Extensions.Queuing provides the first of these types of messaging queues. Use any of the queue implementations provided to queue up work items, and have one out of any number of consumers process each individual message.
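
The "one out of any number of consumers" behaviour can be illustrated without a broker. The following is a minimal sketch only: it uses an in-process `System.Threading.Channels` channel as a stand-in for a work queue and does not use any Enable.Extensions.Queuing types. The class name `CompetingConsumersSketch` and its `RunAsync` method are hypothetical names introduced for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical illustration; not part of Enable.Extensions.Queuing.
public static class CompetingConsumersSketch
{
    // Sends `messageCount` work items through one shared queue and lets
    // `consumerCount` consumers compete for them. Returns, per message,
    // the name of the single consumer that handled it.
    public static async Task<ConcurrentDictionary<int, string>> RunAsync(
        int messageCount,
        int consumerCount)
    {
        // Stand-in for a work queue: an in-process channel, not a real broker.
        var queue = Channel.CreateUnbounded<int>();
        var handledBy = new ConcurrentDictionary<int, string>();

        // Each consumer loops until the queue is completed and drained.
        var consumers = Enumerable.Range(1, consumerCount)
            .Select(id => Task.Run(async () =>
            {
                while (await queue.Reader.WaitToReadAsync())
                {
                    while (queue.Reader.TryRead(out var message))
                    {
                        // A message read by one consumer is never seen by
                        // another, so this `TryAdd` is expected to succeed.
                        if (!handledBy.TryAdd(message, $"consumer-{id}"))
                        {
                            throw new InvalidOperationException(
                                $"Message {message} was handled twice.");
                        }
                    }
                }
            }))
            .ToArray();

        // Publish the work items, then signal that no more will arrive.
        for (var i = 0; i < messageCount; i++)
        {
            await queue.Writer.WriteAsync(i);
        }

        queue.Writer.Complete();

        await Task.WhenAll(consumers);
        return handledBy;
    }
}
```

Calling `await CompetingConsumersSketch.RunAsync(20, 3)` yields one entry per message. Which consumer handles which message is not deterministic, but no message is handled more than once. A message bus for events would instead deliver a copy of each message to every listener.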

Examples

The following example demonstrates posting a message to a queue and then retrieving the same message. This uses a single application to both send and receive messages. In production systems you'll more likely have (multiple) different components publishing and consuming messages.

Here we're using the RabbitMQ queue provider. How we work with messages is the same across any of the available queue implementations. The only differences are in the options that are passed in when constructing the queue factory.

using System;
using System.Threading.Tasks;
using Enable.Extensions.Queuing.Abstractions;
using Enable.Extensions.Queuing.RabbitMQ;

namespace QueuingSamples
{
    public class Program
    {
        public static void Main() => MainAsync().GetAwaiter().GetResult();

        public static async Task MainAsync()
        {
            var options = new RabbitMQQueueClientFactoryOptions
            {
                HostName = "localhost",
                Port = 5672,
                VirtualHost = "/",
                UserName = "guest",
                Password = "guest",
            };

            // Alternatively, use `new RabbitMQQuorumQueueClientFactory(options)` to
            // create a quorum queue if the queue does not already exist.
            var queueFactory = new RabbitMQQueueClientFactory(options);

            var queueName = "your-queue-name";

            // Get a reference to the queue that you want to work with.
            // The queue will be automatically created if it doesn't exist.
            // The dead letter queue name can optionally be passed into this method via QueueOptions.
            using (var queue = queueFactory.GetQueueReference(queueName))
            {
                // Add a new item to the queue.
                // Here we're using a `string`, but any type of message can
                // be used as long as it can be serialised to a byte array.
                await queue.EnqueueAsync("Hello, World!");

                // Retrieve our message from the queue.
                // We get this back as a type of `IQueueMessage` (note that
                // we could get `null` back here, if there are no messages
                // in the queue).
                var message = await queue.DequeueAsync();

                // The contents of the message that we sent are available
                // from `IQueueMessage.Body`. This property is our original
                // message, "Hello, World!", stored as a byte array. This
                // might not be that useful, so you can use the
                // `IQueueMessage.GetBody<T>` method to deserialise this back
                // to your original type.
                // In this case, we want to get our `string` back:
                var payload = message.GetBody<string>();

                // The following will print `Hello, World!`.
                Console.WriteLine(payload);

                // Now, do some clever work with the message.
                // Here's your chance to shine…

                // Finally, we acknowledge the message once we're done.
                // We can either "complete" the message, which means that
                // we've successfully processed it. This will remove the
                // message from our queue.
                await queue.CompleteAsync(message);

                // Or if something goes wrong and we can't process the
                // message, we can "abandon" it (but don't call both
                // `CompleteAsync` and `AbandonAsync`!).
                // await queue.AbandonAsync(message);
            }
        }
    }
}
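
The example above assumes that a message is always available and that processing always succeeds. The following is a minimal sketch of how the "`null` when the queue is empty" and "complete or abandon, never both" rules might be combined. `QueueProcessing.ProcessNextAsync` is a hypothetical helper introduced here, not part of Enable.Extensions.Queuing. It takes the queue operations as delegates, so it makes no assumptions about the library's types.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper; not part of Enable.Extensions.Queuing.
public static class QueueProcessing
{
    // Dequeues at most one message and processes it.
    // Returns true if a message was handled and completed; false if the
    // queue was empty or the handler failed and the message was abandoned.
    public static async Task<bool> ProcessNextAsync<TMessage>(
        Func<Task<TMessage>> dequeue,
        Func<TMessage, Task> handle,
        Func<TMessage, Task> complete,
        Func<TMessage, Task> abandon)
        where TMessage : class
    {
        var message = await dequeue();

        if (message == null)
        {
            // Nothing to do: the queue had no messages.
            return false;
        }

        try
        {
            await handle(message);
        }
        catch (Exception)
        {
            // Processing failed, so hand the message back to the queue.
            await abandon(message);
            return false;
        }

        // Completing happens outside the `try` block so that a failure
        // while completing never leads to an abandon as well.
        await complete(message);
        return true;
    }
}
```

With the `queue` from the example above, the delegates could be `() => queue.DequeueAsync()`, `m => queue.CompleteAsync(m)` and `m => queue.AbandonAsync(m)`, with `handle` containing your own processing logic.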