Skip to content

Commit

Permalink
refactor: add single subscribe function to eventsub
Browse files Browse the repository at this point in the history
  • Loading branch information
Satont committed May 31, 2024
1 parent 8a76048 commit 1349d40
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 58 deletions.
50 changes: 13 additions & 37 deletions apps/api-gql/internal/gql/resolvers/admin-actions.resolver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 38 additions & 10 deletions apps/api-gql/internal/gql/resolvers/user.resolver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions apps/api/internal/impl_protected/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func (c *Bot) BotJoinPart(ctx context.Context, request *bots.BotJoinPartRequest)
}

if dbChannel.IsEnabled {
c.Bus.EventSub.Subscribe.Publish(
eventsub.EventsubSubscribeRequest{ChannelID: dashboardId},
c.Bus.EventSub.SubscribeToAllEvents.Publish(
eventsub.EventsubSubscribeToAllEventsRequest{ChannelID: dashboardId},
)
}

Expand Down
4 changes: 2 additions & 2 deletions apps/api/internal/impl_unprotected/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func (c *Auth) AuthPostCode(ctx context.Context, request *auth.PostCodeRequest)
c.SessionManager.Put(ctx, "twitchUser", &twitchUser)
c.SessionManager.Put(ctx, "dashboardId", dbUser.ID)

if err := c.Bus.EventSub.Subscribe.Publish(
eventsub.EventsubSubscribeRequest{
if err := c.Bus.EventSub.SubscribeToAllEvents.Publish(
eventsub.EventsubSubscribeToAllEventsRequest{
ChannelID: dbUser.ID,
},
); err != nil {
Expand Down
38 changes: 35 additions & 3 deletions apps/eventsub/internal/bus-listener/bus-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,24 @@ func New(opts Opts) (*BusListener, error) {
opts.Lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
return impl.bus.EventSub.Subscribe.SubscribeGroup("eventsub", impl.subscribeToEvents)
if err := impl.bus.EventSub.SubscribeToAllEvents.SubscribeGroup(
"eventsub",
impl.subscribeToAllEvents,
); err != nil {
return err
}

if err := impl.bus.EventSub.Subscribe.SubscribeGroup(
"eventsub",
impl.subscribe,
); err != nil {
return err
}

return nil
},
OnStop: func(ctx context.Context) error {
impl.bus.EventSub.SubscribeToAllEvents.Unsubscribe()
impl.bus.EventSub.Subscribe.Unsubscribe()
return nil
},
Expand All @@ -52,9 +67,9 @@ func New(opts Opts) (*BusListener, error) {
return impl, nil
}

func (c *BusListener) subscribeToEvents(
func (c *BusListener) subscribeToAllEvents(
ctx context.Context,
msg eventsub.EventsubSubscribeRequest,
msg eventsub.EventsubSubscribeToAllEventsRequest,
) struct{} {
channel := model.Channels{}
err := c.gorm.
Expand Down Expand Up @@ -85,3 +100,20 @@ func (c *BusListener) subscribeToEvents(

return struct{}{}
}

func (c *BusListener) subscribe(
ctx context.Context,
msg eventsub.EventsubSubscribeRequest,
) struct{} {
if err := c.eventSubClient.SubscribeToEvent(
ctx,
msg.ConditionType,
msg.Topic,
msg.Version,
msg.ChannelID,
); err != nil {
c.logger.Error("error subscribing to event", err)
}

return struct{}{}
}
2 changes: 1 addition & 1 deletion apps/eventsub/internal/manager/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
model "github.com/satont/twir/libs/gomodels"
)

func getTypeCondition(
func GetTypeCondition(
t model.EventsubConditionType,
topic,
channelID,
Expand Down
47 changes: 46 additions & 1 deletion apps/eventsub/internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package manager
import (
"context"
"errors"
"fmt"
"log/slog"
"sync"

Expand Down Expand Up @@ -146,7 +147,7 @@ func (c *Manager) SubscribeToNeededEvents(
topic := topic
go func() {
defer wg.Done()
condition := getTypeCondition(topic.ConditionType, topic.Topic, broadcasterId, botId)
condition := GetTypeCondition(topic.ConditionType, topic.Topic, broadcasterId, botId)
if condition == nil {
c.logger.Error(
"failed to get condition",
Expand Down Expand Up @@ -198,3 +199,47 @@ func (c *Manager) SubscribeToNeededEvents(

return nil
}

func (c *Manager) SubscribeToEvent(
ctx context.Context,
conditionType,
topic,
version,
channelId string,
) error {
channel := model.Channels{}
err := c.gorm.
WithContext(ctx).
Where(
`"id" = ?`,
channelId,
).First(&channel).Error
if err != nil {
return err
}

convertedCondition := model.FindEventsubCondition(conditionType)
if conditionType == "" {
return errors.New("condition type not found")
}

condition := GetTypeCondition(convertedCondition, topic, channel.ID, channel.BotID)

fmt.Println(conditionType, topic, version, channelId)
if condition == nil {
return errors.New("condition not found")
}

_, err = c.SubscribeWithLimits(
ctx,
&eventsub_framework.SubRequest{
Type: topic,
Condition: condition,
Callback: c.tunnel.GetAddr(),
Secret: c.config.TwitchClientSecret,
Version: version,
},
)

return err
}
3 changes: 2 additions & 1 deletion libs/bus-core/bus-services.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type evalBus struct {
}

type eventSubBus struct {
Subscribe Queue[eventsub.EventsubSubscribeRequest, struct{}]
SubscribeToAllEvents Queue[eventsub.EventsubSubscribeToAllEventsRequest, struct{}]
Subscribe Queue[eventsub.EventsubSubscribeRequest, struct{}]
}

type schedulerBus struct {
Expand Down
6 changes: 6 additions & 0 deletions libs/bus-core/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func NewNatsBus(nc *nats.Conn) *Bus {
},

EventSub: &eventSubBus{
SubscribeToAllEvents: NewNatsQueue[eventsub.EventsubSubscribeToAllEventsRequest, struct{}](
nc,
eventsub.EventsubSubscribeSubject,
1*time.Minute,
nats.GOB_ENCODER,
),
Subscribe: NewNatsQueue[eventsub.EventsubSubscribeRequest, struct{}](
nc,
eventsub.EventsubSubscribeSubject,
Expand Down
9 changes: 8 additions & 1 deletion libs/bus-core/eventsub/eventsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ const (
EventsubSubscribeSubject = "eventsub.subscribe"
)

type EventsubSubscribeRequest struct {
type EventsubSubscribeToAllEventsRequest struct {
ChannelID string
}

type EventsubSubscribeRequest struct {
ChannelID string
Topic string
ConditionType string
Version string
}
Loading

0 comments on commit 1349d40

Please sign in to comment.