You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
for msg := range h.messagesCh {
h.runningHandlersWgLock.Lock()
h.runningHandlersWg.Add(1)
h.runningHandlersWgLock.Unlock()
go h.handleMessage(msg, middlewareHandler)
}
if h.publisher != nil {
h.logger.Debug("Waiting for publisher to close", nil)
if err := h.publisher.Close(); err != nil {
h.logger.Error("Failed to close publisher", err, nil)
}
h.logger.Debug("Publisher closed", nil)
}
Whenever a message is received, a new goroutine is created to process it.
If the messagesCh is closed while the message is being processed, then we immediately proceed to closing the publisher, without waiting on the goroutine to return.
In my use case, the processing logic can take some time, and requires the publisher to be open to send the results back. In the event the router is shutdown, the publisher will be closed when the goroutine tries to publish the results.
It seems that the runningHandlersWg is waited on outside of this scope (the router uses it to wait on handlers). But it could also be waited on before closing the publisher in each handler.
for msg := range h.messagesCh {
h.runningHandlersWgLock.Lock()
h.runningHandlersWg.Add(1)
h.runningHandlersWgLock.Unlock()
go h.handleMessage(msg, middlewareHandler)
}
if h.publisher != nil {
h.logger.Debug("Waiting on message processing go routines", nil)
h.runningHandlersWg.Wait()
h.logger.Debug("Waiting for publisher to close", nil)
if err := h.publisher.Close(); err != nil {
h.logger.Error("Failed to close publisher", err, nil)
}
h.logger.Debug("Publisher closed", nil)
}
In a pub/sub setup, it makes sense to close the subscriber to avoid processing more messages. But the publisher should remain open until all the goroutines processing messages terminate. Or is there something I misunderstand? Or wrongly assuming?
EDIT: it is not correct to use the h.runningHandlersWg waitGroup, because it is shared between all the handlers. Instead, it would require a per-handler wait group, but the idea is the same.
The text was updated successfully, but these errors were encountered:
In
router.go
, line615
, we have thisWhenever a message is received, a new goroutine is created to process it.
If the
messagesCh
is closed while the message is being processed, then we immediately proceed to closing the publisher, without waiting on the goroutine to return.In my use case, the processing logic can take some time, and requires the publisher to be open to send the results back. In the event the router is shutdown, the publisher will be closed when the goroutine tries to publish the results.
It seems that the
runningHandlersWg
is waited on outside of this scope (the router uses it to wait onhandlers
). But it could also be waited on before closing the publisher in eachhandler
.In a pub/sub setup, it makes sense to close the subscriber to avoid processing more messages. But the publisher should remain open until all the goroutines processing messages terminate. Or is there something I misunderstand? Or wrongly assuming?
EDIT: it is not correct to use the
h.runningHandlersWg
waitGroup, because it is shared between all the handlers. Instead, it would require a per-handler wait group, but the idea is the same.The text was updated successfully, but these errors were encountered: