I was evaluating watermill today and I was surprised to learn that the Publish signature does not take a context.Context. Without it, how does one cancel a publish that may be stuck on a slow/failing network request?
Let's say my app handles an HTTP request with a timeout of 3 seconds. While handling this request I want to publish a message to a Redis stream using a Watermill publisher. Unfortunately the Redis instance has some issues, the network is blocked, and it takes 4 seconds for Redis to answer. How can I make sure the publish gets canceled?
The issue is also apparent in the redisstream source code:
    // Publish publishes message to redis stream
    //
    // Publish is blocking and waits for redis response.
    // When any of messages delivery fails - function is interrupted.
    func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
    	if p.closed {
    		return errors.New("publisher closed")
    	}

    	logFields := make(watermill.LogFields, 3)
    	logFields["topic"] = topic

    	for _, msg := range msgs {
    		logFields["message_uuid"] = msg.UUID
    		p.logger.Trace("Sending message to redis stream", logFields)

    		values, err := p.config.Marshaller.Marshal(topic, msg)
    		if err != nil {
    			return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
    		}

    		maxlen, ok := p.config.Maxlens[topic]
    		if !ok {
    			maxlen = p.config.DefaultMaxlen
    		}

    		// HERE: The library needs to use "context.Background" which means this XADD can hang
    		// forever on network issues.
    		id, err := p.client.XAdd(context.Background(), &redis.XAddArgs{
    			Stream: topic,
    			Values: values,
    			MaxLen: maxlen,
    			Approx: true,
    		}).Result()
    		if err != nil {
    			return errors.Wrapf(err, "cannot xadd message %s", msg.UUID)
    		}

    		logFields["xadd_id"] = id
    		p.logger.Trace("Message sent to redis stream", logFields)
    	}

    	return nil
    }
Right, thank you for the response. Too bad I didn't notice this; I remember looking at the NATS and Redis implementations to check how it is done. I guess I picked the wrong ones. I think the Kafka producer may have the same issue, but it is more complicated since it uses the Sarama sync producer, which also has no context support. (I think so; I'm not a user, but I found IBM/sarama#1849.)
I guess some of these libraries are old and predate the widespread use of context.Context? Not sure if it's actually feasible to do anything about this in the short term.