Skip to content

Commit

Permalink
feat(api-gql): add ws router for future scaling (#741)
Browse files Browse the repository at this point in the history
* feat(api-gql): add ws router for future scaling

* kruto

* kruto

* kruto

* kruto
  • Loading branch information
Satont committed May 28, 2024
1 parent ad79d5d commit d31eebf
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 66 deletions.
5 changes: 5 additions & 0 deletions apps/api-gql/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
pubclicroutes "github.com/twirapp/twir/apps/api-gql/internal/routes/public"
"github.com/twirapp/twir/apps/api-gql/internal/routes/webhooks"
"github.com/twirapp/twir/apps/api-gql/internal/sessions"
"github.com/twirapp/twir/apps/api-gql/internal/wsrouter"
"github.com/twirapp/twir/libs/baseapp"
buscore "github.com/twirapp/twir/libs/bus-core"
commandscache "github.com/twirapp/twir/libs/cache/commands"
Expand Down Expand Up @@ -38,6 +39,10 @@ func main() {
commandscache.New,
keywordscacher.New,
buscore.NewNatsBusFx("api-gql"),
fx.Annotate(
wsrouter.NewNatsSubscription,
fx.As(new(wsrouter.WsRouter)),
),
subscriptions_store.New,
resolvers.New,
directives.New,
Expand Down
11 changes: 2 additions & 9 deletions apps/api-gql/internal/gql/resolvers/keywords.resolver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

111 changes: 56 additions & 55 deletions apps/api-gql/internal/gql/resolvers/notifications.resolver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions apps/api-gql/internal/gql/resolvers/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/satont/twir/libs/twitch"
subscriptions_store "github.com/twirapp/twir/apps/api-gql/internal/gql/subscriptions-store"
"github.com/twirapp/twir/apps/api-gql/internal/sessions"
"github.com/twirapp/twir/apps/api-gql/internal/wsrouter"
bus_core "github.com/twirapp/twir/libs/bus-core"
generic_cacher "github.com/twirapp/twir/libs/cache/generic-cacher"
twitchcahe "github.com/twirapp/twir/libs/cache/twitch"
Expand All @@ -39,6 +40,7 @@ type Resolver struct {
redis *redis.Client
keywordsCacher *generic_cacher.GenericCacher[[]model.ChannelsKeywords]
tokensClient tokens.TokensClient
wsRouter wsrouter.WsRouter
}

type Opts struct {
Expand All @@ -56,6 +58,7 @@ type Opts struct {
Logger logger.Logger
Redis *redis.Client
KeywordsCacher *generic_cacher.GenericCacher[[]model.ChannelsKeywords]
WsRouter wsrouter.WsRouter
}

func New(opts Opts) (*Resolver, error) {
Expand All @@ -78,6 +81,7 @@ func New(opts Opts) (*Resolver, error) {
cachedCommandsClient: opts.CachedCommandsClient,
keywordsCacher: opts.KeywordsCacher,
tokensClient: opts.TokensGrpc,
wsRouter: opts.WsRouter,
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions apps/api-gql/internal/gql/resolvers/subscriptions.keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package resolvers

const (
notificationsSubscriptionKey = "api.newNotifications"
)
76 changes: 76 additions & 0 deletions apps/api-gql/internal/wsrouter/nats_wsrouter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package wsrouter

import (
"github.com/goccy/go-json"
"github.com/nats-io/nats.go"
)

func NewNatsSubscription(opts Opts) (*WsRouterNats, error) {
nc, err := nats.Connect(opts.Config.NatsUrl)
if err != nil {
return nil, err
}

return &WsRouterNats{
nc: nc,
}, nil
}

type WsRouterNats struct {
nc *nats.Conn
}

var _ WsRouter = &WsRouterNats{}

type WsRouterNatsSubscription struct {
subs []*nats.Subscription
dataChann chan []byte
}

func (c *WsRouterNatsSubscription) Unsubscribe() error {
for _, sub := range c.subs {
if err := sub.Unsubscribe(); err != nil {
return err
}
}

return nil
}

func (c *WsRouterNatsSubscription) GetChannel() chan []byte {
return c.dataChann
}

func (c *WsRouterNats) Subscribe(keys []string) (WsRouterSubscription, error) {
ch := make(chan []byte)
subs := make([]*nats.Subscription, 0, len(keys))

for _, key := range keys {
sub, err := c.nc.Subscribe(
key,
func(msg *nats.Msg) {
ch <- msg.Data
},
)

if err != nil {
return nil, err
}

subs = append(subs, sub)
}

return &WsRouterNatsSubscription{
subs: subs,
dataChann: ch,
}, nil
}

func (c *WsRouterNats) Publish(key string, data any) error {
dataBytes, err := json.Marshal(data)
if err != nil {
return err
}

return c.nc.Publish(key, dataBytes)
}
22 changes: 22 additions & 0 deletions apps/api-gql/internal/wsrouter/wsrouter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package wsrouter

import (
config "github.com/satont/twir/libs/config"
"go.uber.org/fx"
)

type Opts struct {
fx.In

Config config.Config
}

type WsRouter interface {
Subscribe(keys []string) (WsRouterSubscription, error)
Publish(key string, data any) error
}

type WsRouterSubscription interface {
GetChannel() chan []byte
Unsubscribe() error
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ const { textareaRef, applyModifier } = useTextarea()
variant="secondary"
@click="notificationsForm.onReset"
>
<template v-if="notificationsForm.editableMessageId">
<template v-if="notificationsForm.editableMessageId.value">
{{ t('sharedButtons.cancel') }}
</template>
<template v-else>
{{ t('sharedButtons.reset') }}
</template>
</Button>
<Button type="submit">
<template v-if="notificationsForm.editableMessageId">
<template v-if="notificationsForm.editableMessageId.value">
{{ t('sharedButtons.edit') }}
</template>
<template v-else>
Expand Down

0 comments on commit d31eebf

Please sign in to comment.