Skip to content

Commit

Permalink
refactor: store messages in redis-stack instead of db
Browse files Browse the repository at this point in the history
  • Loading branch information
Satont committed Jun 19, 2024
1 parent 0289210 commit 7d3318e
Show file tree
Hide file tree
Showing 21 changed files with 494 additions and 375 deletions.
1 change: 1 addition & 0 deletions .github/workflows/dockerv3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ jobs:
- apps/tokens
- apps/websockets
- apps/ytsr
- apps/chat-messages-store
- frontend/dashboard
- frontend/landing
- frontend/overlays
Expand Down
4 changes: 2 additions & 2 deletions apps/bots/internal/bus-listener/bus-listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ func New(opts Opts) (*BusListener, error) {
OnStart: func(ctx context.Context) error {
listener.bus.Bots.SendMessage.SubscribeGroup("bots", listener.sendMessage)
listener.bus.Bots.DeleteMessage.SubscribeGroup("bots", listener.deleteMessage)
listener.bus.Bots.ProcessMessage.SubscribeGroup("bots", listener.handleChatMessage)
listener.bus.ChatMessages.SubscribeGroup("bots", listener.handleChatMessage)
listener.bus.Bots.BanUser.SubscribeGroup("bots", listener.banUser)

return nil
},
OnStop: func(ctx context.Context) error {
listener.bus.Bots.SendMessage.Unsubscribe()
listener.bus.Bots.ProcessMessage.Unsubscribe()
listener.bus.ChatMessages.Unsubscribe()
listener.bus.Bots.DeleteMessage.Unsubscribe()
listener.bus.Bots.BanUser.Unsubscribe()

Expand Down
31 changes: 0 additions & 31 deletions apps/bots/internal/messagehandler/handle_store_message.go

This file was deleted.

1 change: 0 additions & 1 deletion apps/bots/internal/messagehandler/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ var handlersForExecute = []func(
(*MessageHandler).handleGreetings,
(*MessageHandler).handleKeywords,
(*MessageHandler).handleEmotesUsages,
(*MessageHandler).handleStoreMessage,
(*MessageHandler).handleTts,
(*MessageHandler).handleRemoveLurker,
(*MessageHandler).handleModeration,
Expand Down
27 changes: 27 additions & 0 deletions apps/chat-messages-store/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
FROM node:20-alpine as builder
WORKDIR /app
RUN npm i -g pnpm@8

COPY package.json pnpm-lock.yaml pnpm-workspace.yaml .npmrc ./
COPY apps/chat-messages-store /app/apps/chat-messages-store
COPY libs/config /app/libs/config
COPY libs/bus-core /app/libs/bus-core

RUN pnpm prune --prod

FROM node:20-alpine as node_prod_base
WORKDIR /app
RUN apk add wget && \
wget -q -t3 'https://packages.doppler.com/public/cli/rsa.8004D9FF50437357.key' -O /etc/apk/keys/cli@doppler-8004D9FF50437357.rsa.pub && \
echo 'https://packages.doppler.com/public/cli/alpine/any-version/main' | tee -a /etc/apk/repositories && \
apk add doppler && apk del wget && \
rm -rf /var/cache/apk/*
COPY package.json pnpm-lock.yaml pnpm-workspace.yaml .npmrc docker-entrypoint.sh ./
RUN chmod +x docker-entrypoint.sh
RUN npm i -g pnpm@8
ENTRYPOINT ["/app/docker-entrypoint.sh"]

FROM node_prod_base
WORKDIR /app
COPY --from=builder /app /app
CMD ["pnpm", "--filter=@twir/chat-messages-store", "start"]
19 changes: 19 additions & 0 deletions apps/chat-messages-store/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"name": "@twir/chat-messages-store",
"main": "src/index.js",
"type": "module",
"scripts": {
"start": "node src/index.js",
"dev": "node --watch --watch-preserve-output src/index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@twir/bus-core": "workspace:*",
"@twir/config": "workspace:*",
"nats": "2.22.0",
"redis": "4.6.13",
"redis-om": "0.4.3"
}
}
84 changes: 84 additions & 0 deletions apps/chat-messages-store/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { config } from '@twir/config'
import { JSONCodec, connect as natsConnect } from 'nats'
import { createClient } from 'redis'
import { EntityId, Repository, Schema } from 'redis-om'

const redisClient = createClient({
url: config.REDIS_URL,
})
await redisClient.connect()

const schema = new Schema('chat-messages-store:messages', {
message_id: { type: 'string' },
channel_id: { type: 'string' },
user_id: { type: 'string' },
user_login: { type: 'string' },
text: { type: 'text' },
can_be_deleted: { type: 'boolean' },
created_at: { type: 'string' },
})

const repository = new Repository(schema, redisClient)
await repository.createIndex()

const sc = JSONCodec()
const nc = await natsConnect({
servers: config.NATS_URL,
})

const chatMessagesSub = nc.subscribe(
'chat.messages',
{
queue: 'chat-messages-store',
},
)
const chatMessagesStorePub = nc.subscribe(
'chat_messages_store.get_by_text_for_delete',
{ queue: 'chat-messages-store' },
)

const ignoredBadges = ['broadcaster', 'moderator', 'vip', 'subscriber'];

(async () => {
for await (const m of chatMessagesSub) {
const data = sc.decode(m.data)

const canBeDeleted = !data.badges.some(b => ignoredBadges.includes(b.set_id))

const entity = await repository.save({
message_id: data.message_id,
channel_id: data.broadcaster_user_id,
user_id: data.chatter_user_id,
user_login: data.chatter_user_login,
text: data.message.text,
can_be_deleted: canBeDeleted,
created_at: new Date().toISOString(),
})
await repository.expire(entity[EntityId], 60 * 60)
}
})();

(async () => {
for await (const m of chatMessagesStorePub) {
const data = sc.decode(m.data)
if (!data.channel_id || !data.text) {
m.respond(sc.encode({ messages: [] }))
continue
}

const messages = await repository
.search()
.where('channel_id').equal(data.channel_id)
.and('can_be_deleted').equal(true)
.and('text').match(`*${data.text}*`)
.return.all()

m.respond(sc.encode({
messages,
}))
}

// eslint-disable-next-line style/semi
})();

console.info('[chat-messages-store] is running')
2 changes: 1 addition & 1 deletion apps/eventsub/internal/handler/chat_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (c *Handler) handleChannelChatMessage(
ChannelPointsCustomRewardId: event.ChannelPointsCustomRewardID,
}

if err := c.bus.Bots.ProcessMessage.Publish(data); err != nil {
if err := c.bus.ChatMessages.Publish(data); err != nil {
c.logger.Error("cannot handle message", slog.Any("err", err))
}

Expand Down
38 changes: 12 additions & 26 deletions apps/parser/internal/commands/nuke/nuke.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package nuke

import (
"context"
"strings"

"github.com/guregu/null"
"github.com/lib/pq"
command_arguments "github.com/satont/twir/apps/parser/internal/command-arguments"
"github.com/satont/twir/apps/parser/internal/types"
"github.com/twirapp/twir/libs/bus-core/bots"
chat_messages_store "github.com/twirapp/twir/libs/bus-core/chat-messages-store"

model "github.com/satont/twir/libs/gomodels"
)
Expand Down Expand Up @@ -37,34 +37,28 @@ var Command = &types.DefaultCommand{
) {
phrase := parseCtx.ArgsParser.Get(nukePhraseArgName).String()

var messages []model.ChannelChatMessage
err := parseCtx.Services.Gorm.WithContext(ctx).
Where(
`"can_be_deleted" IS TRUE AND text LIKE ? AND "created_at" > NOW() - INTERVAL '60 minutes' AND "channel_id" = ?`,
"%"+strings.ToLower(phrase)+"%",
parseCtx.Channel.ID,
).
Find(&messages).
Error
messages, err := parseCtx.Services.Bus.ChatMessagesStore.GetChatMessagesByTextForDelete.Request(
ctx,
chat_messages_store.GetChatMessagesByTextRequest{
ChannelID: parseCtx.Channel.ID,
Text: phrase,
},
)
if err != nil {
return nil, &types.CommandHandlerError{
Message: "cannot get messages",
Err: err,
}
}

if len(messages) == 0 {
if len(messages.Data.Messages) == 0 {
return nil, nil
}

mappedMessagesIds := make([]string, 0, len(messages))

for _, message := range messages {
if !message.CanBeDeleted {
continue
}
mappedMessagesIds := make([]string, 0, len(messages.Data.Messages))

mappedMessagesIds = append(mappedMessagesIds, message.MessageId)
for _, message := range messages.Data.Messages {
mappedMessagesIds = append(mappedMessagesIds, message.MessageID)
}

if err := parseCtx.Services.Bus.Bots.DeleteMessage.Publish(
Expand All @@ -80,14 +74,6 @@ var Command = &types.DefaultCommand{
}
}

if err = parseCtx.Services.Gorm.WithContext(ctx).Where(`"messageId" IN ?`, mappedMessagesIds).
Delete(&model.ChannelChatMessage{}).Error; err != nil {
return nil, &types.CommandHandlerError{
Message: "cannot delete messages from db",
Err: err,
}
}

return nil, nil
},
}
1 change: 1 addition & 0 deletions cli/internal/cmds/dev/nodejs/nodejs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nodejs
var appsForStart = []twirApp{
{name: "eval"},
{name: "integrations"},
{name: "chat-messages-store"},
}

type NodejsApps struct {
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:
- tsuwari-dev

redis:
image: redis/redis-stack:latest
image: redis/redis-stack-server:latest
restart: always
ports:
- '6385:6379'
Expand Down
25 changes: 20 additions & 5 deletions docker-compose.stack.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,11 @@ services:
test: exit 0

redis:
image: bitnami/redis:latest
image: redis/redis-stack-server:7.2.0-v11
volumes:
- redis-data:/bitnami/redis/data
- redis-stack-data:/data
networks:
- twir
environment:
- ALLOW_EMPTY_PASSWORD=yes
command: /opt/bitnami/scripts/redis/run.sh --protected-mode no
deploy:
restart_policy:
condition: any
Expand Down Expand Up @@ -236,6 +233,23 @@ services:
networks:
- twir

chat-messages-store:
image: twirapp/chat-messages-store:latest
secrets:
- twir_doppler_token
deploy:
update_config:
parallelism: 2
restart_policy:
condition: any
delay: 30s
max_attempts: 30
mode: replicated
replicas: 6
endpoint_mode: dnsrr
networks:
- twir

timers:
image: twirapp/timers:latest
secrets:
Expand Down Expand Up @@ -550,6 +564,7 @@ services:
volumes:
postgres-data:
redis-data:
redis-stack-data:
minio-data:

networks:
Expand Down
12 changes: 8 additions & 4 deletions libs/bus-core/bus-services.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package buscore

import (
botsservice "github.com/twirapp/twir/libs/bus-core/bots"
chat_messages_store "github.com/twirapp/twir/libs/bus-core/chat-messages-store"
emotes_cacher "github.com/twirapp/twir/libs/bus-core/emotes-cacher"
"github.com/twirapp/twir/libs/bus-core/eval"
"github.com/twirapp/twir/libs/bus-core/eventsub"
Expand Down Expand Up @@ -31,10 +32,9 @@ type channelBus struct {
}

type botsBus struct {
ProcessMessage Queue[twitch.TwitchChatMessage, struct{}]
SendMessage Queue[botsservice.SendMessageRequest, struct{}]
DeleteMessage Queue[botsservice.DeleteMessageRequest, struct{}]
BanUser Queue[botsservice.BanRequest, struct{}]
SendMessage Queue[botsservice.SendMessageRequest, struct{}]
DeleteMessage Queue[botsservice.DeleteMessageRequest, struct{}]
BanUser Queue[botsservice.BanRequest, struct{}]
}

type emotesCacherBus struct {
Expand All @@ -60,3 +60,7 @@ type schedulerBus struct {
CreateDefaultCommands Queue[scheduler.CreateDefaultCommandsRequest, struct{}]
CreateDefaultRoles Queue[scheduler.CreateDefaultRolesRequest, struct{}]
}

type chatMessagesStoreBus struct {
GetChatMessagesByTextForDelete Queue[chat_messages_store.GetChatMessagesByTextRequest, chat_messages_store.GetChatMessagesByTextResponse]
}
Loading

0 comments on commit 7d3318e

Please sign in to comment.