The Ruminant Pattern

cow chewing cud

Photo by Wolfgang Hasselmann on Unsplash

This post is inspired by the Gang of Four's brilliant work Design Patterns. Also I need to thank Heath Turnage for suggesting the solution that I now call the Ruminant Pattern.

Intent

Provide a way to resume failed steps of a non-atomic sequence with minimal resources.

Motivation

Often times I find myself needing to execute a sequence of operations that cannot be made atomic, where the operations may fail for various reasons, where the failed operations can be re-attempted, and where successful operations should not be re-run. Consider for instance a series of operations like

{
    var orderRequest = GetNextOrderFromQueue();
    SendClientReceivedNotification(orderRequest);
    var order = CreateOrder(orderRequest);
    NotifyVendors(order);
    SendClientInProgressNotification(order);
}

Imagine a scenario where each one of those operations involved a separate API call, and that a transaction solution that rolls back past operations is impracticle or impossible. If the CreateOrder step threw an exception because a database was unavailable, you could theoretically recover from the error by re-sending the same message to the queue to start the whole process all over again. But you'd re-send the received notification to the client, and this might be less than ideal. What if the vendor API could not be notified? If you re-send the same message to the queue after the failing vendor API came back online, you'd create a new order and re-send the client a received notification. One way to solve this might be to create as many queues and consumers as there are operations:

Then if an operation failed at any point in the sequence, you could recover from the error by re-sending the message that failed to the queue where it failed. The code may look something like the following spread across several consumer assemblies:

{
    var orderRequest = GetNextOrderFromQueue();
    SendClientReceivedNotification(orderRequest);
    SendOrderRequestToCreateOrderQueue(orderRequest);
}

{
    var orderRequest = GetNextOrderFromCreateOrderQueue();
    var order = CreateOrder(orderRequest);
    SendOrderToNotifyVendorsQueue(order);
}

{
    var order = GetNextOrderFromNotifyVendorsQueue();
    NotifyVendor(order);
    SendOrderToNotifyClientOrderInProgressQueue(order);
}

{
    var order = GetNextOrderFromNotifyClientOrderInProgressQueue();
    SendClientInProgressNotification(order);
}

Now the number of queues and consumers is equal to the number of atomic operations. You can probably see how this looks a little resource intensive. But what if an order can have multiple vendors like this?

{
    var order = GetNextOrderFromNotifyVendorsQueue();
    foreach (var service in order.Services)
    NotifyVendor(service);

    SendOrderToNotifyClientOrderInProgressQueue(order);
}

If one vendor notification API temporarily fails and we attempt to recover by re-queueing the message, we re-notify all previously successful vendor APIs. Could we try to make as many queues as there are vendors? How would we coordinate when they all succeed? This seems impractical.

A better solution might be to create a data structure that can retain the state of the sequence of operations and pass it back to the same queue that started the sequence. Then we only have one queue and one queue-consumer for this sequence of operations, and any temporary failure can be re-attempted by re-sending the message associated with the last failure.

This idea of regurgitating a message back up to the queue from which it originated for the purposes of moving on to the a next stage of processing reminds me of a cow chewing its cud.

Applicability

Use the Ruminant pattern when

Structure

Participants

Producer

Sends a message to the queue.

Queue

Persists messages and releases them to a consumer in the order that they were added.

Consumer

Receives messages from a queue and performs a sequence of operations based on the content of the message.

Collaborations

Consequences

The Ruminant pattern has the following benefits and liabilities:

Implementation

Here are some useful techniques for implementing the Ruminant pattern.

Sample

We'll apply the Ruminant pattern to creating the order discussed earlier.

A consumer that accepts messages from a queue would be scaffolded to receive messages and deserialize them before handing them off to a message handler. The implementation may look something like this. For simplicity, the following example assumes dependencies are injected by a dependency injection framework.

class MessageHandler : IMessageHandler
{
    IHandlerFactory _factory;

    MessageHandler(IHandlerFactory factory)
    {
        _factory = factory;
    }

    void Handle(CreateOrderDto message)
    {
        var strategy = _factory.Get(message);
        strategy.Handle(message);
    }
}

class HandlerFactory : IHandlerFactory
{
    IServiceProvider _services;

    HandlerFactory(IServiceProvider services)
    {
        _services = services;
    }

    IMessageHandler Get(CreateOrderDto message)
    {
        if (message.Services.All(x => x.VendorNotified))
        {
            return _services.Get(NotifyClientOrderInProgressMessageHandler);
        }
        else if (message.OrderCreated)
        {
            return _services.Get(NotifyVendorMessageHandler);
        }
        else if (message.SentClientReceivedNotification)
        {
            return _services.Get(CreateOrderMessageHandler);
        }
        else
        {
            return _services.Get(SendClientReceivedNotificationMessageHandler);
        }
    }
}

class SendClientReceivedNotificationMessageHandler : IMessageHandler
{
    ISender _sender;
    IClientOrderReceivedNotifier _notifier;

    MessageHandler(ISender sender, IClientOrderReceivedNotifier notifier)
    {
        _sender = sender;
        _notifier = notifier;
    }

    void Handle(CreateOrderDto message)
    {
        _notifier.Notify(message.Client);
        message.SentClientReceivedNotification = true;
        _sender.Send(message);
    }
}

class CreateOrderMessageHandler : IMessageHandler
{
    ISender _sender;
    ICreateOrderService _service;

    MessageHandler(ISender sender, ICreateOrderService service)
    {
        _sender = sender;
        _service = service;
    }

    void Handle(CreateOrderDto message)
    {
        _service.Create(message.Order);
        message.OrderCreated = true;
        _sender.Send(message);
    }
}

class NotifyVendorMessageHandler : IMessageHandler
{
    ISender _sender;
    IVendorNotifier _notifier;

    MessageHandler(ISender sender, IVendorNotifier notifier)
    {
        _sender = sender;
        _notifier = notifier;
    }

    void Handle(CreateOrderDto message)
    {
        var service = message.Services.First(x => !x.VendorNotified)
        _notifier.Notify(service);
        service.VendorNotified = true;
        _sender.Send(message);
    }
}

class NotifyVendorMessageHandler : IMessageHandler
{
    ISender _sender;
    IVendorNotifier _notifier;

    MessageHandler(ISender sender, IVendorNotifier notifier)
    {
        _sender = sender;
        _notifier = notifier;
    }

    void Handle(CreateOrderDto message)
    {
        var service = message.Services.First(x => !x.VendorNotified)
        _notifier.Notify(service);
        service.VendorNotified = true;
        _sender.Send(message);
    }
}

class NotifyClientOrderInProgressMessageHandler : IMessageHandler
{
    IOrderInProgressNotifier _notifier;

    MessageHandler(IOrderInProgressNotifier notifier)
    {
        _notifier = notifier;
    }

    void Handle(CreateOrderDto message)
    {
        _notifier.Notify(message.Client);
    }
}