diff --git a/.editorconfig b/.editorconfig index e04d0bf10..5be1a90ee 100644 --- a/.editorconfig +++ b/.editorconfig @@ -8,6 +8,6 @@ indent_style = tab indent_size = 2 max_line_length = 100 -[*.{yaml,yml}] +[*.{yaml,yml,json}] indent_style = space ident_size = 2 diff --git a/.env.example b/.env.example index 5e506d6f0..d665e0592 100644 --- a/.env.example +++ b/.env.example @@ -26,7 +26,7 @@ SEVENTV_TOKEN= CDN_PUBLIC_URL=http://localhost:8000 CDN_HOST=localhost:8000 -CDN_BUCKET= +CDN_BUCKET=twir CDN_REGION= CDN_ACCESS_TOKEN= CDN_SECRET_TOKEN= diff --git a/apps/api-gql/internal/gql/resolvers/admin-actions.resolver.go b/apps/api-gql/internal/gql/resolvers/admin-actions.resolver.go index 7803f401a..59b6e37a6 100644 --- a/apps/api-gql/internal/gql/resolvers/admin-actions.resolver.go +++ b/apps/api-gql/internal/gql/resolvers/admin-actions.resolver.go @@ -6,7 +6,14 @@ package resolvers import ( "context" + "errors" "fmt" + + "github.com/google/uuid" + model "github.com/satont/twir/libs/gomodels" + "github.com/twirapp/twir/apps/api-gql/internal/gql/gqlmodel" + "github.com/twirapp/twir/libs/bus-core/eventsub" + "gorm.io/gorm" ) // DropAllAuthSessions is the resolver for the dropAllAuthSessions field. @@ -27,3 +34,59 @@ func (r *mutationResolver) DropAllAuthSessions(ctx context.Context) (bool, error return true, nil } + +// EventsubSubscribe is the resolver for the eventsubSubscribe field. +func (r *mutationResolver) EventsubSubscribe(ctx context.Context, opts gqlmodel.EventsubSubscribeInput) (bool, error) { + existedSubscription := model.EventsubTopic{} + if err := r.gorm. + WithContext(ctx). + Where("topic = ?", opts.Type). + First(&existedSubscription). + Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return false, nil + } + + if existedSubscription.Topic != "" { + return false, fmt.Errorf("subscription already exists") + } + + condition, err := r.eventSubGqlToCondition(opts.Condition) + if err != nil { + return false, fmt.Errorf("failed to convert condition: %w", err) + } + + newTopic := model.EventsubTopic{ + ID: uuid.New(), + Topic: opts.Type, + Version: opts.Version, + ConditionType: condition, + } + + if err := r.gorm. + WithContext(ctx). + Create(&newTopic). + Error; err != nil { + return false, fmt.Errorf("failed to create topic: %w", err) + } + + var channels []model.Channels + if err := r.gorm. + WithContext(ctx). + Select("id", `"isEnabled"`). + Where(`"isEnabled" = ?`, true). + Find(&channels).Error; err != nil { + return false, fmt.Errorf("failed to get channels: %w", err) + } + + for _, channel := range channels { + go func() { + r.twirBus.EventSub.Subscribe.Publish( + eventsub.EventsubSubscribeRequest{ + ChannelID: channel.ID, + }, + ) + }() + } + + return true, nil +} diff --git a/apps/api-gql/internal/gql/resolvers/admin-actions.resolver.service.go b/apps/api-gql/internal/gql/resolvers/admin-actions.resolver.service.go new file mode 100644 index 000000000..1975d1719 --- /dev/null +++ b/apps/api-gql/internal/gql/resolvers/admin-actions.resolver.service.go @@ -0,0 +1,25 @@ +package resolvers + +import ( + "fmt" + + model "github.com/satont/twir/libs/gomodels" + "github.com/twirapp/twir/apps/api-gql/internal/gql/gqlmodel" +) + +func (r *mutationResolver) eventSubGqlToCondition( + input gqlmodel.EventsubSubscribeConditionInput, +) (model.EventsubConditionType, error) { + switch input { + case gqlmodel.EventsubSubscribeConditionInputChannel: + return model.EventsubConditionTypeBroadcasterUserID, nil + case gqlmodel.EventsubSubscribeConditionInputUser: + return model.EventsubConditionTypeUserID, nil + case gqlmodel.EventsubSubscribeConditionInputChannelWithModeratorID: + return model.EventsubConditionTypeBroadcasterWithModeratorID, nil + case gqlmodel.EventsubSubscribeConditionInputChannelWithBotID: + return model.EventsubConditionTypeBroadcasterWithUserID, nil + default: + return "", fmt.Errorf("unknown input type") + } +} diff --git a/apps/api-gql/internal/gql/resolvers/notifications.resolver.go b/apps/api-gql/internal/gql/resolvers/notifications.resolver.go index 4bca7596f..a5307f0c5 100644 --- a/apps/api-gql/internal/gql/resolvers/notifications.resolver.go +++ b/apps/api-gql/internal/gql/resolvers/notifications.resolver.go @@ -19,10 +19,7 @@ import ( ) // TwitchProfile is the resolver for the twitchProfile field. -func (r *adminNotificationResolver) TwitchProfile( - ctx context.Context, - obj *gqlmodel.AdminNotification, -) (*gqlmodel.TwirUserTwitchInfo, error) { +func (r *adminNotificationResolver) TwitchProfile(ctx context.Context, obj *gqlmodel.AdminNotification) (*gqlmodel.TwirUserTwitchInfo, error) { if obj.UserID == nil { return nil, nil } @@ -31,11 +28,7 @@ func (r *adminNotificationResolver) TwitchProfile( } // NotificationsCreate is the resolver for the notificationsCreate field. -func (r *mutationResolver) NotificationsCreate( - ctx context.Context, - text string, - userID *string, -) (*gqlmodel.AdminNotification, error) { +func (r *mutationResolver) NotificationsCreate(ctx context.Context, text string, userID *string) (*gqlmodel.AdminNotification, error) { entity := model.Notifications{ ID: uuid.NewString(), CreatedAt: time.Now().UTC(), @@ -90,11 +83,7 @@ func (r *mutationResolver) NotificationsCreate( } // NotificationsUpdate is the resolver for the notificationsUpdate field. -func (r *mutationResolver) NotificationsUpdate( - ctx context.Context, - id string, - opts gqlmodel.NotificationUpdateOpts, -) (*gqlmodel.AdminNotification, error) { +func (r *mutationResolver) NotificationsUpdate(ctx context.Context, id string, opts gqlmodel.NotificationUpdateOpts) (*gqlmodel.AdminNotification, error) { entity := model.Notifications{} if err := r.gorm.WithContext(ctx).Where("id = ?", id).First(&entity).Error; err != nil { return nil, err @@ -145,10 +134,7 @@ func (r *mutationResolver) NotificationsDelete(ctx context.Context, id string) ( } // NotificationsByUser is the resolver for the notificationsByUser field. -func (r *queryResolver) NotificationsByUser(ctx context.Context) ( - []gqlmodel.UserNotification, - error, -) { +func (r *queryResolver) NotificationsByUser(ctx context.Context) ([]gqlmodel.UserNotification, error) { user, err := r.sessions.GetAuthenticatedUser(ctx) if err != nil { return nil, err @@ -176,10 +162,7 @@ func (r *queryResolver) NotificationsByUser(ctx context.Context) ( } // NotificationsByAdmin is the resolver for the notificationsByAdmin field. -func (r *queryResolver) NotificationsByAdmin( - ctx context.Context, - opts gqlmodel.AdminNotificationsParams, -) (*gqlmodel.AdminNotificationsResponse, error) { +func (r *queryResolver) NotificationsByAdmin(ctx context.Context, opts gqlmodel.AdminNotificationsParams) (*gqlmodel.AdminNotificationsResponse, error) { query := r.gorm.WithContext(ctx) if opts.Type.IsSet() { @@ -235,10 +218,7 @@ func (r *queryResolver) NotificationsByAdmin( } // NewNotification is the resolver for the newNotification field. -func (r *subscriptionResolver) NewNotification(ctx context.Context) ( - <-chan *gqlmodel.UserNotification, - error, -) { +func (r *subscriptionResolver) NewNotification(ctx context.Context) (<-chan *gqlmodel.UserNotification, error) { user, err := r.sessions.GetAuthenticatedUser(ctx) if err != nil { return nil, err diff --git a/apps/api-gql/schema/admin-actions.graphqls b/apps/api-gql/schema/admin-actions.graphqls index 5ede5a831..68e95fcd5 100644 --- a/apps/api-gql/schema/admin-actions.graphqls +++ b/apps/api-gql/schema/admin-actions.graphqls @@ -1,3 +1,17 @@ extend type Mutation { dropAllAuthSessions: Boolean! @isAuthenticated @isAdmin + eventsubSubscribe(opts: EventsubSubscribeInput!): Boolean! @isAuthenticated @isAdmin +} + +input EventsubSubscribeInput { + type: String! @validate(constraint: "max=50") + version: String! @validate(constraint: "max=50") + condition: EventsubSubscribeConditionInput! +} + +enum EventsubSubscribeConditionInput { + CHANNEL + USER + CHANNEL_WITH_MODERATOR_ID + CHANNEL_WITH_BOT_ID } diff --git a/apps/api/app/app.go b/apps/api/app/app.go index 11c5a23f4..7bd70c2e6 100644 --- a/apps/api/app/app.go +++ b/apps/api/app/app.go @@ -5,7 +5,6 @@ import ( "errors" "log/slog" "net/http" - "time" "github.com/alexedwards/scs/v2" "github.com/redis/go-redis/v9" @@ -21,7 +20,7 @@ import ( "github.com/satont/twir/apps/api/internal/webhooks" cfg "github.com/satont/twir/libs/config" "github.com/satont/twir/libs/logger" - internalSentry "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" buscore "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/grpc/clients" "github.com/twirapp/twir/libs/grpc/discord" @@ -32,26 +31,11 @@ import ( "github.com/twirapp/twir/libs/grpc/websockets" "github.com/twirapp/twir/libs/uptrace" "go.uber.org/fx" - "gorm.io/driver/postgres" - "gorm.io/gorm" ) var App = fx.Options( + baseapp.CreateBaseApp("api"), fx.Provide( - func() cfg.Config { - config, err := cfg.New() - if err != nil { - panic(err) - } - return *config - }, - internalSentry.NewFx( - internalSentry.NewFxOpts{ - Service: "api", - }, - ), - logger.NewFx(logger.Opts{Service: "api"}), - uptrace.NewFx("api"), func(c cfg.Config) tokens.TokensClient { return clients.NewTokens(c.AppEnv) }, @@ -70,47 +54,9 @@ var App = fx.Options( func(c cfg.Config) discord.DiscordClient { return clients.NewDiscord(c.AppEnv) }, - func(config cfg.Config, lc fx.Lifecycle) (*redis.Client, error) { - redisOpts, err := redis.ParseURL(config.RedisUrl) - if err != nil { - return nil, err - } - client := redis.NewClient(redisOpts) - lc.Append( - fx.Hook{ - OnStop: func(ctx context.Context) error { - return client.Close() - }, - }, - ) - - return client, nil - }, func(r *redis.Client) *scs.SessionManager { return sessions.New(r) }, - func(config cfg.Config, lc fx.Lifecycle) (*gorm.DB, error) { - db, err := gorm.Open( - postgres.Open(config.DatabaseUrl), - ) - if err != nil { - return nil, err - } - d, _ := db.DB() - d.SetMaxIdleConns(1) - d.SetMaxOpenConns(10) - d.SetConnMaxLifetime(time.Hour) - - lc.Append( - fx.Hook{ - OnStop: func(_ context.Context) error { - return d.Close() - }, - }, - ) - - return db, nil - }, buscore.NewNatsBusFx("api"), interceptors.New, impl_protected.New, diff --git a/apps/discord/app/app.go b/apps/discord/app/app.go index 697a74d7d..089b6ecf5 100644 --- a/apps/discord/app/app.go +++ b/apps/discord/app/app.go @@ -2,48 +2,31 @@ package app import ( "github.com/satont/twir/apps/discord/internal/discord_go" - "github.com/satont/twir/apps/discord/internal/gorm" "github.com/satont/twir/apps/discord/internal/grpc" "github.com/satont/twir/apps/discord/internal/messages_updater" - "github.com/satont/twir/apps/discord/internal/redis" "github.com/satont/twir/apps/discord/internal/sended_messages_store" cfg "github.com/satont/twir/libs/config" - "github.com/satont/twir/libs/logger" - twirsentry "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" buscore "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/grpc/clients" "github.com/twirapp/twir/libs/grpc/tokens" - "github.com/twirapp/twir/libs/uptrace" "go.uber.org/fx" ) var App = fx.Module( "discord", + baseapp.CreateBaseApp("discord"), fx.Provide( - cfg.NewFx, - gorm.New, - redis.New, buscore.NewNatsBusFx("discord"), sended_messages_store.New, messages_updater.New, discord_go.New, - twirsentry.NewFx(twirsentry.NewFxOpts{Service: "discord"}), - uptrace.NewFx("discord"), - logger.NewFx( - logger.Opts{ - Service: "discord", - }, - ), func(config cfg.Config) tokens.TokensClient { return clients.NewTokens(config.AppEnv) }, grpc.New, ), fx.Invoke( - uptrace.NewFx("discord"), - redis.New, - gorm.New, - // discord_go.New, messages_updater.New, grpc.New, ), diff --git a/apps/discord/internal/gorm/gorm.go b/apps/discord/internal/gorm/gorm.go deleted file mode 100644 index 4a6097cac..000000000 --- a/apps/discord/internal/gorm/gorm.go +++ /dev/null @@ -1,41 +0,0 @@ -package gorm - -import ( - "context" - "time" - - cfg "github.com/satont/twir/libs/config" - "go.uber.org/fx" - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -type Opts struct { - fx.In - - Config cfg.Config - LC fx.Lifecycle -} - -func New(opts Opts) (*gorm.DB, error) { - db, err := gorm.Open( - postgres.Open(opts.Config.DatabaseUrl), - ) - if err != nil { - return nil, err - } - d, _ := db.DB() - d.SetMaxIdleConns(1) - d.SetMaxOpenConns(10) - d.SetConnMaxLifetime(time.Hour) - - opts.LC.Append( - fx.Hook{ - OnStop: func(ctx context.Context) error { - return d.Close() - }, - }, - ) - - return db, nil -} diff --git a/apps/discord/internal/redis/redis.go b/apps/discord/internal/redis/redis.go deleted file mode 100644 index 5328f9161..000000000 --- a/apps/discord/internal/redis/redis.go +++ /dev/null @@ -1,31 +0,0 @@ -package redis - -import ( - "context" - - "github.com/redis/go-redis/v9" - cfg "github.com/satont/twir/libs/config" - "go.uber.org/fx" -) - -func New(config cfg.Config, lc fx.Lifecycle) (*redis.Client, error) { - opts, err := redis.ParseURL(config.RedisUrl) - if err != nil { - return nil, err - } - - client := redis.NewClient(opts) - - lc.Append( - fx.Hook{ - OnStart: func(ctx context.Context) error { - return client.Ping(ctx).Err() - }, - OnStop: func(ctx context.Context) error { - return client.Close() - }, - }, - ) - - return client, nil -} diff --git a/apps/emotes-cacher/app/app.go b/apps/emotes-cacher/app/app.go index f6109de02..307b2e35d 100644 --- a/apps/emotes-cacher/app/app.go +++ b/apps/emotes-cacher/app/app.go @@ -1,50 +1,21 @@ package app import ( - "time" - - "github.com/redis/go-redis/v9" bus_listener "github.com/satont/twir/apps/emotes-cacher/internal/bus-listener" - cfg "github.com/satont/twir/libs/config" "github.com/satont/twir/libs/logger" - twirsentry "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" buscore "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/uptrace" "go.uber.org/fx" - "gorm.io/driver/postgres" - "gorm.io/gorm" ) const service = "emotes-cacher" var App = fx.Module( service, + baseapp.CreateBaseApp(service), fx.Provide( - cfg.NewFx, - twirsentry.NewFx(twirsentry.NewFxOpts{Service: service}), - logger.NewFx(logger.Opts{Service: service}), - uptrace.NewFx("emotes-cacher"), buscore.NewNatsBusFx("emotes-cacher"), - func(cfg cfg.Config) (*gorm.DB, error) { - db, err := gorm.Open(postgres.Open(cfg.DatabaseUrl)) - if err != nil { - return nil, err - } - d, _ := db.DB() - d.SetMaxIdleConns(1) - d.SetMaxOpenConns(10) - d.SetConnMaxLifetime(time.Hour) - - return db, nil - }, - func(cfg cfg.Config) (*redis.Client, error) { - redisUrl, err := redis.ParseURL(cfg.RedisUrl) - if err != nil { - return nil, err - } - - return redis.NewClient(redisUrl), nil - }, ), fx.Invoke( uptrace.NewFx("emotes-cacher"), diff --git a/apps/events/app/app.go b/apps/events/app/app.go index 50454edc8..cc725c011 100644 --- a/apps/events/app/app.go +++ b/apps/events/app/app.go @@ -1,19 +1,15 @@ package app import ( - "log/slog" - eventsActivity "github.com/satont/twir/apps/events/internal/activities/events" "github.com/satont/twir/apps/events/internal/chat_alerts" - "github.com/satont/twir/apps/events/internal/gorm" "github.com/satont/twir/apps/events/internal/grpc_impl" "github.com/satont/twir/apps/events/internal/hydrator" - "github.com/satont/twir/apps/events/internal/redis" "github.com/satont/twir/apps/events/internal/workers" "github.com/satont/twir/apps/events/internal/workflows" cfg "github.com/satont/twir/libs/config" "github.com/satont/twir/libs/logger" - twirsentry "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" buscore "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/grpc/clients" "github.com/twirapp/twir/libs/grpc/tokens" @@ -24,24 +20,14 @@ import ( var App = fx.Module( "events", + baseapp.CreateBaseApp("events"), fx.Provide( - cfg.NewFx, - twirsentry.NewFx(twirsentry.NewFxOpts{Service: "events"}), - logger.NewFx( - logger.Opts{ - Service: "events", - Level: slog.LevelDebug, - }, - ), - uptrace.NewFx("events"), func(config cfg.Config) tokens.TokensClient { return clients.NewTokens(config.AppEnv) }, func(config cfg.Config) websockets.WebsocketClient { return clients.NewWebsocket(config.AppEnv) }, - gorm.New, - redis.New, hydrator.New, eventsActivity.New, workflows.NewEventsWorkflow, diff --git a/apps/events/internal/gorm/gorm.go b/apps/events/internal/gorm/gorm.go deleted file mode 100644 index 5d2683eec..000000000 --- a/apps/events/internal/gorm/gorm.go +++ /dev/null @@ -1,22 +0,0 @@ -package gorm - -import ( - "time" - - config "github.com/satont/twir/libs/config" - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -func New(cfg config.Config) (*gorm.DB, error) { - db, err := gorm.Open(postgres.Open(cfg.DatabaseUrl)) - if err != nil { - return nil, err - } - d, _ := db.DB() - d.SetMaxIdleConns(1) - d.SetMaxOpenConns(10) - d.SetConnMaxLifetime(time.Hour) - - return db, nil -} diff --git a/apps/events/internal/redis/redis.go b/apps/events/internal/redis/redis.go deleted file mode 100644 index fd1dd0859..000000000 --- a/apps/events/internal/redis/redis.go +++ /dev/null @@ -1,17 +0,0 @@ -package redis - -import ( - "github.com/redis/go-redis/v9" - config "github.com/satont/twir/libs/config" -) - -func New(cfg config.Config) (*redis.Client, error) { - params, err := redis.ParseURL(cfg.RedisUrl) - if err != nil { - return nil, err - } - - rdb := redis.NewClient(params) - - return rdb, nil -} diff --git a/apps/eventsub/app/app.go b/apps/eventsub/app/app.go index 896bd6311..277ba293c 100644 --- a/apps/eventsub/app/app.go +++ b/apps/eventsub/app/app.go @@ -1,16 +1,12 @@ package app import ( - "time" - - "github.com/redis/go-redis/v9" bus_listener "github.com/satont/twir/apps/eventsub/internal/bus-listener" "github.com/satont/twir/apps/eventsub/internal/handler" "github.com/satont/twir/apps/eventsub/internal/manager" "github.com/satont/twir/apps/eventsub/internal/tunnel" cfg "github.com/satont/twir/libs/config" - "github.com/satont/twir/libs/logger" - twirsentry "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" buscore "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/grpc/clients" "github.com/twirapp/twir/libs/grpc/events" @@ -19,32 +15,11 @@ import ( "github.com/twirapp/twir/libs/grpc/websockets" "github.com/twirapp/twir/libs/uptrace" "go.uber.org/fx" - "gorm.io/driver/postgres" - "gorm.io/gorm" - gormLogger "gorm.io/gorm/logger" ) var App = fx.Options( + baseapp.CreateBaseApp("eventsub"), fx.Provide( - cfg.NewFx, - twirsentry.NewFx(twirsentry.NewFxOpts{Service: "eventsub"}), - logger.NewFx(logger.Opts{Service: "eventsub"}), - uptrace.NewFx("eventsub"), - func(config cfg.Config) (*gorm.DB, error) { - db, err := gorm.Open( - postgres.Open(config.DatabaseUrl), &gorm.Config{ - Logger: gormLogger.Default.LogMode(gormLogger.Silent), - }, - ) - if err != nil { - return nil, err - } - d, _ := db.DB() - d.SetMaxIdleConns(1) - d.SetMaxOpenConns(10) - d.SetConnMaxLifetime(time.Hour) - return db, nil - }, func(config cfg.Config) tokens.TokensClient { return clients.NewTokens(config.AppEnv) }, @@ -57,15 +32,6 @@ var App = fx.Options( func(config cfg.Config) websockets.WebsocketClient { return clients.NewWebsocket(config.AppEnv) }, - func(config cfg.Config) (*redis.Client, error) { - redisUrl, err := redis.ParseURL(config.RedisUrl) - if err != nil { - return nil, err - } - - redisClient := redis.NewClient(redisUrl) - return redisClient, nil - }, buscore.NewNatsBusFx("eventsub"), tunnel.New, manager.NewCreds, diff --git a/apps/eventsub/go.mod b/apps/eventsub/go.mod index bdeda432d..282cb6d68 100644 --- a/apps/eventsub/go.mod +++ b/apps/eventsub/go.mod @@ -30,7 +30,7 @@ require ( github.com/twirapp/twir/libs/grpc v0.0.0-20240126231400-72985ccc25a5 github.com/twirapp/twir/libs/integrations v0.0.0-00010101000000-000000000000 github.com/twirapp/twir/libs/uptrace v0.0.0-00010101000000-000000000000 - github.com/twirapp/twitch-eventsub-framework v1.3.2 + github.com/twirapp/twitch-eventsub-framework v1.3.6 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 diff --git a/apps/eventsub/go.sum b/apps/eventsub/go.sum index c1f60d6f3..3d79a01f8 100644 --- a/apps/eventsub/go.sum +++ b/apps/eventsub/go.sum @@ -45,9 +45,11 @@ github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 h1:y3N7Bm7Y9/CtpiVkw/ZWj6lSlDF3F74SfKwfTCer72Q= +github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U= @@ -86,6 +88,7 @@ github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -123,6 +126,7 @@ github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1 github.com/quic-go/quic-go v0.41.0 h1:aD8MmHfgqTURWNJy48IYFg2OnxwHT3JL7ahGs73lb4k= github.com/quic-go/quic-go v0.41.0/go.mod h1:qCkNjqczPEvgsOnxZ0eCD14lv+B2LHlFAB++CNOh9hA= github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= +github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/refraction-networking/utls v1.6.1 h1:n1JG5karzdGWsI6iZmGrOv3SNzR4c+4M8J6KWGsk3lA= github.com/refraction-networking/utls v1.6.1/go.mod h1:+EbcQOvQvXoFV9AEKbuGlljt1doLRKAVY1jJHe9EtDo= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -153,14 +157,24 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/twirapp/twitch-eventsub-framework v1.3.2 h1:lID4wdcg8rPu4vrBOE/yuEya55/nnHc9nBTCl1284T0= github.com/twirapp/twitch-eventsub-framework v1.3.2/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= +github.com/twirapp/twitch-eventsub-framework v1.3.3 h1:n/eYSPhtn8vLv32wr20o01+RHbSGqfU3umv8fbXwUY0= +github.com/twirapp/twitch-eventsub-framework v1.3.3/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= +github.com/twirapp/twitch-eventsub-framework v1.3.4 h1:ZG+xR4EFruKuOxJTcbe1sXSrhuo0xYbXqPxaZsiH4Lo= +github.com/twirapp/twitch-eventsub-framework v1.3.4/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= +github.com/twirapp/twitch-eventsub-framework v1.3.5 h1:p5eqGyi+azNovuYVE4XL9tGeSj5qIzUPUKP+MKj7qWI= +github.com/twirapp/twitch-eventsub-framework v1.3.5/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= +github.com/twirapp/twitch-eventsub-framework v1.3.6 h1:8FJiVMxi20HsJZSuvZT+HXUpUt/o2tBCTcsf1BSQbV0= +github.com/twirapp/twitch-eventsub-framework v1.3.6/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc= github.com/uptrace/uptrace-go v1.21.0 h1:oJoUjhiVT7aiuoG6B3ClVHtJozLn3cK9hQt8U5dQO1M= github.com/uptrace/uptrace-go v1.21.0/go.mod h1:/aXAFGKOqeAFBqWa1xtzLnGX2xJm1GScqz9NJ0TJjLM= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 h1:P+/g8GpuJGYbOp2tAdKrIPUX9JO02q8Q0YNlHolpibA= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0/go.mod h1:tIKj3DbO8N9Y2xo52og3irLsPI4GW02DSMtrVgNMgxg= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1 h1:m9ReioVPIffxjJlGNRd0d5poy+9oTro3D+YbiEzUDOc= go.opentelemetry.io/contrib/instrumentation/runtime v0.46.1/go.mod h1:CANkrsXNzqOKXfOomu2zhOmc1/J5UZK9SGjrat6ZCG0= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= +go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0 h1:jd0+5t/YynESZqsSyPz+7PAFdEop0dlN0+PkyHYo8oI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0/go.mod h1:U707O40ee1FpQGyhvqnzmCJm1Wh6OX6GGBVn0E6Uyyk= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw= @@ -170,15 +184,19 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0 h1:VhlEQAPp9R1ktYfrPk5SOryw1e9LDDTZCbIPFrho0ec= go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.21.0/go.mod h1:kB3ufRbfU+CQ4MlUcqtW8Z7YEOBeK2DJ6CmR5rYYF3E= go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= +go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= +go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= +go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc= go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= go.uber.org/fx v1.21.0 h1:qqD6k7PyFHONffW5speYx403ywanuASqU4Rqdpc22XY= +go.uber.org/fx v1.21.0/go.mod h1:HT2M7d7RHo+ebKGh9NRcrsrHHfpZ60nW3QRubMRfv48= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= @@ -186,31 +204,44 @@ go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.ngrok.com/muxado/v2 v2.0.0 h1:bu9eIDhRdYNtIXNnqat/HyMeHYOAbUH55ebD7gTvW6c= golang.ngrok.com/muxado/v2 v2.0.0/go.mod h1:wzxJYX4xiAtmwumzL+QsukVwFRXmPNv86vB8RPpOxyM= golang.ngrok.com/ngrok v1.8.0 h1:YzI3vDAlL9WOGC7/2ieM/XsCqb+qlxPsl6t66uyjzLc= golang.ngrok.com/ngrok v1.8.0/go.mod h1:c+Vdu7nhdE0bGFIuHkOnB8R+JEwtSWATOeY7MA53NKI= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/mod v0.16.0 h1:QX4fJ0Rr5cPQCF7O9lh9Se4pmwfwskqZfq5moyldzic= +golang.org/x/mod v0.16.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.16.0 h1:aDkGMBSYxElaoP81NpoUoz2oo2R2wHdZpGToUxfyQrQ= golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= +golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 h1:8eadJkXbwDEMNwcB5O0s5Y5eCfyuCLdvaiOIaGTrWmQ= +google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8/go.mod h1:O1cOfN1Cy6QEYr7VxtjOyP5AdAuR0aJ/MYZaaof623Y= google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 h1:IR+hp6ypxjH24bkMfEJ0yHR21+gwPWdV+/IBrPQyn3k= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs= google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk= +google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -218,4 +249,6 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/driver/postgres v1.5.7 h1:8ptbNJTDbEmhdr62uReG5BGkdQyeasu/FZHxI0IMGnM= +gorm.io/driver/postgres v1.5.7/go.mod h1:3e019WlBaYI5o5LIdNV+LyxCMNtLOQETBXL2h4chKpA= gorm.io/gorm v1.25.9 h1:wct0gxZIELDk8+ZqF/MVnHLkA1rvYlBWUMv2EdsK1g8= +gorm.io/gorm v1.25.9/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= diff --git a/apps/eventsub/internal/bus-listener/bus-listener.go b/apps/eventsub/internal/bus-listener/bus-listener.go index 82c4f5ecc..a066a0b75 100644 --- a/apps/eventsub/internal/bus-listener/bus-listener.go +++ b/apps/eventsub/internal/bus-listener/bus-listener.go @@ -5,6 +5,7 @@ import ( "github.com/satont/twir/apps/eventsub/internal/manager" model "github.com/satont/twir/libs/gomodels" + "github.com/satont/twir/libs/logger" buscore "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/bus-core/eventsub" "go.uber.org/fx" @@ -15,6 +16,7 @@ type BusListener struct { eventSubClient *manager.Manager gorm *gorm.DB bus *buscore.Bus + logger logger.Logger } type Opts struct { @@ -24,6 +26,7 @@ type Opts struct { Manager *manager.Manager Gorm *gorm.DB Bus *buscore.Bus + Logger logger.Logger } func New(opts Opts) (*BusListener, error) { @@ -31,6 +34,7 @@ func New(opts Opts) (*BusListener, error) { eventSubClient: opts.Manager, gorm: opts.Gorm, bus: opts.Bus, + logger: opts.Logger, } opts.Lc.Append( @@ -60,11 +64,19 @@ func (c *BusListener) subscribeToEvents( msg.ChannelID, ).First(&channel).Error if err != nil { + c.logger.Error("error getting channel", err) + return struct{}{} + } + + var topics []model.EventsubTopic + if err := c.gorm.WithContext(ctx).Find(&topics).Error; err != nil { + c.logger.Error("error getting topics", err) return struct{}{} } if err := c.eventSubClient.SubscribeToNeededEvents( ctx, + topics, msg.ChannelID, channel.BotID, ); err != nil { diff --git a/apps/eventsub/internal/handler/handler.go b/apps/eventsub/internal/handler/handler.go index 6d7f90f20..5a913b25f 100644 --- a/apps/eventsub/internal/handler/handler.go +++ b/apps/eventsub/internal/handler/handler.go @@ -3,6 +3,7 @@ package handler import ( "context" "errors" + "log/slog" "net" "net/http" @@ -10,6 +11,7 @@ import ( "github.com/satont/twir/apps/eventsub/internal/manager" "github.com/satont/twir/apps/eventsub/internal/tunnel" cfg "github.com/satont/twir/libs/config" + model "github.com/satont/twir/libs/gomodels" "github.com/satont/twir/libs/logger" bus_core "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/cache/7tv" @@ -20,6 +22,7 @@ import ( "github.com/twirapp/twir/libs/grpc/websockets" seventvintegration "github.com/twirapp/twir/libs/integrations/seventv" eventsub_framework "github.com/twirapp/twitch-eventsub-framework" + eventsub_bindings "github.com/twirapp/twitch-eventsub-framework/esb" "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "go.opentelemetry.io/otel/trace" "go.uber.org/fx" @@ -66,6 +69,20 @@ type Opts struct { func New(opts Opts) *Handler { handler := eventsub_framework.NewSubHandler(true, []byte(opts.Config.TwitchClientSecret)) + handler.OnNotification = func( + h *eventsub_bindings.ResponseHeaders, + notification *eventsub_bindings.EventNotification, + ) { + if err := opts.Gorm. + Model(&model.EventsubSubscription{}). + Where("id = ?", notification.Subscription.ID). + Update("status", notification.Subscription.Status). + Error; err != nil { + opts.Logger.Error("failed to update subscription", slog.Any("err", err)) + return + } + } + myHandler := &Handler{ manager: opts.Manager, logger: opts.Logger, @@ -81,6 +98,9 @@ func New(opts Opts) *Handler { seventvCache: seventv.New(opts.Redis), } + handler.HandleUserAuthorizationRevoke = myHandler.handleUserAuthorizationRevoke + handler.OnRevocate = myHandler.handleSubRevocate + handler.HandleChannelUpdate = myHandler.handleChannelUpdate handler.HandleStreamOnline = myHandler.handleStreamOnline handler.HandleStreamOffline = myHandler.handleStreamOffline diff --git a/apps/eventsub/internal/handler/revoke.go b/apps/eventsub/internal/handler/revoke.go new file mode 100644 index 000000000..b1ac303a4 --- /dev/null +++ b/apps/eventsub/internal/handler/revoke.go @@ -0,0 +1,80 @@ +package handler + +import ( + "database/sql" + "log/slog" + + model "github.com/satont/twir/libs/gomodels" + "github.com/twirapp/twitch-eventsub-framework/esb" +) + +func (c *Handler) handleUserAuthorizationRevoke( + h *esb.ResponseHeaders, + event *esb.EventUserAuthorizationRevoke, +) { + c.logger.Info( + "handleUserAuthorizationRevoke", + slog.String("user_id", event.UserID), + slog.String("user_login", event.UserLogin), + ) + + if err := c.gorm.Model(&model.Channels{}). + Where("id = ?", event.UserID). + Update(`"isBotMod"`, false). + Update(`"isEnabled"`, false).Error; err != nil { + c.logger.Error("failed to update channel", slog.Any("err", err)) + } + + user := &model.Users{} + if err := c.gorm. + Where("id = ?", event.UserID). + First(user).Error; err != nil { + c.logger.Error("failed to get user", slog.Any("err", err)) + } + + if user.TokenID.Valid { + if err := c.gorm. + Delete( + &model.Tokens{}, + "id = ?", + user.TokenID.String, + ).Error; err != nil { + c.logger.Error( + "failed to delete token", + slog.Any("err", err), + ) + } + + user.TokenID = sql.NullString{} + if err := c.gorm.Save(&user).Error; err != nil { + c.logger.Error("failed to update user", slog.Any("err", err)) + } + } +} + +func (c *Handler) handleSubRevocate( + h *esb.ResponseHeaders, + revocation *esb.RevocationNotification, +) { + topic := &model.EventsubTopic{} + if err := c.gorm. + Where("topic = ?", revocation.Subscription.Type). + First(topic).Error; err != nil { + c.logger.Error("failed to get topic", slog.Any("err", err)) + return + } + + subscription := &model.EventsubSubscription{} + if err := c.gorm. + Where("topic_id = ?", topic.ID). + First(subscription).Error; err != nil { + c.logger.Error("failed to get subscription", slog.Any("err", err)) + return + } + + subscription.Status = revocation.Subscription.Status + + if err := c.gorm.Save(&subscription).Error; err != nil { + c.logger.Error("failed to delete subscription", slog.Any("err", err)) + } +} diff --git a/apps/eventsub/internal/manager/manager.go b/apps/eventsub/internal/manager/manager.go index 2fd17d14d..049b4dfbf 100644 --- a/apps/eventsub/internal/manager/manager.go +++ b/apps/eventsub/internal/manager/manager.go @@ -5,11 +5,9 @@ import ( "errors" "log/slog" "sync" - "time" - "github.com/avast/retry-go/v4" + "github.com/google/uuid" "github.com/nicklaw5/helix/v2" - "github.com/samber/lo" "github.com/satont/twir/apps/eventsub/internal/tunnel" cfg "github.com/satont/twir/libs/config" model "github.com/satont/twir/libs/gomodels" @@ -17,6 +15,7 @@ import ( "github.com/satont/twir/libs/twitch" "github.com/twirapp/twir/libs/grpc/tokens" eventsub_framework "github.com/twirapp/twitch-eventsub-framework" + "go.uber.org/atomic" "go.uber.org/fx" "gorm.io/gorm" ) @@ -31,7 +30,7 @@ type Manager struct { tunnel *tunnel.AppTunnel } -type ManagerOpts struct { +type Opts struct { fx.In Lc fx.Lifecycle @@ -43,7 +42,7 @@ type ManagerOpts struct { Tunnel *tunnel.AppTunnel } -func NewManager(opts ManagerOpts) (*Manager, error) { +func NewManager(opts Opts) (*Manager, error) { client := eventsub_framework.NewSubClient(opts.Creds) manager := &Manager{ @@ -58,7 +57,58 @@ func NewManager(opts ManagerOpts) (*Manager, error) { opts.Lc.Append( fx.Hook{ OnStart: func(ctx context.Context) error { + if opts.Config.AppEnv != "production" { + if err := manager. + gorm. + Session(&gorm.Session{AllowGlobalUpdate: true}). + Delete(&model.EventsubSubscription{}). + Error; err != nil { + return err + } + } + go func() { + if opts.Config.AppEnv != "production" { + twitchClient, err := twitch.NewAppClient(opts.Config, opts.TokensGrpc) + if err != nil { + panic(err) + } + + var subscriptions []helix.EventSubSubscription + cursor := "" + for { + subs, err := twitchClient.GetEventSubSubscriptions( + &helix.EventSubSubscriptionsParams{ + After: cursor, + }, + ) + if err != nil { + panic(err) + } + + subscriptions = append(subscriptions, subs.Data.EventSubSubscriptions...) + + if subs.Data.Pagination.Cursor == "" { + break + } + + cursor = subs.Data.Pagination.Cursor + } + + var unsubWg sync.WaitGroup + + for _, sub := range subscriptions { + sub := sub + unsubWg.Add(1) + go func() { + defer unsubWg.Done() + manager.Unsubscribe(ctx, sub.ID) + }() + } + + unsubWg.Wait() + } + requestContext := context.Background() var channels []model.Channels err := manager.gorm.Where( @@ -71,12 +121,30 @@ func NewManager(opts ManagerOpts) (*Manager, error) { panic(err) } + var topics []model.EventsubTopic + if err := opts.Gorm.WithContext(requestContext).Find(&topics).Error; err != nil { + panic(err) + } + for _, channel := range channels { - err = manager.SubscribeToNeededEvents(requestContext, channel.ID, channel.BotID) + err = manager.SubscribeToNeededEvents(requestContext, topics, channel.ID, channel.BotID) if err != nil { continue } } + + manager.SubscribeWithLimits( + requestContext, + &eventsub_framework.SubRequest{ + Type: "user.authorization.revoke", + Condition: map[string]string{ + "client_id": opts.Config.TwitchClientId, + }, + Callback: opts.Tunnel.GetAddr(), + Secret: opts.Config.TwitchClientSecret, + Version: "1", + }, + ) }() return nil @@ -87,296 +155,147 @@ func NewManager(opts ManagerOpts) (*Manager, error) { return manager, nil } -type SubRequest struct { - Version string - Condition map[string]string -} - -func (c *Manager) SubscribeToNeededEvents(ctx context.Context, userId, botId string) error { - channelCondition := map[string]string{ - "broadcaster_user_id": userId, - } - userCondition := map[string]string{ - "user_id": userId, - } - - channelConditionWithBotId := map[string]string{ - "broadcaster_user_id": userId, - "user_id": botId, - } - - channelConditionWithModeratorId := map[string]string{ - "broadcaster_user_id": userId, - "moderator_user_id": botId, - } - - neededSubs := map[string]SubRequest{ - "channel.update": { - Version: "2", - Condition: channelCondition, - }, - "stream.online": { - Version: "1", - Condition: channelCondition, - }, - "stream.offline": { - Version: "1", - Condition: channelCondition, - }, - "user.update": { - Condition: userCondition, - Version: "1", - }, - "channel.follow": { - Version: "2", - Condition: map[string]string{ - "broadcaster_user_id": userId, - "moderator_user_id": userId, - }, - }, - "channel.moderator.add": { - Version: "1", - Condition: channelCondition, - }, - "channel.moderator.remove": { - Version: "1", - Condition: channelCondition, - }, - "channel.channel_points_custom_reward_redemption.add": { - Version: "1", - Condition: channelCondition, - }, - "channel.channel_points_custom_reward_redemption.update": { - Version: "1", - Condition: channelCondition, - }, - "channel.poll.begin": { - Version: "1", - Condition: channelCondition, - }, - "channel.poll.progress": { - Version: "1", - Condition: channelCondition, - }, - "channel.poll.end": { - Version: "1", - Condition: channelCondition, - }, - "channel.prediction.begin": { - Version: "1", - Condition: channelCondition, - }, - "channel.prediction.lock": { - Version: "1", - Condition: channelCondition, - }, - "channel.prediction.progress": { - Version: "1", - Condition: channelCondition, - }, - "channel.prediction.end": { - Version: "1", - Condition: channelCondition, - }, - "channel.ban": { - Version: "1", - Condition: channelCondition, - }, - "channel.subscribe": { - Version: "1", - Condition: channelCondition, - }, - "channel.subscription.gift": { - Version: "1", - Condition: channelCondition, - }, - "channel.subscription.message": { - Version: "1", - Condition: channelCondition, - }, - "channel.raid": { - Version: "1", - Condition: map[string]string{ - "to_broadcaster_user_id": userId, - }, - }, - "channel.chat.clear": { - Version: "1", - Condition: channelConditionWithBotId, - }, - "channel.chat.clear_user_messages": { - Version: "1", - Condition: channelConditionWithBotId, - }, - "channel.chat.message_delete": { - Version: "1", - Condition: channelConditionWithBotId, - }, - "channel.chat.notification": { - Version: "1", - Condition: channelConditionWithBotId, - }, - "channel.chat.message": { - Version: "1", - Condition: channelConditionWithBotId, - }, - "channel.unban_request.create": { - Version: "1", - Condition: channelConditionWithModeratorId, - }, - "channel.unban_request.resolve": { - Version: "1", - Condition: channelConditionWithModeratorId, - }, - } - - twitchClient, err := twitch.NewAppClient(c.config, c.tokensGrpc) - if err != nil { - return err - } - - var subscriptions []helix.EventSubSubscription - cursor := "" - for { - subs, err := twitchClient.GetEventSubSubscriptions( - &helix.EventSubSubscriptionsParams{ - UserID: userId, - After: cursor, - }, - ) - if err != nil { - return err +func getTypeCondition( + t model.EventsubConditionType, + topic, + channelID, + botId string, +) map[string]string { + switch t { + case model.EventsubConditionTypeBroadcasterUserID: + return map[string]string{ + "broadcaster_user_id": channelID, } - - subscriptions = append(subscriptions, subs.Data.EventSubSubscriptions...) - - if subs.Data.Pagination.Cursor == "" { - break + case model.EventsubConditionTypeUserID: + return map[string]string{ + "user_id": channelID, } - - cursor = subs.Data.Pagination.Cursor - } - - if c.config.AppEnv != "production" { - var unsubWg sync.WaitGroup - - for _, sub := range subscriptions { - sub := sub - unsubWg.Add(1) - go func() { - defer unsubWg.Done() - c.Unsubscribe(ctx, sub.ID) - }() + case model.EventsubConditionTypeBroadcasterWithUserID: + data := map[string]string{ + "broadcaster_user_id": channelID, + "user_id": botId, } - - unsubWg.Wait() - - for key, value := range neededSubs { - key := key - value := value - - go func() { - c.Subscribe( - ctx, &eventsub_framework.SubRequest{ - Type: key, - Condition: value.Condition, - Callback: c.tunnel.GetAddr(), - Secret: c.config.TwitchClientSecret, - Version: value.Version, - }, - ) - - c.logger.Info( - "Subscribed", - slog.String("type", key), - slog.String("user_id", userId), - ) - }() + if topic == "channel.follow" { + data["user_id"] = channelID } - } else { - for key, value := range neededSubs { - for _, sub := range subscriptions { - if sub.Type != key { - continue - } + return data + case model.EventsubConditionTypeBroadcasterWithModeratorID: + return map[string]string{ + "broadcaster_user_id": channelID, + "moderator_user_id": botId, + } + case model.EventsubConditionTypeToBroadcasterID: + return map[string]string{ + "to_broadcaster_user_id": channelID, + } + default: + return nil + } +} - if sub.Status == "notification_failures_exceeded" { - c.logger.Info( - "Notification failures exceeded, resubscribing", - slog.String("type", key), - slog.String("user_id", userId), - ) +var statusesForSkip = []string{ + "enabled", + "webhook_callback_verification_pending", + "authorization_revoked", + "user_removed", + "version_removed", +} - if err := retry.Do( - func() error { - _, subscribeErr := c.Subscribe( - ctx, &eventsub_framework.SubRequest{ - Type: key, - Condition: value.Condition, - Callback: c.tunnel.GetAddr(), - Secret: c.config.TwitchClientSecret, - Version: value.Version, - }, - ) +func (c *Manager) SubscribeToNeededEvents( + ctx context.Context, + topics []model.EventsubTopic, + broadcasterId, + botId string, +) error { + var existedSubscriptions []model.EventsubSubscription + if err := c.gorm. + WithContext(ctx). + Where(&model.EventsubSubscription{UserID: broadcasterId}). + Where("status NOT IN ?", statusesForSkip). + Find(&existedSubscriptions). + Error; err != nil { + return err + } - return subscribeErr - }, - retry.Attempts(0), - retry.Delay(1*time.Second), - retry.RetryIf( - func(err error) bool { - var e *eventsub_framework.TwitchError - if errors.As(err, &e) && e.Status != 409 { - if e.Status == 429 { - return true - } - } - - return false - }, - ), - ); err != nil { - c.logger.Error( - "Failed to resubscribe", - slog.Any("err", err), - slog.String("type", key), - slog.String("user_id", userId), - ) - } - } + var wg sync.WaitGroup + newSubsCount := atomic.NewInt64(0) + + for _, topic := range topics { + wg.Add(1) + + topic := topic + go func() { + defer wg.Done() + condition := getTypeCondition(topic.ConditionType, topic.Topic, broadcasterId, botId) + if condition == nil { + c.logger.Error( + "failed to get condition", + slog.String("topic", topic.Topic), + slog.String("channel_id", broadcasterId), + slog.String("condition_type", string(topic.ConditionType)), + ) + return } - _, isExists := lo.Find( - subscriptions, func(item helix.EventSubSubscription) bool { - return item.Type == key + status, err := c.SubscribeWithLimits( + ctx, + &eventsub_framework.SubRequest{ + Type: topic.Topic, + Condition: condition, + Callback: c.tunnel.GetAddr(), + Secret: c.config.TwitchClientSecret, + Version: topic.Version, }, ) - if !isExists { - c.logger.Info( - "Subscription not found, resubscribing", - slog.String("type", key), - slog.String("user_id", userId), + var casterErr *eventsub_framework.TwitchError + if err != nil && !errors.As(err, &casterErr) { + c.logger.Error( + "failed to subscribe to event", + slog.Any("err", err), + slog.Any("topic", topic.Topic), + slog.Any("condition", condition), + slog.String("version", topic.Version), + slog.String("callback", c.tunnel.GetAddr()), ) + return + } - if _, err := c.Subscribe( - ctx, &eventsub_framework.SubRequest{ - Type: key, - Condition: value.Condition, - Callback: c.tunnel.GetAddr(), - Secret: c.config.TwitchClientSecret, - Version: value.Version, + if len(status.Data) > 0 || (casterErr != nil && casterErr.Status == 409) { + subStatus := "enabled" + subId := uuid.New() + if len(status.Data) > 0 { + subStatus = status.Data[0].Status + subId = uuid.MustParse(status.Data[0].ID) + } + + if err := c.gorm.Create( + &model.EventsubSubscription{ + ID: subId, + TopicID: topic.ID, + UserID: broadcasterId, + Status: subStatus, + Version: topic.Version, + CallbackUrl: c.tunnel.GetAddr(), }, - ); err != nil { - c.logger.Error( - "Failed to resubscribe", - slog.Any("err", err), - slog.String("type", key), - slog.String("user_id", userId), - ) + ).Error; err != nil { + c.logger.Error("failed to create subscription", slog.Any("err", err)) } + + newSubsCount.Inc() } - } + }() + } + + wg.Wait() + + if newSubsCount.Load() > 0 { + c.logger.Info( + "New subscriptions created for channel", + slog.String("channel_id", broadcasterId), + slog.String("bot_id", botId), + slog.Int64("count", newSubsCount.Load()), + ) } return nil diff --git a/apps/eventsub/internal/manager/subscribe.go b/apps/eventsub/internal/manager/subscribe.go new file mode 100644 index 000000000..d42bd1041 --- /dev/null +++ b/apps/eventsub/internal/manager/subscribe.go @@ -0,0 +1,41 @@ +package manager + +import ( + "context" + "errors" + "time" + + "github.com/avast/retry-go/v4" + eventsub_framework "github.com/twirapp/twitch-eventsub-framework" + "github.com/twirapp/twitch-eventsub-framework/esb" +) + +func (c *Manager) SubscribeWithLimits( + ctx context.Context, + srq *eventsub_framework.SubRequest, +) ( + *esb.RequestStatus, + error, +) { + data, err := retry.DoWithData( + func() (*esb.RequestStatus, error) { + data, subscribeErr := c.Subscribe(ctx, srq) + + return data, subscribeErr + }, + retry.Attempts(0), + retry.Delay(1*time.Second), + retry.RetryIf( + func(err error) bool { + var e *eventsub_framework.TwitchError + if errors.As(err, &e) && e.Status == 429 { + return true + } + + return false + }, + ), + ) + + return data, err +} diff --git a/apps/timers/app/app.go b/apps/timers/app/app.go index 7b917e2e3..76fc368d4 100644 --- a/apps/timers/app/app.go +++ b/apps/timers/app/app.go @@ -1,12 +1,8 @@ package app import ( - "log/slog" - "github.com/satont/twir/apps/timers/internal/activity" bus_listener "github.com/satont/twir/apps/timers/internal/bus-listener" - "github.com/satont/twir/apps/timers/internal/gorm" - "github.com/satont/twir/apps/timers/internal/redis" "github.com/satont/twir/apps/timers/internal/repositories/channels" "github.com/satont/twir/apps/timers/internal/repositories/streams" "github.com/satont/twir/apps/timers/internal/repositories/timers" @@ -14,7 +10,7 @@ import ( "github.com/satont/twir/apps/timers/internal/workflow" cfg "github.com/satont/twir/libs/config" "github.com/satont/twir/libs/logger" - sentryInternal "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" buscore "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/grpc/clients" "github.com/twirapp/twir/libs/grpc/parser" @@ -24,14 +20,9 @@ import ( var App = fx.Module( "timers", + baseapp.CreateBaseApp("timers"), fx.Provide( - cfg.NewFx, - sentryInternal.NewFx(sentryInternal.NewFxOpts{Service: "timers"}), - logger.NewFx(logger.Opts{Level: slog.LevelInfo, Service: "timers"}), - uptrace.NewFx("timers"), - gorm.New, buscore.NewNatsBusFx("timers"), - redis.New, timers.NewGorm, activity.New, workflow.New, diff --git a/apps/timers/internal/gorm/gorm.go b/apps/timers/internal/gorm/gorm.go deleted file mode 100644 index 7d012df5f..000000000 --- a/apps/timers/internal/gorm/gorm.go +++ /dev/null @@ -1,37 +0,0 @@ -package gorm - -import ( - "context" - "time" - - cfg "github.com/satont/twir/libs/config" - "go.uber.org/fx" - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -func New(config cfg.Config, lc fx.Lifecycle) (*gorm.DB, error) { - db, err := gorm.Open(postgres.Open(config.DatabaseUrl)) - if err != nil { - return nil, err - } - - d, err := db.DB() - if err != nil { - return nil, err - } - d.SetMaxIdleConns(1) - d.SetMaxOpenConns(10) - d.SetConnMaxLifetime(time.Hour) - - lc.Append( - fx.Hook{ - OnStart: nil, - OnStop: func(ctx context.Context) error { - return d.Close() - }, - }, - ) - - return db, nil -} diff --git a/apps/timers/internal/redis/redis.go b/apps/timers/internal/redis/redis.go deleted file mode 100644 index 62cfcbf70..000000000 --- a/apps/timers/internal/redis/redis.go +++ /dev/null @@ -1,15 +0,0 @@ -package redis - -import ( - "github.com/redis/go-redis/v9" - cfg "github.com/satont/twir/libs/config" -) - -func New(config cfg.Config) (*redis.Client, error) { - redisOpts, err := redis.ParseURL(config.RedisUrl) - if err != nil { - return nil, err - } - - return redis.NewClient(redisOpts), nil -} diff --git a/apps/tokens/app/app.go b/apps/tokens/app/app.go index ab3355fad..1fc46fcb2 100644 --- a/apps/tokens/app/app.go +++ b/apps/tokens/app/app.go @@ -1,25 +1,18 @@ package app import ( - "github.com/satont/twir/apps/tokens/internal/gorm" "github.com/satont/twir/apps/tokens/internal/grpc_impl" "github.com/satont/twir/apps/tokens/internal/redis" - "github.com/satont/twir/libs/config" "github.com/satont/twir/libs/logger" - twirsentry "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" "github.com/twirapp/twir/libs/uptrace" "go.uber.org/fx" ) var App = fx.Module( "tokens", + baseapp.CreateBaseApp("tokens"), fx.Provide( - cfg.NewFx, - twirsentry.NewFx(twirsentry.NewFxOpts{Service: "tokens"}), - logger.NewFx(logger.Opts{Service: "tokens"}), - uptrace.NewFx("tokens"), - gorm.New, - redis.New, redis.NewRedisLock, ), fx.Invoke( diff --git a/apps/tokens/internal/gorm/gorm.go b/apps/tokens/internal/gorm/gorm.go deleted file mode 100644 index 7d012df5f..000000000 --- a/apps/tokens/internal/gorm/gorm.go +++ /dev/null @@ -1,37 +0,0 @@ -package gorm - -import ( - "context" - "time" - - cfg "github.com/satont/twir/libs/config" - "go.uber.org/fx" - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -func New(config cfg.Config, lc fx.Lifecycle) (*gorm.DB, error) { - db, err := gorm.Open(postgres.Open(config.DatabaseUrl)) - if err != nil { - return nil, err - } - - d, err := db.DB() - if err != nil { - return nil, err - } - d.SetMaxIdleConns(1) - d.SetMaxOpenConns(10) - d.SetConnMaxLifetime(time.Hour) - - lc.Append( - fx.Hook{ - OnStart: nil, - OnStop: func(ctx context.Context) error { - return d.Close() - }, - }, - ) - - return db, nil -} diff --git a/apps/tokens/internal/redis/redis.go b/apps/tokens/internal/redis/redis.go index 856ee6ce6..9ea7b3da9 100644 --- a/apps/tokens/internal/redis/redis.go +++ b/apps/tokens/internal/redis/redis.go @@ -4,18 +4,8 @@ import ( "github.com/go-redsync/redsync/v4" "github.com/go-redsync/redsync/v4/redis/goredis/v9" "github.com/redis/go-redis/v9" - cfg "github.com/satont/twir/libs/config" ) -func New(config cfg.Config) (*redis.Client, error) { - redisOpts, err := redis.ParseURL(config.RedisUrl) - if err != nil { - return nil, err - } - - return redis.NewClient(redisOpts), nil -} - func NewRedisLock(redis *redis.Client) *redsync.Redsync { pool := goredis.NewPool(redis) return redsync.New(pool) diff --git a/apps/websockets/app/app.go b/apps/websockets/app/app.go index a9457e982..a66c1521b 100644 --- a/apps/websockets/app/app.go +++ b/apps/websockets/app/app.go @@ -5,7 +5,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" bus_listener "github.com/satont/twir/apps/websockets/internal/bus-listener" - "github.com/satont/twir/apps/websockets/internal/gorm" "github.com/satont/twir/apps/websockets/internal/grpc_impl" "github.com/satont/twir/apps/websockets/internal/namespaces/overlays/alerts" "github.com/satont/twir/apps/websockets/internal/namespaces/overlays/be_right_back" @@ -17,10 +16,9 @@ import ( "github.com/satont/twir/apps/websockets/internal/namespaces/overlays/registry/overlays" "github.com/satont/twir/apps/websockets/internal/namespaces/overlays/tts" "github.com/satont/twir/apps/websockets/internal/namespaces/youtube" - "github.com/satont/twir/apps/websockets/internal/redis" config "github.com/satont/twir/libs/config" "github.com/satont/twir/libs/logger" - twirsentry "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" buscore "github.com/twirapp/twir/libs/bus-core" "github.com/twirapp/twir/libs/grpc/clients" "github.com/twirapp/twir/libs/grpc/parser" @@ -32,14 +30,9 @@ import ( const service = "Websockets" var App = fx.Module( - "websockets", + service, + baseapp.CreateBaseApp(service), fx.Provide( - config.NewFx, - twirsentry.NewFx(twirsentry.NewFxOpts{Service: service}), - logger.NewFx(logger.Opts{Service: service}), - uptrace.NewFx(service), - redis.New, - gorm.New, buscore.NewNatsBusFx(service), func(cfg config.Config) parser.ParserClient { return clients.NewParser(cfg.AppEnv) diff --git a/apps/websockets/internal/gorm/gorm.go b/apps/websockets/internal/gorm/gorm.go deleted file mode 100644 index 4a6097cac..000000000 --- a/apps/websockets/internal/gorm/gorm.go +++ /dev/null @@ -1,41 +0,0 @@ -package gorm - -import ( - "context" - "time" - - cfg "github.com/satont/twir/libs/config" - "go.uber.org/fx" - "gorm.io/driver/postgres" - "gorm.io/gorm" -) - -type Opts struct { - fx.In - - Config cfg.Config - LC fx.Lifecycle -} - -func New(opts Opts) (*gorm.DB, error) { - db, err := gorm.Open( - postgres.Open(opts.Config.DatabaseUrl), - ) - if err != nil { - return nil, err - } - d, _ := db.DB() - d.SetMaxIdleConns(1) - d.SetMaxOpenConns(10) - d.SetConnMaxLifetime(time.Hour) - - opts.LC.Append( - fx.Hook{ - OnStop: func(ctx context.Context) error { - return d.Close() - }, - }, - ) - - return db, nil -} diff --git a/apps/websockets/internal/redis/redis.go b/apps/websockets/internal/redis/redis.go deleted file mode 100644 index 5328f9161..000000000 --- a/apps/websockets/internal/redis/redis.go +++ /dev/null @@ -1,31 +0,0 @@ -package redis - -import ( - "context" - - "github.com/redis/go-redis/v9" - cfg "github.com/satont/twir/libs/config" - "go.uber.org/fx" -) - -func New(config cfg.Config, lc fx.Lifecycle) (*redis.Client, error) { - opts, err := redis.ParseURL(config.RedisUrl) - if err != nil { - return nil, err - } - - client := redis.NewClient(opts) - - lc.Append( - fx.Hook{ - OnStart: func(ctx context.Context) error { - return client.Ping(ctx).Err() - }, - OnStop: func(ctx context.Context) error { - return client.Close() - }, - }, - ) - - return client, nil -} diff --git a/apps/ytsr/app/app.go b/apps/ytsr/app/app.go index ab8daeebc..3438c5eb9 100644 --- a/apps/ytsr/app/app.go +++ b/apps/ytsr/app/app.go @@ -2,21 +2,15 @@ package app import ( "github.com/satont/twir/apps/ytsr/internal/grpc_impl" - cfg "github.com/satont/twir/libs/config" "github.com/satont/twir/libs/logger" - twirsentry "github.com/satont/twir/libs/sentry" + "github.com/twirapp/twir/libs/baseapp" "github.com/twirapp/twir/libs/uptrace" "go.uber.org/fx" ) var App = fx.Module( "ytsr", - fx.Provide( - cfg.NewFx, - twirsentry.NewFx(twirsentry.NewFxOpts{Service: "ytsr"}), - uptrace.NewFx("ytsr"), - logger.NewFx(logger.Opts{Service: "ytsr"}), - ), + baseapp.CreateBaseApp("ytsr"), fx.Invoke( uptrace.NewFx("ytsr"), grpc_impl.New, diff --git a/frontend/dashboard/src/api/admin/actions.ts b/frontend/dashboard/src/api/admin/actions.ts index e6e91afad..00847dc8c 100644 --- a/frontend/dashboard/src/api/admin/actions.ts +++ b/frontend/dashboard/src/api/admin/actions.ts @@ -9,3 +9,11 @@ export function useMutationDropAllAuthSessions() { } `)) } + +export function useMutationEventSubSubscribe() { + return useMutation(graphql(` + mutation EventsubSubscribe($opts: EventsubSubscribeInput!) { + eventsubSubscribe(opts: $opts) + } + `)) +} diff --git a/frontend/dashboard/src/assets/index.css b/frontend/dashboard/src/assets/index.css index d97d74f31..c8f49c62f 100644 --- a/frontend/dashboard/src/assets/index.css +++ b/frontend/dashboard/src/assets/index.css @@ -40,7 +40,7 @@ --background:240 10% 3.9%; --foreground:0 0% 98%; - --card:240 10% 3.9%; + --card: 240, 8%, 10%; --card-foreground:0 0% 98%; --popover:240 10% 3.9%; diff --git a/frontend/dashboard/src/components/games/8ball.vue b/frontend/dashboard/src/components/games/8ball.vue index bda7f3182..e58dfd32c 100644 --- a/frontend/dashboard/src/components/games/8ball.vue +++ b/frontend/dashboard/src/components/games/8ball.vue @@ -7,7 +7,7 @@ import { useI18n } from 'vue-i18n' import Card from './card.vue' import { useGamesApi } from '@/api/games/games.js' -import CommandButton from '@/features/commands/components/command-button.vue' +import CommandButton from '@/features/commands/ui/command-button.vue' const isModalOpened = ref(false) diff --git a/frontend/dashboard/src/components/games/duel.vue b/frontend/dashboard/src/components/games/duel.vue index 9983a2831..80c8ac652 100644 --- a/frontend/dashboard/src/components/games/duel.vue +++ b/frontend/dashboard/src/components/games/duel.vue @@ -20,7 +20,7 @@ import type { GamesQuery } from '@/gql/graphql' import { useGamesApi } from '@/api/games/games.js' import IconDuel from '@/assets/games/duel.svg?use' import { useNaiveDiscrete } from '@/composables/use-naive-discrete' -import CommandButton from '@/features/commands/components/command-button.vue' +import CommandButton from '@/features/commands/ui/command-button.vue' const gamesApi = useGamesApi() const { data: settings } = gamesApi.useGamesQuery() diff --git a/frontend/dashboard/src/components/games/russianRoulette.vue b/frontend/dashboard/src/components/games/russianRoulette.vue index efcbcb344..c160269ce 100644 --- a/frontend/dashboard/src/components/games/russianRoulette.vue +++ b/frontend/dashboard/src/components/games/russianRoulette.vue @@ -19,7 +19,7 @@ import type { GamesQuery } from '@/gql/graphql' import { useGamesApi } from '@/api/games/games' import { useNaiveDiscrete } from '@/composables/use-naive-discrete' -import CommandButton from '@/features/commands/components/command-button.vue' +import CommandButton from '@/features/commands/ui/command-button.vue' const isModalOpened = ref(false) diff --git a/frontend/dashboard/src/components/integrations/seven-tv.vue b/frontend/dashboard/src/components/integrations/seven-tv.vue index 5a77a08b7..72a7cd8a5 100644 --- a/frontend/dashboard/src/components/integrations/seven-tv.vue +++ b/frontend/dashboard/src/components/integrations/seven-tv.vue @@ -31,7 +31,7 @@ import { useSevenTv } from '@/components/integrations/use-seven-tv' import WithSettings from '@/components/integrations/variants/withSettings.vue' import RewardsSelector from '@/components/rewardsSelector.vue' import { useNaiveDiscrete } from '@/composables/use-naive-discrete' -import CommandsList from '@/features/commands/components/list.vue' +import CommandsList from '@/features/commands/ui/list.vue' const { t } = useI18n() diff --git a/frontend/dashboard/src/components/overlays/brb/settings.vue b/frontend/dashboard/src/components/overlays/brb/settings.vue index bcf3d1267..fa16ad359 100644 --- a/frontend/dashboard/src/components/overlays/brb/settings.vue +++ b/frontend/dashboard/src/components/overlays/brb/settings.vue @@ -18,7 +18,7 @@ import { useCopyOverlayLink } from '../copyOverlayLink' import type { Settings } from '@twir/api/messages/overlays_be_right_back/overlays_be_right_back' import { useBeRightBackOverlayManager, useProfile } from '@/api' -import commandButton from '@/features/commands/components/command-button.vue' +import commandButton from '@/features/commands/ui/command-button.vue' defineProps<{ showSettings: boolean diff --git a/frontend/dashboard/src/components/overlays/tts.vue b/frontend/dashboard/src/components/overlays/tts.vue index 65852838c..64dc0f6be 100644 --- a/frontend/dashboard/src/components/overlays/tts.vue +++ b/frontend/dashboard/src/components/overlays/tts.vue @@ -1,33 +1,33 @@ diff --git a/frontend/dashboard/src/components/songRequests/settings.vue b/frontend/dashboard/src/components/songRequests/settings.vue index 2cf0ad0a1..10cfc1071 100644 --- a/frontend/dashboard/src/components/songRequests/settings.vue +++ b/frontend/dashboard/src/components/songRequests/settings.vue @@ -27,7 +27,7 @@ import type { VNodeChild } from 'vue' import { useCommandsApi } from '@/api/commands/commands' import { useSongRequestsApi } from '@/api/song-requests' import TwitchSearchUsers from '@/components/twitchUsers/multiple.vue' -import CommandList from '@/features/commands/components/list.vue' +import CommandList from '@/features/commands/ui/list.vue' import { SongRequestsSearchChannelOrVideoOptsType } from '@/gql/graphql' const { t } = useI18n() diff --git a/frontend/dashboard/src/components/ui/badge/index.ts b/frontend/dashboard/src/components/ui/badge/index.ts index d90c2ef42..abf81efb9 100644 --- a/frontend/dashboard/src/components/ui/badge/index.ts +++ b/frontend/dashboard/src/components/ui/badge/index.ts @@ -3,23 +3,24 @@ import { type VariantProps, cva } from 'class-variance-authority' export { default as Badge } from './Badge.vue' export const badgeVariants = cva( - 'inline-flex items-center rounded-full border px-2.5 py-0.5 text-xs font-semibold transition-colors focus:outline-none focus:ring-2 focus:ring-ring focus:ring-offset-2', - { - variants: { - variant: { - default: + 'inline-flex items-center rounded-full border px-2.5 py-0.5 text-xs font-semibold transition-colors focus:outline-none focus:ring-2 focus:ring-ring focus:ring-offset-2', + { + variants: { + variant: { + default: 'border-transparent bg-primary text-primary-foreground hover:bg-primary/80', - secondary: + secondary: 'border-transparent bg-secondary text-secondary-foreground hover:bg-secondary/80', - destructive: + destructive: 'border-transparent bg-destructive text-destructive-foreground hover:bg-destructive/80', - outline: 'text-foreground', - }, - }, - defaultVariants: { - variant: 'default', - }, - }, + outline: 'text-foreground', + success: 'border-transparent bg-green-600 text-destructive-foreground hover:bg-green-600/80', + }, + }, + defaultVariants: { + variant: 'default', + }, + }, ) export type BadgeVariants = VariantProps diff --git a/frontend/dashboard/src/components/ui/card/Card.vue b/frontend/dashboard/src/components/ui/card/Card.vue index dda8de7c4..12dd5ec7d 100644 --- a/frontend/dashboard/src/components/ui/card/Card.vue +++ b/frontend/dashboard/src/components/ui/card/Card.vue @@ -11,7 +11,7 @@ const props = defineProps<{
+import DropSessions from './ui/drop-sessions.vue' +import EventsubSubscribe from './ui/eventsub-subscribe.vue' + + + diff --git a/frontend/dashboard/src/features/admin-panel/actions/ui/drop-sessions.vue b/frontend/dashboard/src/features/admin-panel/actions/ui/drop-sessions.vue new file mode 100644 index 000000000..6a181567d --- /dev/null +++ b/frontend/dashboard/src/features/admin-panel/actions/ui/drop-sessions.vue @@ -0,0 +1,47 @@ + + + diff --git a/frontend/dashboard/src/features/admin-panel/actions/ui/eventsub-subscribe.vue b/frontend/dashboard/src/features/admin-panel/actions/ui/eventsub-subscribe.vue new file mode 100644 index 000000000..8d8c173ea --- /dev/null +++ b/frontend/dashboard/src/features/admin-panel/actions/ui/eventsub-subscribe.vue @@ -0,0 +1,167 @@ + + + diff --git a/frontend/dashboard/src/features/admin-panel/admin-actions/admin-actions.vue b/frontend/dashboard/src/features/admin-panel/admin-actions/admin-actions.vue deleted file mode 100644 index 8bd71acb9..000000000 --- a/frontend/dashboard/src/features/admin-panel/admin-actions/admin-actions.vue +++ /dev/null @@ -1,7 +0,0 @@ - - - diff --git a/frontend/dashboard/src/features/admin-panel/admin-actions/components/drop-sessions.vue b/frontend/dashboard/src/features/admin-panel/admin-actions/components/drop-sessions.vue deleted file mode 100644 index e763660af..000000000 --- a/frontend/dashboard/src/features/admin-panel/admin-actions/components/drop-sessions.vue +++ /dev/null @@ -1,33 +0,0 @@ - - - - - diff --git a/frontend/dashboard/src/features/admin-panel/manage-badges/components/badges-form.vue b/frontend/dashboard/src/features/admin-panel/manage-badges/components/badges-form.vue deleted file mode 100644 index f9aa21bdf..000000000 --- a/frontend/dashboard/src/features/admin-panel/manage-badges/components/badges-form.vue +++ /dev/null @@ -1,96 +0,0 @@ - - - diff --git a/frontend/dashboard/src/features/admin-panel/manage-badges/manage-badges.vue b/frontend/dashboard/src/features/admin-panel/manage-badges/manage-badges.vue index 0ae82652a..1fd91d064 100644 --- a/frontend/dashboard/src/features/admin-panel/manage-badges/manage-badges.vue +++ b/frontend/dashboard/src/features/admin-panel/manage-badges/manage-badges.vue @@ -1,10 +1,10 @@