Skip to content

Commit

Permalink
use generic cqrs handlers in all examples (#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak authored Nov 23, 2024
1 parent 15d765b commit cf7e2bb
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 142 deletions.
58 changes: 8 additions & 50 deletions _examples/basic/5-cqrs-protobuf/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,7 @@ type BookRoomHandler struct {
eventBus *cqrs.EventBus
}

func (b BookRoomHandler) HandlerName() string {
return "BookRoomHandler"
}

// NewCommand returns type of command which this handle should handle. It must be a pointer.
func (b BookRoomHandler) NewCommand() interface{} {
return &BookRoom{}
}

func (b BookRoomHandler) Handle(ctx context.Context, c interface{}) error {
// c is always the type returned by `NewCommand`, so casting is always safe
cmd := c.(*BookRoom)

func (b BookRoomHandler) Handle(ctx context.Context, cmd *BookRoom) error {
// some random price, in production you probably will calculate in wiser way
price := (rand.Int63n(40) + 1) * 10

Expand Down Expand Up @@ -70,18 +58,7 @@ type OrderBeerOnRoomBooked struct {
commandBus *cqrs.CommandBus
}

func (o OrderBeerOnRoomBooked) HandlerName() string {
// this name is passed to EventsSubscriberConstructor and used to generate queue name
return "OrderBeerOnRoomBooked"
}

func (OrderBeerOnRoomBooked) NewEvent() interface{} {
return &RoomBooked{}
}

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, e interface{}) error {
event := e.(*RoomBooked)

func (o OrderBeerOnRoomBooked) Handle(ctx context.Context, event *RoomBooked) error {
orderBeerCmd := &OrderBeer{
RoomId: event.RoomId,
Count: rand.Int63n(10) + 1,
Expand All @@ -100,13 +77,7 @@ func (o OrderBeerHandler) HandlerName() string {
return "OrderBeerHandler"
}

func (o OrderBeerHandler) NewCommand() interface{} {
return &OrderBeer{}
}

func (o OrderBeerHandler) Handle(ctx context.Context, c interface{}) error {
cmd := c.(*OrderBeer)

func (o OrderBeerHandler) Handle(ctx context.Context, cmd *OrderBeer) error {
if rand.Int63n(10) == 0 {
// sometimes there is no beer left, command will be retried
return fmt.Errorf("no beer left for room %s, please try later", cmd.RoomId)
Expand Down Expand Up @@ -137,22 +108,11 @@ func NewBookingsFinancialReport() *BookingsFinancialReport {
return &BookingsFinancialReport{handledBookings: map[string]struct{}{}}
}

func (b BookingsFinancialReport) HandlerName() string {
// this name is passed to EventsSubscriberConstructor and used to generate queue name
return "BookingsFinancialReport"
}

func (BookingsFinancialReport) NewEvent() interface{} {
return &RoomBooked{}
}

func (b *BookingsFinancialReport) Handle(ctx context.Context, e interface{}) error {
func (b *BookingsFinancialReport) Handle(ctx context.Context, event *RoomBooked) error {
// Handle may be called concurrently, so it need to be thread safe.
b.lock.Lock()
defer b.lock.Unlock()

event := e.(*RoomBooked)

// When we are using Pub/Sub which doesn't provide exactly-once delivery semantics, we need to deduplicate messages.
// GoChannel Pub/Sub provides exactly-once delivery,
// but let's make this example ready for other Pub/Sub implementations.
Expand Down Expand Up @@ -323,19 +283,17 @@ func main() {
}

err = commandProcessor.AddHandlers(
BookRoomHandler{eventBus},
OrderBeerHandler{eventBus},
cqrs.NewCommandHandler("BookRoomHandler", BookRoomHandler{eventBus}.Handle),
cqrs.NewCommandHandler("OrderBeerHandler", OrderBeerHandler{eventBus}.Handle),
)
if err != nil {
panic(err)
}

err = eventProcessor.AddHandlersGroup(
"events",
OrderBeerOnRoomBooked{commandBus},

NewBookingsFinancialReport(),

cqrs.NewGroupEventHandler(OrderBeerOnRoomBooked{commandBus}.Handle),
cqrs.NewGroupEventHandler(NewBookingsFinancialReport().Handle),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *BeerOrdered) error {
logger.Info("Beer ordered", watermill.LogFields{
"room_id": event.RoomId,
Expand Down
56 changes: 8 additions & 48 deletions _examples/real-world-examples/consumer-groups/crm-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func main() {
EventsPublisher: publisher,
EventHandlers: func(commandBus *cqrs.CommandBus, eventBus *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
AddToCRM8Handler{},
AddToSupport8Handler{},
cqrs.NewEventHandler("AddToCRM-8", AddToCRM8Handler{}.Handle),
cqrs.NewEventHandler("AddToSupport-8", AddToSupport8Handler{}.Handle),
}
},
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
Expand Down Expand Up @@ -120,8 +120,8 @@ func main() {
EventsPublisher: publisher,
EventHandlers: func(commandBus *cqrs.CommandBus, eventBus *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
AddToCRM9Handler{},
AddToSupport9Handler{},
cqrs.NewEventHandler("AddToCRM-9", AddToCRM9Handler{}.Handle),
cqrs.NewEventHandler("AddToSupport-9", AddToSupport9Handler{}.Handle),
}
},
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
Expand Down Expand Up @@ -152,71 +152,31 @@ func main() {

type AddToCRM8Handler struct{}

func (h AddToCRM8Handler) HandlerName() string {
return "AddToCRM-8"
}

func (h AddToCRM8Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToCRM8Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToCRM8Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
fmt.Println("Adding user", e.UserID, "to the CRM")

return nil
}

type AddToSupport8Handler struct{}

func (h AddToSupport8Handler) HandlerName() string {
return "AddToSupport-8"
}

func (h AddToSupport8Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToSupport8Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToSupport8Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
fmt.Println("Adding user", e.UserID, "to the support channel")

return nil
}

type AddToCRM9Handler struct{}

func (h AddToCRM9Handler) HandlerName() string {
return "AddToCRM-9"
}

func (h AddToCRM9Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToCRM9Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToCRM9Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
fmt.Println("Adding user", e.UserID, "to the CRM")

return nil
}

type AddToSupport9Handler struct{}

func (h AddToSupport9Handler) HandlerName() string {
return "AddToSupport-9"
}

func (h AddToSupport9Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToSupport9Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToSupport9Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
fmt.Println("Adding user", e.UserID, "to the support channel")

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,8 @@ func main() {
EventsPublisher: publisher,
EventHandlers: func(commandBus *cqrs.CommandBus, eventBus *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
AddToPromotionsList8Handler{},
AddToNewsList8Handler{},
cqrs.NewEventHandler("AddToPromotionsList-8", AddToPromotionsList8Handler{}.Handle),
cqrs.NewEventHandler("AddToNewsList-8", AddToNewsList8Handler{}.Handle),
}
},
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
Expand Down Expand Up @@ -345,8 +345,8 @@ func main() {
EventsPublisher: publisher,
EventHandlers: func(commandBus *cqrs.CommandBus, eventBus *cqrs.EventBus) []cqrs.EventHandler {
return []cqrs.EventHandler{
AddToPromotionsList9Handler{},
AddToNewsList9Handler{},
cqrs.NewEventHandler("AddToPromotionsList-9", AddToPromotionsList9Handler{}.Handle),
cqrs.NewEventHandler("AddToNewsList-9", AddToNewsList9Handler{}.Handle),
}
},
EventsSubscriberConstructor: func(handlerName string) (message.Subscriber, error) {
Expand Down Expand Up @@ -381,13 +381,7 @@ func (h AddToPromotionsList8Handler) HandlerName() string {
return "AddToPromotionsList-8"
}

func (h AddToPromotionsList8Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToPromotionsList8Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToPromotionsList8Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
if !e.Consents.Marketing {
return nil
}
Expand All @@ -399,17 +393,7 @@ func (h AddToPromotionsList8Handler) Handle(ctx context.Context, event interface

type AddToNewsList8Handler struct{}

func (h AddToNewsList8Handler) HandlerName() string {
return "AddToNewsList-8"
}

func (h AddToNewsList8Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToNewsList8Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToNewsList8Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
if !e.Consents.News {
return nil
}
Expand All @@ -420,17 +404,7 @@ func (h AddToNewsList8Handler) Handle(ctx context.Context, event interface{}) er

type AddToPromotionsList9Handler struct{}

func (h AddToPromotionsList9Handler) HandlerName() string {
return "AddToPromotionsList-9"
}

func (h AddToPromotionsList9Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToPromotionsList9Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToPromotionsList9Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
if !e.Consents.Marketing {
return nil
}
Expand All @@ -442,17 +416,7 @@ func (h AddToPromotionsList9Handler) Handle(ctx context.Context, event interface

type AddToNewsList9Handler struct{}

func (h AddToNewsList9Handler) HandlerName() string {
return "AddToNewsList-9"
}

func (h AddToNewsList9Handler) NewEvent() interface{} {
return &common.UserSignedUp{}
}

func (h AddToNewsList9Handler) Handle(ctx context.Context, event interface{}) error {
e := event.(*common.UserSignedUp)

func (h AddToNewsList9Handler) Handle(ctx context.Context, e *common.UserSignedUp) error {
if !e.Consents.News {
return nil
}
Expand Down

0 comments on commit cf7e2bb

Please sign in to comment.