Dispatch Queues

Olivier Coanet edited this page Jan 29, 2021 · 1 revision

Default message dispatching

Zebus processes incoming messages by synchronously invoking the Handle method of the matching handlers. Messages are processed sequentially, one after the other, using a message queue and a dedicated processing thread.

The combination of a message queue and its processing thread is called a dispatch queue.

By default, there is only one global dispatch queue, which is referred to as the default dispatch queue.

Sequencing the handling of all incoming messages makes the processing easier to reason about: objects shared by handlers that are served by the same dispatch queue do not need to be thread-safe. The downside is that a single slow handler blocks every other handler on the same queue, which can cause messages to pile up both in-process and in the persistence component.
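As a rough illustration of the queue-plus-thread pairing described in this section, here is a minimal sketch built on BlockingCollection<T>. It is not the Zebus implementation: the class name SketchDispatchQueue and all of its members are hypothetical and only show the general idea.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration only: this is not the Zebus implementation.
public sealed class SketchDispatchQueue : IDisposable
{
    // FIFO queue of pending handler invocations.
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public SketchDispatchQueue(string name)
    {
        _thread = new Thread(Run) { Name = name, IsBackground = true };
        _thread.Start();
    }

    // Can be called from any thread to schedule a handler invocation.
    public void Enqueue(Action handlerInvocation) => _queue.Add(handlerInvocation);

    private void Run()
    {
        // Invocations run one at a time, in arrival order, always on this thread.
        foreach (var invocation in _queue.GetConsumingEnumerable())
        {
            try
            {
                invocation();
            }
            catch (Exception ex)
            {
                // A failing handler must not stop the processing thread.
                Console.Error.WriteLine(ex);
            }
        }
    }

    // Stops accepting work, drains what is already queued, then waits for the thread.
    // Must not be called from the processing thread itself.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _thread.Join();
        _queue.Dispose();
    }
}
```

With such a design, enqueueing a slow invocation ahead of a fast one delays the fast one, which is the trade-off described above.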

Custom dispatch queues

It is possible to associate handlers with different dispatch queues by annotating the handler class with the [DispatchQueueName] attribute and specifying a name for the dispatch queue. This creates a separate queue and thread to invoke the Handle methods of the handler.

Example:

[DispatchQueueName("MyDispatchQueue")]
public class MyHandler : IMessageHandler<MyMessage> {}

It is also possible to specify the dispatch queue for an entire namespace by declaring a class that implements the IProvideDispatchQueueNameForCurrentNamespace interface. The QueueName supplied by the class will be used for all handlers declared in the same namespace, including nested namespaces.

Example:

public class MyDispatchQueue : IProvideDispatchQueueNameForCurrentNamespace
{
    public string QueueName => "MyQueue";
}
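To make the "including nested namespaces" rule more concrete, the following sketch resolves a queue name by walking up the namespace hierarchy of a handler. It is only an assumption about how such a lookup could work, not the Zebus implementation: QueueNameResolver, its Resolve method, the "DefaultQueue" placeholder and the "nearest namespace wins" rule are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only: this is not the Zebus implementation.
public static class QueueNameResolver
{
    // Placeholder for the name of the default dispatch queue (an assumption).
    public const string DefaultQueueName = "DefaultQueue";

    // namespaceQueues maps the namespace of each provider class to its QueueName.
    public static string Resolve(string handlerNamespace, IReadOnlyDictionary<string, string> namespaceQueues)
    {
        var current = handlerNamespace;
        while (!string.IsNullOrEmpty(current))
        {
            // Assumption: the closest enclosing namespace with a provider wins.
            if (namespaceQueues.TryGetValue(current, out var queueName))
                return queueName;

            // Move to the parent namespace: "A.B.C" -> "A.B" -> "A".
            var lastDot = current.LastIndexOf('.');
            current = lastDot < 0 ? null : current.Substring(0, lastDot);
        }

        // No provider found in the namespace or any of its parents.
        return DefaultQueueName;
    }
}
```

For instance, with a provider declared in a hypothetical MyApp.Handlers namespace, a handler in MyApp.Handlers.Orders would resolve to the same queue name, while a handler in MyApp.Other would fall back to the default dispatch queue.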

Asynchronous handlers

Message handlers using the IAsyncMessageHandler<T> interface also run in the context of a dispatch queue.

For example, the following handler will behave exactly like a synchronous handler:

public class SampleHandler : IAsyncMessageHandler<SampleEvent>
{
    public Task Handle(SampleEvent message)
    {
        // This code runs on the dispatch queue thread
        Console.WriteLine($"Hello {message.Id}");
        
        return Task.CompletedTask;
    }
}

Zebus uses a custom SynchronizationContext that allows task continuations to also run on the dispatch queue thread:

public class SampleHandler : IAsyncMessageHandler<SampleEvent>
{
    public async Task Handle(SampleEvent message)
    {
        // This code runs on the dispatch queue thread
        Console.WriteLine("Before await");

        await Task.Delay(100);
        
        // This code runs on the dispatch queue thread
        Console.WriteLine("After await");
    }
}

To let a continuation run outside the dispatch queue thread, call ConfigureAwait(false) on the awaited task.
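The effect of a single-threaded SynchronizationContext on await, with and without ConfigureAwait(false), can be observed with a small standalone sketch. It is not the Zebus context: SingleThreadContext and ContextDemo are hypothetical names, and the code only shows the general mechanism of posting continuations back to one dedicated thread.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration only: this is not the Zebus implementation.
public sealed class SingleThreadContext : SynchronizationContext, IDisposable
{
    private readonly BlockingCollection<(SendOrPostCallback Callback, object? State)> _work =
        new BlockingCollection<(SendOrPostCallback Callback, object? State)>();
    private readonly Thread _thread;

    public SingleThreadContext()
    {
        _thread = new Thread(Run) { IsBackground = true };
        _thread.Start();
    }

    public int ThreadId => _thread.ManagedThreadId;

    // Continuations of awaits that captured this context are posted here.
    public override void Post(SendOrPostCallback d, object? state) => _work.Add((d, state));

    private void Run()
    {
        // Make this context the current one for everything executed on the thread.
        SetSynchronizationContext(this);
        foreach (var (callback, state) in _work.GetConsumingEnumerable())
            callback(state);
    }

    // Must not be called from the context thread itself.
    public void Dispose()
    {
        _work.CompleteAdding();
        _thread.Join();
        _work.Dispose();
    }
}

public static class ContextDemo
{
    // Records the managed thread id at three points of an async method.
    public static async Task<(int Before, int After, int Detached)> HandleAsync()
    {
        var before = Environment.CurrentManagedThreadId;

        await Task.Delay(50); // continuation is posted back to the captured context
        var after = Environment.CurrentManagedThreadId;

        await Task.Delay(50).ConfigureAwait(false); // continuation ignores the context
        var detached = Environment.CurrentManagedThreadId;

        return (before, after, detached);
    }

    // Starts HandleAsync on the context thread and blocks the caller until it completes.
    public static (int Before, int After, int Detached) RunOn(SingleThreadContext context)
    {
        var completion = new TaskCompletionSource<(int Before, int After, int Detached)>();
        context.Post(async _ => completion.SetResult(await HandleAsync()), null);
        return completion.Task.GetAwaiter().GetResult();
    }
}
```

Calling ContextDemo.RunOn with a SingleThreadContext should report the context thread for the first two ids, while the id recorded after ConfigureAwait(false) typically belongs to a thread-pool thread. Code running after such an await can therefore no longer rely on the sequencing of the dispatch queue.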