Lightweight library that handles RabbitMQ auto-reconnect and publishing retry routine for you. The library is designed to save the developer from the headache when working with RabbitMQ.
rabbitroutine solves your RabbitMQ reconnection problems:
- Handles connection errors and channels errors separately.
- Takes into account the need to re-declare entities in RabbitMQ after reconnection.
- Notifies of errors and connection retry attempts.
- Supports FireAndForgetPublisher and EnsurePublisher, that can be wrapped with RetryPublisher.
- Supports pool of channels used for publishing.
- Provides channels pool size statistics.
go get github.com/furdarius/rabbitroutine
You need to implement Consumer and register
it with StartConsumer
or with StartMultipleConsumers.
When connection is established (at first time or after reconnect) Declare
method is called. It can be used to
declare required RabbitMQ entities (consumer example).
Usage example:
// Consumer declares your own RabbitMQ consumer implementing rabbitroutine.Consumer interface.
type Consumer struct {}
func (c *Consumer) Declare(ctx context.Context, ch *amqp.Channel) error {}
func (c *Consumer) Consume(ctx context.Context, ch *amqp.Channel) error {}
url := "amqp://guest:guest@127.0.0.1:5672/"
conn := rabbitroutine.NewConnector(rabbitroutine.Config{
// How long to wait between reconnect
Wait: 2 * time.Second,
})
ctx := context.Background()
go func() {
err := conn.Dial(ctx, url)
if err != nil {
log.Println(err)
}
}()
consumer := &Consumer{}
go func() {
err := conn.StartConsumer(ctx, consumer)
if err != nil {
log.Println(err)
}
}()
Full example demonstrates messages consuming
For publishing FireForgetPublisher and EnsurePublisher implemented. Both of them can be wrapped with RetryPublisher to repeat publishing on errors and mitigate short-term network problems.
Usage example:
ctx := context.Background()
url := "amqp://guest:guest@127.0.0.1:5672/"
conn := rabbitroutine.NewConnector(rabbitroutine.Config{
// How long wait between reconnect
Wait: 2 * time.Second,
})
pool := rabbitroutine.NewPool(conn)
ensurePub := rabbitroutine.NewEnsurePublisher(pool)
pub := rabbitroutine.NewRetryPublisher(
ensurePub,
rabbitroutine.PublishMaxAttemptsSetup(16),
rabbitroutine.PublishDelaySetup(rabbitroutine.LinearDelay(10*time.Millisecond)),
)
go conn.Dial(ctx, url)
err := pub.Publish(ctx, "myexch", "myqueue", amqp.Publishing{Body: []byte("message")})
if err != nil {
log.Println("publish error:", err)
}
Full example demonstrates messages publishing
Pull requests are very much welcomed. Create your pull request, make sure a test or example is included that covers your change and your commits represent coherent changes that include a reason for the change.
To run the integration tests, make sure you have RabbitMQ running on any host
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
Then export the environment variable AMQP_URL=amqp://host/
and run go test -tags integration
.
AMQP_URL=amqp://guest:guest@127.0.0.1:5672/ go test -v -race -cpu=1,2 -tags integration -timeout 5s
Use golangci-lint to check code with linters:
golangci-lint run ./...