Skip to content

Commit

Permalink
feat(eventsub): add lock for start
Browse files Browse the repository at this point in the history
  • Loading branch information
Satont committed May 31, 2024
1 parent 3fbeeed commit 24d3faa
Showing 1 changed file with 44 additions and 5 deletions.
49 changes: 44 additions & 5 deletions apps/eventsub/internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import (
"context"
"errors"
"log/slog"
"slices"
"sync"

"github.com/go-redsync/redsync/v4"
redsyncredis "github.com/go-redsync/redsync/v4/redis/goredis/v9"
"github.com/google/uuid"
"github.com/nicklaw5/helix/v2"
"github.com/redis/go-redis/v9"
"github.com/samber/lo"
"github.com/satont/twir/apps/eventsub/internal/tunnel"
cfg "github.com/satont/twir/libs/config"
model "github.com/satont/twir/libs/gomodels"
Expand Down Expand Up @@ -40,6 +45,7 @@ type Opts struct {
TokensGrpc tokens.TokensClient
Gorm *gorm.DB
Tunnel *tunnel.AppTunnel
Redis *redis.Client
}

func NewManager(opts Opts) (*Manager, error) {
Expand All @@ -54,6 +60,9 @@ func NewManager(opts Opts) (*Manager, error) {
tunnel: opts.Tunnel,
}

locker := redsync.New(redsyncredis.NewPool(opts.Redis))
startDistributedLock := locker.NewMutex("eventsub:startDistrubitedLock")

opts.Lc.Append(
fx.Hook{
OnStart: func(ctx context.Context) error {
Expand Down Expand Up @@ -126,11 +135,29 @@ func NewManager(opts Opts) (*Manager, error) {
panic(err)
}

startDistributedLock.Lock()
defer startDistributedLock.Unlock()

channelsWg := sync.WaitGroup{}

for _, channel := range channels {
err = manager.SubscribeToNeededEvents(requestContext, topics, channel.ID, channel.BotID)
if err != nil {
continue
}
channelsWg.Add(1)

go func() {
defer channelsWg.Done()
err = manager.SubscribeToNeededEvents(
requestContext,
topics,
channel.ID,
channel.BotID,
)
if err != nil {
opts.Logger.Error(
"failed to subscribe to needed events",
slog.Any("err", err),
)
}
}()
}

manager.SubscribeWithLimits(
Expand All @@ -145,6 +172,8 @@ func NewManager(opts Opts) (*Manager, error) {
Version: "1",
},
)

channelsWg.Wait()
}()

return nil
Expand Down Expand Up @@ -211,7 +240,6 @@ func (c *Manager) SubscribeToNeededEvents(
if err := c.gorm.
WithContext(ctx).
Where(&model.EventsubSubscription{UserID: broadcasterId}).
Where("status NOT IN ?", statusesForSkip).
Find(&existedSubscriptions).
Error; err != nil {
return err
Expand All @@ -221,6 +249,17 @@ func (c *Manager) SubscribeToNeededEvents(
newSubsCount := atomic.NewInt64(0)

for _, topic := range topics {
existedSubForTopic, subExists := lo.Find(
existedSubscriptions,
func(item model.EventsubSubscription) bool {
return item.TopicID == topic.ID
},
)

if subExists && slices.Contains(statusesForSkip, existedSubForTopic.Status) {
continue
}

wg.Add(1)

topic := topic
Expand Down

0 comments on commit 24d3faa

Please sign in to comment.