Skip to content

Commit

Permalink
chore(integrations): add donationalerts rate-limiting sliding-window
Browse files Browse the repository at this point in the history
  • Loading branch information
Satont committed May 10, 2024
1 parent 2246439 commit 079f112
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 87 deletions.
2 changes: 2 additions & 0 deletions apps/integrations/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"nice-grpc": "2.1.8",
"pg": "8.11.5",
"protobufjs": "7.2.6",
"rate-limiter-algorithms": "2.1.0",
"redis": "4.6.13",
"socket.io-client": "2.3.1",
"ws": "8.16.0",
"xmlhttprequest": "1.8.0"
Expand Down
6 changes: 6 additions & 0 deletions apps/integrations/src/libs/redis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { config } from '@twir/config'
import { createClient } from 'redis'

export const client = await createClient({
url: config.REDIS_URL,
}).connect()
103 changes: 64 additions & 39 deletions apps/integrations/src/services/donationAlerts.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,37 @@
import Centrifuge from 'centrifuge';
import WebSocket from 'ws';
import { setTimeout as sleep } from 'node:timers/promises'

import { onDonation } from '../utils/onDonation.js';
import Centrifuge from 'centrifuge'
import { RateLimiter, RedisStore } from 'rate-limiter-algorithms'
import WebSocket from 'ws'

import { client } from '../libs/redis.js'
import { onDonation } from '../utils/onDonation.js'

export const globalRequestLimiter = new RateLimiter({
store: new RedisStore({
prefix: 'integrations:rla:',
rawCall: (...args) => client.sendCommand(args),
}),
algorithm: 'sliding-window-counter',
limit: 59,
windowMs: 1 * 60 * 1000,
})

export class DonationAlerts {
/**
* @type {Centrifuge | null}
*/
#socket;
#socket
/**
*
* @type {Centrifuge.Subscription | null}
*/
#channel;
#channel

#accessToken;
#donationAlertsUserId;
#socketConnectionToken;
#twitchUserId;
#accessToken
#donationAlertsUserId
#socketConnectionToken
#twitchUserId

/**
*
Expand All @@ -32,64 +46,75 @@ export class DonationAlerts {
socketConnectionToken,
twitchUserId,
) {
this.#accessToken = accessToken;
this.#donationAlertsUserId = donationAlertsUserId;
this.#socketConnectionToken = socketConnectionToken;
this.#twitchUserId = twitchUserId;
this.#accessToken = accessToken
this.#donationAlertsUserId = donationAlertsUserId
this.#socketConnectionToken = socketConnectionToken
this.#twitchUserId = twitchUserId
}

async init() {
this.#socket = new Centrifuge('wss://centrifugo.donationalerts.com/connection/websocket', {
websocket: WebSocket,
onPrivateSubscribe: async (ctx, cb) => {
const request = await fetch('https://www.donationalerts.com/api/v1/centrifuge/subscribe', {
method: 'POST',
body: JSON.stringify(ctx.data),
headers: { Authorization: `Bearer ${this.#accessToken}` },
});

const response = await request.json();
if (!request.ok) {
console.error(response);
cb({ status: request.status, data: {} });
}
while (true) {
const { isAllowed } = await globalRequestLimiter.consume(this.#twitchUserId)
if (!isAllowed) {
await sleep(1000)
continue
}

const request = await fetch('https://www.donationalerts.com/api/v1/centrifuge/subscribe', {
method: 'POST',
body: JSON.stringify(ctx.data),
headers: { Authorization: `Bearer ${this.#accessToken}` },
})

cb({ status: 200, data: { channels: response.channels } });
const response = await request.json()
if (!request.ok) {
console.error(response)
cb({ status: request.status, data: {} })
break
}

cb({ status: 200, data: { channels: response.channels } })
break
}
},
});
})

this.#socket.setToken(this.#socketConnectionToken)
this.#socket.connect()

this.#socket.setToken(this.#socketConnectionToken);
this.#socket.connect();

this.#socket.on('connect', () => {
console.info(`Connected to donationAlerts #${this.#donationAlertsUserId}`);
});
console.info(`Connected to donationAlerts #${this.#donationAlertsUserId}`)
})

this.#channel = this.#socket.subscribe(`$alerts:donation_${this.#donationAlertsUserId}`);
this.#channel = this.#socket.subscribe(`$alerts:donation_${this.#donationAlertsUserId}`)

this.#channel.on('publish', async ({ data }) => this.#donateCallback(data));
this.#channel.on('publish', async ({ data }) => this.#donateCallback(data))

return this;
return this
}

/**
* @param {DonationAlertsMessage} data
*/
async #donateCallback(data) {
console.info(`[DONATIONALERTS #${this.#twitchUserId}] Donation from ${data.username}: ${data.amount} ${data.currency}`)
await onDonation({
twitchUserId: this.#twitchUserId,
amount: data.amount,
currency: data.currency,
message: data.message,
userName: data.username,
});
})
}

async destroy() {
this.#channel?.removeAllListeners()?.unsubscribe();
this.#socket?.removeAllListeners()?.disconnect();
this.#channel?.removeAllListeners()?.unsubscribe()
this.#socket?.removeAllListeners()?.disconnect()

this.#socket = null;
this.#channel = null;
this.#socket = null
this.#channel = null
}
}
108 changes: 60 additions & 48 deletions apps/integrations/src/store/donationAlerts.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,43 @@
import { setTimeout as sleep } from 'timers/promises';
import { setTimeout as sleep } from 'node:timers/promises'

import { db } from '../libs/db.js';
import { DonationAlerts } from '../services/donationAlerts.js';
import { db } from '../libs/db.js'
import { DonationAlerts, globalRequestLimiter } from '../services/donationAlerts.js'

/**
*
* @type {Map<string, DonationAlerts>}
*/
export const donationAlertsStore = new Map();
export const donationAlertsStore = new Map()

/**
*
* @param {Integration} integration
*/
export async function addIntegration(integration) {
if (
!integration.accessToken ||
!integration.refreshToken ||
!integration.integration ||
!integration.integration.clientId ||
!integration.integration.clientSecret
!integration.accessToken
|| !integration.refreshToken
|| !integration.integration
|| !integration.integration.clientId
|| !integration.integration.clientSecret
) {
return;
return
}

if (donationAlertsStore.get(integration.channelId)) {
await removeIntegration(integration.channelId);
await removeIntegration(integration.channelId)
}

let accessToken;
let refreshToken;
let accessToken
let refreshToken

while (true) {
const { isAllowed } = await globalRequestLimiter.consume(integration.id)
if (!isAllowed) {
await sleep(1000)
continue
}

while(true) {
const refresh = await fetch('https://www.donationalerts.com/oauth/token', {
method: 'POST',
headers: {
Expand All @@ -43,77 +49,83 @@ export async function addIntegration(integration) {
client_id: integration.integration.clientId,
client_secret: integration.integration.clientSecret,
}).toString(),
});
})

if (!refresh.ok) {
if (refresh.status === 429) {
await sleep(5000);
continue;
await sleep(1000)
continue
}
console.error('cannot refresh DA tokens:', await refresh.text());
break;
console.error('cannot refresh DA tokens:', await refresh.text())
break
}

const refreshResponse = await refresh.json();
accessToken = refreshResponse.access_token;
refreshToken = refreshResponse.refresh_token;
break;
const refreshResponse = await refresh.json()
accessToken = refreshResponse.access_token
refreshToken = refreshResponse.refresh_token
break
}

if (!accessToken || !refreshToken) {
return;
return
}

await db('channels_integrations').where('id', integration.id).update({
accessToken: accessToken,
refreshToken: refreshToken,
});
accessToken,
refreshToken,
})

let profileData;
let profileData

while (true) {
const { isAllowed } = await globalRequestLimiter.consume(integration.id)
if (!isAllowed) {
await sleep(1000)
continue
}

while(true) {
const request = await fetch('https://www.donationalerts.com/api/v1/user/oauth', {
headers: {
Authorization: `Bearer ${accessToken}`,
},
});
})

if (!request.ok) {
if (request.status === 429) {
await sleep(5000);
continue;
await sleep(1000)
continue
}
console.error('cannot get donationAlerts profile', await request.text());
break;
console.error('cannot get donationAlerts profile', await request.text())
break
}

const response = await request.json();
profileData = response.data;
break;
const response = await request.json()
profileData = response.data
break
}

const { id, socket_connection_token } = profileData;
const { id, socket_connection_token } = profileData
const instance = new DonationAlerts(
accessToken,
id,
socket_connection_token,
integration.channelId,
);
await instance.init();
)
await instance.init()

donationAlertsStore.set(integration.channelId, instance);
donationAlertsStore.set(integration.channelId, instance)

return instance;
return instance
}

/**
*
* @param channelId
*/
export const removeIntegration = async (channelId) => {
const existed = donationAlertsStore.get(channelId);
if (!existed) return;
export async function removeIntegration(channelId) {
const existed = donationAlertsStore.get(channelId)
if (!existed) return

await existed.destroy();
donationAlertsStore.delete(channelId);
};
await existed.destroy()
donationAlertsStore.delete(channelId)
}
11 changes: 11 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 079f112

Please sign in to comment.