Skip to content

Commit

Permalink
refactor(eventsub): do not populate subscriptions in production (#743)
Browse files Browse the repository at this point in the history
* refactor(eventsub): do not populate subscriptions in production

* kruto
  • Loading branch information
Satont committed May 31, 2024
1 parent 7cfae27 commit 256edfb
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 162 deletions.
17 changes: 11 additions & 6 deletions apps/eventsub/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/satont/twir/apps/eventsub

go 1.21.5
go 1.22.2

toolchain go1.22.3

replace (
github.com/satont/twir/libs/config => ../../libs/config
Expand All @@ -17,28 +19,28 @@ require (
github.com/avast/retry-go/v4 v4.5.1
github.com/google/uuid v1.6.0
github.com/lib/pq v1.10.9
github.com/nicklaw5/helix/v2 v2.25.3
github.com/nicklaw5/helix/v2 v2.28.1
github.com/redis/go-redis/v9 v9.5.1
github.com/samber/lo v1.39.0
github.com/satont/twir/libs/config v0.0.0-20240126231400-72985ccc25a5
github.com/satont/twir/libs/gomodels v0.0.0-20240225024146-742838c78cea
github.com/satont/twir/libs/logger v0.0.0-20240208100157-ecbe2d7afcfd
github.com/satont/twir/libs/sentry v0.0.0-20240208100157-ecbe2d7afcfd
github.com/satont/twir/libs/twitch v0.0.0-20240126231400-72985ccc25a5
github.com/satont/twir/libs/types v0.0.0-20240126231400-72985ccc25a5
github.com/twirapp/twir/libs/baseapp v0.0.0-20240531050209-7cfae27458cc
github.com/twirapp/twir/libs/bus-core v0.0.0-20240225024146-742838c78cea
github.com/twirapp/twir/libs/cache v0.0.0-20240531050209-7cfae27458cc
github.com/twirapp/twir/libs/grpc v0.0.0-20240126231400-72985ccc25a5
github.com/twirapp/twir/libs/integrations v0.0.0-00010101000000-000000000000
github.com/twirapp/twir/libs/uptrace v0.0.0-00010101000000-000000000000
github.com/twirapp/twitch-eventsub-framework v1.3.6
github.com/twirapp/twitch-eventsub-framework v1.3.7
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/atomic v1.11.0
go.uber.org/fx v1.21.0
go.uber.org/zap v1.27.0
golang.ngrok.com/ngrok v1.8.0
google.golang.org/protobuf v1.33.0
gorm.io/driver/postgres v1.5.7
gorm.io/gorm v1.25.9
)

Expand Down Expand Up @@ -91,6 +93,8 @@ require (
github.com/samber/slog-multi v1.0.2 // indirect
github.com/samber/slog-sentry/v2 v2.4.0 // indirect
github.com/samber/slog-zerolog/v2 v2.2.0 // indirect
github.com/satont/twir/libs/sentry v0.0.0-20240208100157-ecbe2d7afcfd // indirect
github.com/satont/twir/libs/types v0.0.0-20240126231400-72985ccc25a5 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/uptrace/uptrace-go v1.21.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 // indirect
Expand Down Expand Up @@ -119,4 +123,5 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/grpc v1.62.0 // indirect
gorm.io/driver/postgres v1.5.7 // indirect
)
23 changes: 13 additions & 10 deletions apps/eventsub/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cloudflare/circl v1.3.7 h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vcU=
github.com/cloudflare/circl v1.3.7/go.mod h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa h1:jQCWAUqqlij9Pgj2i/PB79y4KOPYVyFYdROxgaCwdTQ=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/protoc-gen-validate v1.0.4 h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A=
github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/getsentry/sentry-go v0.26.0 h1:IX3++sF6/4B5JcevhdZfdKIHfyvMmAq/UnqcyT2H6mA=
Expand Down Expand Up @@ -109,8 +111,8 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nicklaw5/helix/v2 v2.25.3 h1:BSTFa1UguvryFb8biCyYgnVnshftU2zMGuHSLi84tsg=
github.com/nicklaw5/helix/v2 v2.25.3/go.mod h1:zZcKsyyBWDli34x3QleYsVMiiNGMXPAEU5NjsiZDtvY=
github.com/nicklaw5/helix/v2 v2.28.1 h1:bLVKMrZ0MiSgCLB3nsi7+OrhognsIusqvNL4XFoRG0A=
github.com/nicklaw5/helix/v2 v2.28.1/go.mod h1:zZcKsyyBWDli34x3QleYsVMiiNGMXPAEU5NjsiZDtvY=
github.com/onsi/ginkgo/v2 v2.15.0 h1:79HwNRBAZHOEwrczrgSOPy+eFTTlIGELKy5as+ClttY=
github.com/onsi/ginkgo/v2 v2.15.0/go.mod h1:HlxMHtYF57y6Dpf+mc5529KKmSq9h2FpCF+/ZkwUxKM=
github.com/onsi/gomega v1.30.0 h1:hvMK7xYz4D3HapigLTeGdId/NcfQx1VHMJc60ew99+8=
Expand Down Expand Up @@ -155,16 +157,15 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/twirapp/twitch-eventsub-framework v1.3.2 h1:lID4wdcg8rPu4vrBOE/yuEya55/nnHc9nBTCl1284T0=
github.com/twirapp/twitch-eventsub-framework v1.3.2/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc=
github.com/twirapp/twitch-eventsub-framework v1.3.3 h1:n/eYSPhtn8vLv32wr20o01+RHbSGqfU3umv8fbXwUY0=
github.com/twirapp/twitch-eventsub-framework v1.3.3/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc=
github.com/twirapp/twitch-eventsub-framework v1.3.4 h1:ZG+xR4EFruKuOxJTcbe1sXSrhuo0xYbXqPxaZsiH4Lo=
github.com/twirapp/twitch-eventsub-framework v1.3.4/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc=
github.com/twirapp/twitch-eventsub-framework v1.3.5 h1:p5eqGyi+azNovuYVE4XL9tGeSj5qIzUPUKP+MKj7qWI=
github.com/twirapp/twitch-eventsub-framework v1.3.5/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twirapp/twir/libs/baseapp v0.0.0-20240531050209-7cfae27458cc h1:6CK3BWdPW5BpqwnAQJff4Z4J3drBp0vWH5tAhofwJzA=
github.com/twirapp/twir/libs/baseapp v0.0.0-20240531050209-7cfae27458cc/go.mod h1:hKOpVrA7x23znTdaCO9sgjh5L/W7rhCy2bXDgtYhIzc=
github.com/twirapp/twir/libs/cache v0.0.0-20240531050209-7cfae27458cc h1:v1kvbaCu8vYsSRPhhmuQfsVRx96q72blI5TKaNLbrZY=
github.com/twirapp/twir/libs/cache v0.0.0-20240531050209-7cfae27458cc/go.mod h1:QN8X1SS0Fp5Hug0++0DDu6xFx7Lv2ixxHBMdW4OCKJk=
github.com/twirapp/twitch-eventsub-framework v1.3.6 h1:8FJiVMxi20HsJZSuvZT+HXUpUt/o2tBCTcsf1BSQbV0=
github.com/twirapp/twitch-eventsub-framework v1.3.6/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc=
github.com/twirapp/twitch-eventsub-framework v1.3.7 h1:HwpolVPgZRVCJCD+AzlLol2nRZId+lLLR41qXilDYKY=
github.com/twirapp/twitch-eventsub-framework v1.3.7/go.mod h1:G67t1kwDexqC38efhh97md/HcnEZFqPFs+B1HeZkBtc=
github.com/uptrace/uptrace-go v1.21.0 h1:oJoUjhiVT7aiuoG6B3ClVHtJozLn3cK9hQt8U5dQO1M=
github.com/uptrace/uptrace-go v1.21.0/go.mod h1:/aXAFGKOqeAFBqWa1xtzLnGX2xJm1GScqz9NJ0TJjLM=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0 h1:P+/g8GpuJGYbOp2tAdKrIPUX9JO02q8Q0YNlHolpibA=
Expand Down Expand Up @@ -193,6 +194,8 @@ go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc=
go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
go.uber.org/fx v1.21.0 h1:qqD6k7PyFHONffW5speYx403ywanuASqU4Rqdpc22XY=
Expand Down
18 changes: 1 addition & 17 deletions apps/eventsub/internal/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package handler
import (
"context"
"errors"
"log/slog"
"net"
"net/http"

"github.com/redis/go-redis/v9"
"github.com/satont/twir/apps/eventsub/internal/manager"
"github.com/satont/twir/apps/eventsub/internal/tunnel"
cfg "github.com/satont/twir/libs/config"
model "github.com/satont/twir/libs/gomodels"
"github.com/satont/twir/libs/logger"
bus_core "github.com/twirapp/twir/libs/bus-core"
"github.com/twirapp/twir/libs/cache/7tv"
Expand All @@ -22,7 +20,6 @@ import (
"github.com/twirapp/twir/libs/grpc/websockets"
seventvintegration "github.com/twirapp/twir/libs/integrations/seventv"
eventsub_framework "github.com/twirapp/twitch-eventsub-framework"
eventsub_bindings "github.com/twirapp/twitch-eventsub-framework/esb"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/trace"
"go.uber.org/fx"
Expand Down Expand Up @@ -69,20 +66,6 @@ type Opts struct {
func New(opts Opts) *Handler {
handler := eventsub_framework.NewSubHandler(true, []byte(opts.Config.TwitchClientSecret))

handler.OnNotification = func(
h *eventsub_bindings.ResponseHeaders,
notification *eventsub_bindings.EventNotification,
) {
if err := opts.Gorm.
Model(&model.EventsubSubscription{}).
Where("id = ?", notification.Subscription.ID).
Update("status", notification.Subscription.Status).
Error; err != nil {
opts.Logger.Error("failed to update subscription", slog.Any("err", err))
return
}
}

myHandler := &Handler{
manager: opts.Manager,
logger: opts.Logger,
Expand All @@ -98,6 +81,7 @@ func New(opts Opts) *Handler {
seventvCache: seventv.New(opts.Redis),
}

handler.OnNotification = myHandler.onNotification
handler.HandleUserAuthorizationRevoke = myHandler.handleUserAuthorizationRevoke
handler.OnRevocate = myHandler.handleSubRevocate

Expand Down
87 changes: 87 additions & 0 deletions apps/eventsub/internal/handler/notification.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package handler

import (
"log/slog"

"github.com/google/uuid"
model "github.com/satont/twir/libs/gomodels"
eventsub_bindings "github.com/twirapp/twitch-eventsub-framework/esb"
"gorm.io/gorm/clause"
)

// order metters
// user_id should be the last
var conditionKeys = []string{
"broadcaster_user_id",
"broadcaster_user_id",
"to_broadcaster_user_id",
"user_id",
}

var knownTopicsEntitiesCache = map[string]model.EventsubTopic{}

func (c *Handler) onNotification(
_ *eventsub_bindings.ResponseHeaders,
notification *eventsub_bindings.EventNotification,
) {
condition, ok := notification.Subscription.Condition.(map[string]any)
if !ok {
c.logger.Error(
"failed to cast condition",
slog.Any("condition", notification.Subscription.Condition),
)
return
}

var userId string
for _, key := range conditionKeys {
if val, ok := condition[key].(string); ok {
userId = val
break
}
}

if userId == "" {
c.logger.Error("failed to find user_id")
return
}

var topicId uuid.UUID
if cachedTopic, topicFound := knownTopicsEntitiesCache[notification.Subscription.Type]; topicFound {
topicId = cachedTopic.ID
} else {
topicEntity := model.EventsubTopic{}
if err := c.gorm.
Where("topic = ?", notification.Subscription.Type).
First(&topicEntity).
Error; err != nil {
c.logger.Error("failed to find topic", slog.Any("err", err))
return
}
knownTopicsEntitiesCache[notification.Subscription.Type] = topicEntity
topicId = topicEntity.ID
}

if topicId == uuid.Nil {
c.logger.Error("failed to find topic_id")
return
}

if err := c.gorm.Clauses(
clause.OnConflict{
Columns: []clause.Column{{Name: "topic_id"}, {Name: "user_id"}},
DoUpdates: clause.Assignments(map[string]interface{}{"status": notification.Subscription.Status}),
},
).Create(
&model.EventsubSubscription{
ID: uuid.New(),
UserID: userId,
TopicID: topicId,
Status: notification.Subscription.Status,
Version: notification.Subscription.Version,
CallbackUrl: notification.Subscription.Transport.Callback,
},
).Error; err != nil {
c.logger.Error("failed to create/update subscription", slog.Any("err", err))
}
}
35 changes: 15 additions & 20 deletions apps/eventsub/internal/handler/revoke.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package handler

import (
"context"
"database/sql"
"log/slog"

model "github.com/satont/twir/libs/gomodels"
eventsub_framework "github.com/twirapp/twitch-eventsub-framework"
"github.com/twirapp/twitch-eventsub-framework/esb"
)

Expand Down Expand Up @@ -53,28 +55,21 @@ func (c *Handler) handleUserAuthorizationRevoke(
}

func (c *Handler) handleSubRevocate(
h *esb.ResponseHeaders,
_ *esb.ResponseHeaders,
revocation *esb.RevocationNotification,
) {
topic := &model.EventsubTopic{}
if err := c.gorm.
Where("topic = ?", revocation.Subscription.Type).
First(topic).Error; err != nil {
c.logger.Error("failed to get topic", slog.Any("err", err))
return
}

subscription := &model.EventsubSubscription{}
if err := c.gorm.
Where("topic_id = ?", topic.ID).
First(subscription).Error; err != nil {
c.logger.Error("failed to get subscription", slog.Any("err", err))
return
}

subscription.Status = revocation.Subscription.Status
c.logger.Info("handleSubRevocate", slog.Any("revocation", revocation))

if err := c.gorm.Save(&subscription).Error; err != nil {
c.logger.Error("failed to delete subscription", slog.Any("err", err))
if revocation.Subscription.Status == "notification_failures_exceeded" {
c.manager.SubscribeWithLimits(
context.Background(),
&eventsub_framework.SubRequest{
Type: revocation.Subscription.Type,
Condition: revocation.Subscription.Condition,
Callback: revocation.Subscription.Transport.Callback,
Secret: c.config.TwitchClientSecret,
Version: revocation.Subscription.Version,
},
)
}
}
Loading

0 comments on commit 256edfb

Please sign in to comment.