Skip to content

Commit

Permalink
Delayed Requeuer example
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 committed Oct 16, 2024
1 parent 1b0dcb3 commit 7fcfe2e
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
services:
server:
image: golang:1.23
restart: unless-stopped
#restart: unless-stopped
volumes:
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir: /app
command: go run main.go
#command: go run main.go
command: echo

postgres:
image: postgres:15
Expand Down
41 changes: 6 additions & 35 deletions _examples/real-world-examples/delayed-poison-queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/ThreeDotsLabs/watermill/components/delay"
"github.com/ThreeDotsLabs/watermill/components/requeuer"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
)

func main() {
Expand All @@ -27,13 +26,6 @@ func main() {

logger := watermill.NewStdLogger(false, false)

poisonPublisher, err := sql.NewDelayedPostgreSQLPublisher(db, sql.DelayedPostgreSQLPublisherConfig{
Logger: logger,
})
if err != nil {
panic(err)
}

publisher, err := sql.NewPublisher(db, sql.PublisherConfig{
SchemaAdapter: sql.DefaultPostgreSQLSchema{},
AutoInitializeSchema: true,
Expand All @@ -42,9 +34,10 @@ func main() {
panic(err)
}

poisonSubscriber, err := sql.NewDelayedPostgreSQLSubscriber(db, sql.DelayedPostgreSQLSubscriberConfig{
DeleteOnAck: true,
Logger: logger,
delayedRequeuer, err := sql.NewPostgreSQLDelayedRequeuer(sql.DelayedRequeuerConfig{
DB: db,
Publisher: publisher,
Logger: logger,
})
if err != nil {
panic(err)
Expand All @@ -61,14 +54,8 @@ func main() {
panic(err)
}

poisonQueue, err := middleware.PoisonQueue(poisonPublisher, "poison")
if err != nil {
panic(err)
}

router := message.NewDefaultRouter(logger)
router.AddMiddleware(poisonQueue)
router.AddMiddleware(middleware.NewDelayOnError(middleware.DelayOnErrorConfig{}).Middleware)
router.AddMiddleware(delayedRequeuer.Middleware()...)

eventProcessor, err := cqrs.NewEventProcessorWithConfig(router, cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
Expand Down Expand Up @@ -117,24 +104,8 @@ func main() {
panic(err)
}

requeuer, err := requeuer.NewRequeuer(requeuer.Config{
Subscriber: poisonSubscriber,
SubscribeTopic: "poison",
Publisher: publisher,
GeneratePublishTopic: func(params requeuer.GeneratePublishTopicParams) (string, error) {
topic := params.Message.Metadata.Get(middleware.PoisonedTopicKey)
if topic == "" {
return "", fmt.Errorf("missing topic in metadata")
}
return topic, nil
},
}, logger)
if err != nil {
panic(err)
}

go func() {
err = requeuer.Run(context.Background())
err = delayedRequeuer.Run(context.Background())
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion tools/pq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

func main() {
config := cli.BackendConfig{
Topic: "poison",
Topic: "requeue",
RawTopic: "",
}

Expand Down

0 comments on commit 7fcfe2e

Please sign in to comment.