Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The publish signature does not allow for a context.Context to be passed #445

Open
advdv opened this issue May 29, 2024 · 2 comments
Open

Comments

@advdv
Copy link

advdv commented May 29, 2024

I was evaluating watermill today and I was surprised to learn that the Publish signature does not take a context.Context. Without it, how does one cancel a publish that may be stuck on a slow/failing network request?

Let's say I my app takes a http request with a timeout of 3 seconds. In handling this request I want to publish a message to a Redis stream using a Watermill publisher. Unfortunately the Redis instance has some issues and the network is blocked and it takes 4 seconds for Redis to answer. How can I make sure the publish get's canceled?

The issue is also apparent in the redisstream source code:

// Publish publishes message to redis stream
//
// Publish is blocking and waits for redis response.
// When any of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
	if p.closed {
		return errors.New("publisher closed")
	}

	logFields := make(watermill.LogFields, 3)
	logFields["topic"] = topic

	for _, msg := range msgs {
		logFields["message_uuid"] = msg.UUID
		p.logger.Trace("Sending message to redis stream", logFields)

		values, err := p.config.Marshaller.Marshal(topic, msg)
		if err != nil {
			return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
		}

		maxlen, ok := p.config.Maxlens[topic]
		if !ok {
			maxlen = p.config.DefaultMaxlen
		}
                
                // HERE: The library needs to use "context.Background" which means this XADD can hang
                // forever on network issues.
		id, err := p.client.XAdd(context.Background(), &redis.XAddArgs{
			Stream: topic,
			Values: values,
			MaxLen: maxlen,
			Approx: true,
		}).Result()
		if err != nil {
			return errors.Wrapf(err, "cannot xadd message %s", msg.UUID)
		}

		logFields["xadd_id"] = id
		p.logger.Trace("Message sent to redis stream", logFields)
	}

	return nil
}
@m110
Copy link
Member

m110 commented Jun 28, 2024

Hey @advdv. The publish doesn't take context because it's attached to the message. See the message.SetContext() method.

You are right about redisstream, though. We probably should review all Pub/Subs. For example, watermill-amqp does it correctly: https://github.com/ThreeDotsLabs/watermill-amqp/blob/master/pkg/amqp/publisher.go#L191

@advdv
Copy link
Author

advdv commented Jun 28, 2024

Right, thank you for the response. Too bad I didn't notice this, I remember looking at the NATS and Redis implementation to check how is done. Guess I picked the wrong ones. I think maybe the Kafka producer has the same issue, but it is more complicated since it uses the Sarama sync producer which has no context support also. (I think, not a user, but I found: IBM/sarama#1849).

I guess some of these libraries are old and pre-date the wide-spread use of context.Context? Not sure if it's actually feasible to do anything about this in the short term.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants