-
Notifications
You must be signed in to change notification settings - Fork 60
Dispatch Queues
Zebus will process incoming messages by synchronously invoking the Handle
method on the handlers. The messages are processed sequentially, one after the other. This is implemented using a message queue and a dedicated processing thread.
The association of the message queue and the processing thread generates a dispatch queue.
By default there is only one global dispatch queue which is referred to as the default dispatch queue.
The advantage of sequencing the handling of all incoming messages is that we can more easily reason about the processing of the messages and we do not have to worry about thread safety in objects shared by multiple handlers that are serviced by the same dispatch queue. The downside is that one slow handler can prevent other handlers from processing their messages and can lead to queuing both in-process and in the persistence component.
It is possible to associate handlers with different dispatch queues by annotating the handler class with the [DispatchQueueName]
attribute and specifying a name for the dispatch queue. This will create a separate queue and thread to invoke the handle methods on the handler.
Example:
[DispatchQueueName("MyDispatchQueue")]
public class MyHandler : IMessageHandler<MyMessage> {}
It is also possible to specify the dispatch queue for an entire namespace by declaring a class that implements the IProvideDispatchQueueNameForCurrentNamespace
interface. The QueueName
supplied by the class will be used for all handlers declared in the same namespace, including nested namespaces.
Example:
public class MyDispatchQueue : IProvideDispatchQueueNameForCurrentNamespace
{
public string QueueName => "MyQueue";
}
Message handlers using the IAsyncMessageHandler<T>
interface also run in the context of a dispatch queue.
For example, the following handler will behave exactly like a synchronous handler:
public class SampleHandler : IAsyncMessageHandler<SampleEvent>
{
public Task Handle(SampleEvent message)
{
// This code runs on the dispatch queue thread
Console.WriteLine($"Hello {message.Id}");
return Task.CompletedTask;
}
}
Zebus uses a custom SynchronizationContext
that allows tasks continuations to be also executed in the dispatch queue thread:
public class SampleHandler : IAsyncMessageHandler<SampleEvent>
{
public async Task Handle(SampleEvent message)
{
// This code runs on the dispatch queue thread
Console.WriteLine("Before await");
await Task.Delay(100);
// This code runs on the dispatch queue thread
Console.WriteLine("After await");
}
}
Of course, it is possible to use task.ConfigureAwait(false)
to prevent the continuation from running on the dispatch thread.