From 256edfb78315eb26cf18d1e6fd10e3a1d32f7904 Mon Sep 17 00:00:00 2001 From: Satont Date: Fri, 31 May 2024 09:16:57 +0300 Subject: [PATCH] refactor(eventsub): do not populate subscriptions in production (#743) * refactor(eventsub): do not populate subscriptions in production * kruto --- apps/eventsub/go.mod | 17 ++- apps/eventsub/go.sum | 23 ++-- apps/eventsub/internal/handler/handler.go | 18 +-- .../eventsub/internal/handler/notification.go | 87 +++++++++++++ apps/eventsub/internal/handler/revoke.go | 35 +++--- apps/eventsub/internal/manager/manager.go | 115 +----------------- apps/eventsub/internal/manager/populate.go | 56 +++++++++ 7 files changed, 189 insertions(+), 162 deletions(-) create mode 100644 apps/eventsub/internal/handler/notification.go create mode 100644 apps/eventsub/internal/manager/populate.go diff --git a/apps/eventsub/go.mod b/apps/eventsub/go.mod index 282cb6d68..5f2a0ccca 100644 --- a/apps/eventsub/go.mod +++ b/apps/eventsub/go.mod @@ -1,6 +1,8 @@ module github.com/satont/twir/apps/eventsub -go 1.21.5 +go 1.22.2 + +toolchain go1.22.3 replace ( github.com/satont/twir/libs/config => ../../libs/config @@ -17,28 +19,28 @@ require ( github.com/avast/retry-go/v4 v4.5.1 github.com/google/uuid v1.6.0 github.com/lib/pq v1.10.9 - github.com/nicklaw5/helix/v2 v2.25.3 + github.com/nicklaw5/helix/v2 v2.28.1 github.com/redis/go-redis/v9 v9.5.1 github.com/samber/lo v1.39.0 github.com/satont/twir/libs/config v0.0.0-20240126231400-72985ccc25a5 github.com/satont/twir/libs/gomodels v0.0.0-20240225024146-742838c78cea github.com/satont/twir/libs/logger v0.0.0-20240208100157-ecbe2d7afcfd - github.com/satont/twir/libs/sentry v0.0.0-20240208100157-ecbe2d7afcfd github.com/satont/twir/libs/twitch v0.0.0-20240126231400-72985ccc25a5 - github.com/satont/twir/libs/types v0.0.0-20240126231400-72985ccc25a5 + github.com/twirapp/twir/libs/baseapp v0.0.0-20240531050209-7cfae27458cc github.com/twirapp/twir/libs/bus-core v0.0.0-20240225024146-742838c78cea + github.com/twirapp/twir/libs/cache v0.0.0-20240531050209-7cfae27458cc github.com/twirapp/twir/libs/grpc v0.0.0-20240126231400-72985ccc25a5 github.com/twirapp/twir/libs/integrations v0.0.0-00010101000000-000000000000 github.com/twirapp/twir/libs/uptrace v0.0.0-00010101000000-000000000000 - github.com/twirapp/twitch-eventsub-framework v1.3.6 + github.com/twirapp/twitch-eventsub-framework v1.3.7 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 + go.uber.org/atomic v1.11.0 go.uber.org/fx v1.21.0 go.uber.org/zap v1.27.0 golang.ngrok.com/ngrok v1.8.0 google.golang.org/protobuf v1.33.0 - gorm.io/driver/postgres v1.5.7 gorm.io/gorm v1.25.9 ) @@ -91,6 +93,8 @@ require ( github.com/samber/slog-multi v1.0.2 // indirect github.com/samber/slog-sentry/v2 v2.4.0 // indirect github.com/samber/slog-zerolog/v2 v2.2.0 // indirect + github.com/satont/twir/libs/sentry v0.0.0-20240208100157-ecbe2d7afcfd // indirect + github.com/satont/twir/libs/types v0.0.0-20240126231400-72985ccc25a5 // indirect github.com/satori/go.uuid v1.2.0 // indirect github.com/uptrace/uptrace-go v1.21.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 // indirect @@ -119,4 +123,5 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect google.golang.org/grpc v1.62.0 // indirect + gorm.io/driver/postgres v1.5.7 // indirect ) diff --git a/apps/eventsub/go.sum b/apps/eventsub/go.sum index 3d79a01f8..9fde7a3ac 100644 --- a/apps/eventsub/go.sum +++ b/apps/eventsub/go.sum @@ -17,6 +17,7 @@ github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cloudflare/circl v1.3.7 h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vcU= github.com/cloudflare/circl v1.3.7/go.mod h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA= github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ= +github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -24,6 +25,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= +github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/getsentry/sentry-go v0.26.0 h1:IX3++sF6/4B5JcevhdZfdKIHfyvMmAq/UnqcyT2H6mA= @@ -109,8 +111,8 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nicklaw5/helix/v2 v2.25.3 h1:BSTFa1UguvryFb8biCyYgnVnshftU2zMGuHSLi84tsg= -github.com/nicklaw5/helix/v2 v2.25.3/go.mod h1:zZcKsyyBWDli34x3QleYsVMiiNGMXPAEU5NjsiZDtvY= +github.com/nicklaw5/helix/v2 v2.28.1 h1:bLVKMrZ0MiSgCLB3nsi7+OrhognsIusqvNL4XFoRG0A= +github.com/nicklaw5/helix/v2 v2.28.1/go.mod h1:zZcKsyyBWDli34x3QleYsVMiiNGMXPAEU5NjsiZDtvY= github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY= github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM= github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8= @@ -155,16 +157,15 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/twirapp/twitch-eventsub-framework v1.3.2 h1:lID4wdcg8rPu4vrBOE/yuEya55/nnHc9nBTCl1284T0= -github.com/twirapp/twitch-eventsub-framework v1.3.2/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= -github.com/twirapp/twitch-eventsub-framework v1.3.3 h1:n/eYSPhtn8vLv32wr20o01+RHbSGqfU3umv8fbXwUY0= -github.com/twirapp/twitch-eventsub-framework v1.3.3/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= -github.com/twirapp/twitch-eventsub-framework v1.3.4 h1:ZG+xR4EFruKuOxJTcbe1sXSrhuo0xYbXqPxaZsiH4Lo= -github.com/twirapp/twitch-eventsub-framework v1.3.4/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= -github.com/twirapp/twitch-eventsub-framework v1.3.5 h1:p5eqGyi+azNovuYVE4XL9tGeSj5qIzUPUKP+MKj7qWI= -github.com/twirapp/twitch-eventsub-framework v1.3.5/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/twirapp/twir/libs/baseapp v0.0.0-20240531050209-7cfae27458cc h1:6CK3BWdPW5BpqwnAQJff4Z4J3drBp0vWH5tAhofwJzA= +github.com/twirapp/twir/libs/baseapp v0.0.0-20240531050209-7cfae27458cc/go.mod h1:hKOpVrA7x23znTdaCO9sgjh5L/W7rhCy2bXDgtYhIzc= +github.com/twirapp/twir/libs/cache v0.0.0-20240531050209-7cfae27458cc h1:v1kvbaCu8vYsSRPhhmuQfsVRx96q72blI5TKaNLbrZY= +github.com/twirapp/twir/libs/cache v0.0.0-20240531050209-7cfae27458cc/go.mod h1:QN8X1SS0Fp5Hug0++0DDu6xFx7Lv2ixxHBMdW4OCKJk= github.com/twirapp/twitch-eventsub-framework v1.3.6 h1:8FJiVMxi20HsJZSuvZT+HXUpUt/o2tBCTcsf1BSQbV0= github.com/twirapp/twitch-eventsub-framework v1.3.6/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= +github.com/twirapp/twitch-eventsub-framework v1.3.7 h1:HwpolVPgZRVCJCD+AzlLol2nRZId+lLLR41qXilDYKY= +github.com/twirapp/twitch-eventsub-framework v1.3.7/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= github.com/uptrace/uptrace-go v1.21.0 h1:oJoUjhiVT7aiuoG6B3ClVHtJozLn3cK9hQt8U5dQO1M= github.com/uptrace/uptrace-go v1.21.0/go.mod h1:/aXAFGKOqeAFBqWa1xtzLnGX2xJm1GScqz9NJ0TJjLM= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 h1:P+/g8GpuJGYbOp2tAdKrIPUX9JO02q8Q0YNlHolpibA= @@ -193,6 +194,8 @@ go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc= go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= go.uber.org/fx v1.21.0 h1:qqD6k7PyFHONffW5speYx403ywanuASqU4Rqdpc22XY= diff --git a/apps/eventsub/internal/handler/handler.go b/apps/eventsub/internal/handler/handler.go index 5a913b25f..6c821e08f 100644 --- a/apps/eventsub/internal/handler/handler.go +++ b/apps/eventsub/internal/handler/handler.go @@ -3,7 +3,6 @@ package handler import ( "context" "errors" - "log/slog" "net" "net/http" @@ -11,7 +10,6 @@ import ( "github.com/satont/twir/apps/eventsub/internal/manager" "github.com/satont/twir/apps/eventsub/internal/tunnel" cfg "github.com/satont/twir/libs/config" - model "github.com/satont/twir/libs/gomodels" "github.com/satont/twir/libs/logger" bus_core "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/cache/7tv" @@ -22,7 +20,6 @@ import ( "github.com/twirapp/twir/libs/grpc/websockets" seventvintegration "github.com/twirapp/twir/libs/integrations/seventv" eventsub_framework "github.com/twirapp/twitch-eventsub-framework" - eventsub_bindings "github.com/twirapp/twitch-eventsub-framework/esb" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/trace" "go.uber.org/fx" @@ -69,20 +66,6 @@ type Opts struct { func New(opts Opts) *Handler { handler := eventsub_framework.NewSubHandler(true, []byte(opts.Config.TwitchClientSecret)) - handler.OnNotification = func( - h *eventsub_bindings.ResponseHeaders, - notification *eventsub_bindings.EventNotification, - ) { - if err := opts.Gorm. - Model(&model.EventsubSubscription{}). - Where("id = ?", notification.Subscription.ID). - Update("status", notification.Subscription.Status). - Error; err != nil { - opts.Logger.Error("failed to update subscription", slog.Any("err", err)) - return - } - } - myHandler := &Handler{ manager: opts.Manager, logger: opts.Logger, @@ -98,6 +81,7 @@ func New(opts Opts) *Handler { seventvCache: seventv.New(opts.Redis), } + handler.OnNotification = myHandler.onNotification handler.HandleUserAuthorizationRevoke = myHandler.handleUserAuthorizationRevoke handler.OnRevocate = myHandler.handleSubRevocate diff --git a/apps/eventsub/internal/handler/notification.go b/apps/eventsub/internal/handler/notification.go new file mode 100644 index 000000000..67afb2ec3 --- /dev/null +++ b/apps/eventsub/internal/handler/notification.go @@ -0,0 +1,87 @@ +package handler + +import ( + "log/slog" + + "github.com/google/uuid" + model "github.com/satont/twir/libs/gomodels" + eventsub_bindings "github.com/twirapp/twitch-eventsub-framework/esb" + "gorm.io/gorm/clause" +) + +// order metters +// user_id should be the last +var conditionKeys = []string{ + "broadcaster_user_id", + "broadcaster_user_id", + "to_broadcaster_user_id", + "user_id", +} + +var knownTopicsEntitiesCache = map[string]model.EventsubTopic{} + +func (c *Handler) onNotification( + _ *eventsub_bindings.ResponseHeaders, + notification *eventsub_bindings.EventNotification, +) { + condition, ok := notification.Subscription.Condition.(map[string]any) + if !ok { + c.logger.Error( + "failed to cast condition", + slog.Any("condition", notification.Subscription.Condition), + ) + return + } + + var userId string + for _, key := range conditionKeys { + if val, ok := condition[key].(string); ok { + userId = val + break + } + } + + if userId == "" { + c.logger.Error("failed to find user_id") + return + } + + var topicId uuid.UUID + if cachedTopic, topicFound := knownTopicsEntitiesCache[notification.Subscription.Type]; topicFound { + topicId = cachedTopic.ID + } else { + topicEntity := model.EventsubTopic{} + if err := c.gorm. + Where("topic = ?", notification.Subscription.Type). + First(&topicEntity). + Error; err != nil { + c.logger.Error("failed to find topic", slog.Any("err", err)) + return + } + knownTopicsEntitiesCache[notification.Subscription.Type] = topicEntity + topicId = topicEntity.ID + } + + if topicId == uuid.Nil { + c.logger.Error("failed to find topic_id") + return + } + + if err := c.gorm.Clauses( + clause.OnConflict{ + Columns: []clause.Column{{Name: "topic_id"}, {Name: "user_id"}}, + DoUpdates: clause.Assignments(map[string]interface{}{"status": notification.Subscription.Status}), + }, + ).Create( + &model.EventsubSubscription{ + ID: uuid.New(), + UserID: userId, + TopicID: topicId, + Status: notification.Subscription.Status, + Version: notification.Subscription.Version, + CallbackUrl: notification.Subscription.Transport.Callback, + }, + ).Error; err != nil { + c.logger.Error("failed to create/update subscription", slog.Any("err", err)) + } +} diff --git a/apps/eventsub/internal/handler/revoke.go b/apps/eventsub/internal/handler/revoke.go index b1ac303a4..dc675d040 100644 --- a/apps/eventsub/internal/handler/revoke.go +++ b/apps/eventsub/internal/handler/revoke.go @@ -1,10 +1,12 @@ package handler import ( + "context" "database/sql" "log/slog" model "github.com/satont/twir/libs/gomodels" + eventsub_framework "github.com/twirapp/twitch-eventsub-framework" "github.com/twirapp/twitch-eventsub-framework/esb" ) @@ -53,28 +55,21 @@ func (c *Handler) handleUserAuthorizationRevoke( } func (c *Handler) handleSubRevocate( - h *esb.ResponseHeaders, + _ *esb.ResponseHeaders, revocation *esb.RevocationNotification, ) { - topic := &model.EventsubTopic{} - if err := c.gorm. - Where("topic = ?", revocation.Subscription.Type). - First(topic).Error; err != nil { - c.logger.Error("failed to get topic", slog.Any("err", err)) - return - } - - subscription := &model.EventsubSubscription{} - if err := c.gorm. - Where("topic_id = ?", topic.ID). - First(subscription).Error; err != nil { - c.logger.Error("failed to get subscription", slog.Any("err", err)) - return - } - - subscription.Status = revocation.Subscription.Status + c.logger.Info("handleSubRevocate", slog.Any("revocation", revocation)) - if err := c.gorm.Save(&subscription).Error; err != nil { - c.logger.Error("failed to delete subscription", slog.Any("err", err)) + if revocation.Subscription.Status == "notification_failures_exceeded" { + c.manager.SubscribeWithLimits( + context.Background(), + &eventsub_framework.SubRequest{ + Type: revocation.Subscription.Type, + Condition: revocation.Subscription.Condition, + Callback: revocation.Subscription.Transport.Callback, + Secret: c.config.TwitchClientSecret, + Version: revocation.Subscription.Version, + }, + ) } } diff --git a/apps/eventsub/internal/manager/manager.go b/apps/eventsub/internal/manager/manager.go index cc766dd04..98a426626 100644 --- a/apps/eventsub/internal/manager/manager.go +++ b/apps/eventsub/internal/manager/manager.go @@ -4,15 +4,9 @@ import ( "context" "errors" "log/slog" - "slices" "sync" - "github.com/go-redsync/redsync/v4" - redsyncredis "github.com/go-redsync/redsync/v4/redis/goredis/v9" - "github.com/google/uuid" "github.com/nicklaw5/helix/v2" - "github.com/redis/go-redis/v9" - "github.com/samber/lo" "github.com/satont/twir/apps/eventsub/internal/tunnel" cfg "github.com/satont/twir/libs/config" model "github.com/satont/twir/libs/gomodels" @@ -45,7 +39,6 @@ type Opts struct { TokensGrpc tokens.TokensClient Gorm *gorm.DB Tunnel *tunnel.AppTunnel - Redis *redis.Client } func NewManager(opts Opts) (*Manager, error) { @@ -60,9 +53,6 @@ func NewManager(opts Opts) (*Manager, error) { tunnel: opts.Tunnel, } - locker := redsync.New(redsyncredis.NewPool(opts.Redis)) - startDistributedLock := locker.NewMutex("eventsub:startDistributedLock") - opts.Lc.Append( fx.Hook{ OnStart: func(ctx context.Context) error { @@ -116,66 +106,21 @@ func NewManager(opts Opts) (*Manager, error) { } unsubWg.Wait() - } - - requestContext := context.Background() - var channels []model.Channels - err := manager.gorm.Where( - `"channels"."isEnabled" = ? AND "User"."is_banned" = ? AND "channels"."isTwitchBanned" = ?`, - true, - false, - false, - ).Joins("User").Find(&channels).Error - if err != nil { - panic(err) - } - - var topics []model.EventsubTopic - if err := opts.Gorm.WithContext(requestContext).Find(&topics).Error; err != nil { - panic(err) - } - - startDistributedLock.Lock() - - channelsWg := sync.WaitGroup{} - - for _, channel := range channels { - channelsWg.Add(1) - - channel := channel - - go func() { - defer channelsWg.Done() - err = manager.SubscribeToNeededEvents( - requestContext, - topics, - channel.ID, - channel.BotID, - ) - if err != nil { - opts.Logger.Error( - "failed to subscribe to needed events", - slog.Any("err", err), - ) - } - }() + manager.populateChannels() } manager.SubscribeWithLimits( - requestContext, + context.Background(), &eventsub_framework.SubRequest{ Type: "user.authorization.revoke", Condition: map[string]string{ - "client_id": opts.Config.TwitchClientId, + "client_id": manager.config.TwitchClientId, }, - Callback: opts.Tunnel.GetAddr(), - Secret: opts.Config.TwitchClientSecret, + Callback: manager.tunnel.GetAddr(), + Secret: manager.config.TwitchClientSecret, Version: "1", }, ) - - channelsWg.Wait() - startDistributedLock.Unlock() }() return nil @@ -186,44 +131,16 @@ func NewManager(opts Opts) (*Manager, error) { return manager, nil } -var statusesForSkip = []string{ - "enabled", - "webhook_callback_verification_pending", - "authorization_revoked", - "user_removed", - "version_removed", -} - func (c *Manager) SubscribeToNeededEvents( ctx context.Context, topics []model.EventsubTopic, broadcasterId, botId string, ) error { - var existedSubscriptions []model.EventsubSubscription - if err := c.gorm. - WithContext(ctx). - Where(&model.EventsubSubscription{UserID: broadcasterId}). - Find(&existedSubscriptions). - Error; err != nil { - return err - } - var wg sync.WaitGroup newSubsCount := atomic.NewInt64(0) for _, topic := range topics { - existedSubForTopic, subExists := lo.Find( - existedSubscriptions, - func(item model.EventsubSubscription) bool { - return item.TopicID == topic.ID - }, - ) - - if subExists && slices.Contains(statusesForSkip, existedSubForTopic.Status) { - continue - } - wg.Add(1) topic := topic @@ -240,7 +157,7 @@ func (c *Manager) SubscribeToNeededEvents( return } - status, err := c.SubscribeWithLimits( + _, err := c.SubscribeWithLimits( ctx, &eventsub_framework.SubRequest{ Type: topic.Topic, @@ -264,26 +181,6 @@ func (c *Manager) SubscribeToNeededEvents( return } - subStatus := "unknown" - subId := uuid.New() - if status != nil && len(status.Data) > 0 { - subStatus = status.Data[0].Status - subId = uuid.MustParse(status.Data[0].ID) - } - - if err := c.gorm.Create( - &model.EventsubSubscription{ - ID: subId, - TopicID: topic.ID, - UserID: broadcasterId, - Status: subStatus, - Version: topic.Version, - CallbackUrl: c.tunnel.GetAddr(), - }, - ).Error; err != nil { - c.logger.Error("failed to create subscription", slog.Any("err", err)) - } - newSubsCount.Inc() }() } diff --git a/apps/eventsub/internal/manager/populate.go b/apps/eventsub/internal/manager/populate.go new file mode 100644 index 000000000..dde88193b --- /dev/null +++ b/apps/eventsub/internal/manager/populate.go @@ -0,0 +1,56 @@ +package manager + +import ( + "context" + "log/slog" + "sync" + + model "github.com/satont/twir/libs/gomodels" +) + +func (c *Manager) populateChannels() error { + requestContext := context.Background() + var channels []model.Channels + err := c.gorm.Where( + `"channels"."isEnabled" = ? AND "User"."is_banned" = ? AND "channels"."isTwitchBanned" = ?`, + true, + false, + false, + ).Joins("User").Find(&channels).Error + if err != nil { + return err + } + + var topics []model.EventsubTopic + if err := c.gorm.WithContext(requestContext).Find(&topics).Error; err != nil { + return err + } + + channelsWg := sync.WaitGroup{} + + for _, channel := range channels { + channelsWg.Add(1) + + channel := channel + + go func() { + defer channelsWg.Done() + err := c.SubscribeToNeededEvents( + requestContext, + topics, + channel.ID, + channel.BotID, + ) + if err != nil { + c.logger.Error( + "failed to subscribe to needed events", + slog.Any("err", err), + ) + } + }() + } + + channelsWg.Wait() + + return nil +}