Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add no subscribers fallback option to GoChannel Pub/Sub #418

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions pubsub/gochannel/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
)

// NoSubscribersFallbackDefaultTopic is the default fallback topic messages without any subscribers
// will be sent to – it is used if the `EnableNoSubscribersFallback` option is enabled and no
// fallback topic is configured via the `NoSubscribersFallbackTopic` option.
const NoSubscribersFallbackDefaultTopic = "*"

// Config holds the GoChannel Pub/Sub's configuration options.
type Config struct {
// Output channel buffer size.
Expand All @@ -20,12 +25,21 @@ type Config struct {
// it will receive all previously produced messages.
//
// All messages are persisted to the memory (simple slice),
// so be aware that with large amount of messages you can go out of the memory.
// so be aware that with a large amount of messages you can run out of memory.
Persistent bool

// When true, Publish will block until subscriber Ack's the message.
// If there are no subscribers, Publish will not block (also when Persistent is true).
BlockPublishUntilSubscriberAck bool

// When true, messages sent to a topic without any subscribers will be sent to the
// subscribers of the fallback topic (configured via `NoSubscribersFallbackTopic` option).
EnableNoSubscribersFallback bool

// NoSubscribersFallbackTopic is the fallback topic messages without any subscribers will be sent to.
// This is used if the `EnableNoSubscribersFallback` configuration option is enabled.
// If it's not set then `*` is used by default.
NoSubscribersFallbackTopic string
}

// GoChannel is the simplest Pub/Sub implementation.
Expand All @@ -52,15 +66,19 @@ type GoChannel struct {
persistedMessagesLock sync.RWMutex
}

// NewGoChannel creates new GoChannel Pub/Sub.
// NewGoChannel creates a new GoChannel Pub/Sub.
//
// This GoChannel is not persistent.
// That means if you send a message to a topic to which no subscriber is subscribed, that message will be discarded.
// By default, GoChannel isn't persistent; that means messages sent to a topic
// without any subscribers will be discarded if the fallback option isn't enabled.
func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel {
if logger == nil {
logger = watermill.NopLogger{}
}

if config.EnableNoSubscribersFallback && config.NoSubscribersFallbackTopic == "" {
config.NoSubscribersFallbackTopic = NoSubscribersFallbackDefaultTopic
}

return &GoChannel{
config: config,

Expand All @@ -77,9 +95,9 @@ func NewGoChannel(config Config, logger watermill.LoggerAdapter) *GoChannel {
}

// Publish in GoChannel is NOT blocking until all consumers consume.
// Messages will be send in background.
// Messages will be sent in the background.
//
// Messages may be persisted or not, depending of persistent attribute.
// Messages may be persisted or not, depending on whether the persistent option is enabled.
func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
if g.isClosed() {
return errors.New("Pub/Sub closed")
Expand Down Expand Up @@ -141,9 +159,14 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan
logFields := watermill.LogFields{"message_uuid": message.UUID, "topic": topic}

if len(subscribers) == 0 {
close(ackedBySubscribers)
g.logger.Info("No subscribers to send message", logFields)
return ackedBySubscribers, nil
if !g.config.EnableNoSubscribersFallback {
return g.handleNoSubscribers(ackedBySubscribers, logFields)
}

g.logger.Debug("No subscribers to send the message to, trying the fallback subscribers", logFields)
if subscribers = g.topicSubscribers(g.config.NoSubscribersFallbackTopic); len(subscribers) == 0 {
return g.handleNoSubscribers(ackedBySubscribers, logFields)
}
}

go func(subscribers []*subscriber) {
Expand All @@ -166,6 +189,12 @@ func (g *GoChannel) sendMessage(topic string, message *message.Message) (<-chan
return ackedBySubscribers, nil
}

func (g *GoChannel) handleNoSubscribers(ackedBySubscribers chan struct{}, logFields watermill.LogFields) (<-chan struct{}, error) {
close(ackedBySubscribers)
g.logger.Info("No subscribers to send the message to", logFields)
return ackedBySubscribers, nil
}

// Subscribe returns channel to which all published messages are sent.
// Messages are not persisted. If there are no subscribers and message is produced it will be gone.
//
Expand Down
45 changes: 45 additions & 0 deletions pubsub/gochannel/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,51 @@ func TestPublishSubscribe_not_persistent(t *testing.T) {
assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_enable_no_subscribers_fallback(t *testing.T) {
messagesCount := 100
pubSub := gochannel.NewGoChannel(
gochannel.Config{
OutputChannelBuffer: int64(messagesCount),
EnableNoSubscribersFallback: true,
},
watermill.NewStdLogger(true, true),
)
topicName := "test_topic_" + watermill.NewUUID()

msgs, err := pubSub.Subscribe(context.Background(), gochannel.NoSubscribersFallbackDefaultTopic)
require.NoError(t, err)

sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName)
receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second)

tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs)

assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_enable_no_subscribers_fallback_with_custom_topic(t *testing.T) {
messagesCount := 100
pubSub := gochannel.NewGoChannel(
gochannel.Config{
OutputChannelBuffer: int64(messagesCount),
EnableNoSubscribersFallback: true,
NoSubscribersFallbackTopic: "custom_fallback_topic",
},
watermill.NewStdLogger(true, true),
)
topicName := "test_topic_" + watermill.NewUUID()

msgs, err := pubSub.Subscribe(context.Background(), "custom_fallback_topic")
require.NoError(t, err)

sendMessages := tests.PublishSimpleMessages(t, messagesCount, pubSub, topicName)
receivedMsgs, _ := subscriber.BulkRead(msgs, messagesCount, time.Second)

tests.AssertAllMessagesReceived(t, sendMessages, receivedMsgs)

assert.NoError(t, pubSub.Close())
}

func TestPublishSubscribe_block_until_ack(t *testing.T) {
pubSub := gochannel.NewGoChannel(
gochannel.Config{BlockPublishUntilSubscriberAck: true},
Expand Down