From 60ebd2015e3b08128846ce1c659e308da607efe9 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Fri, 3 Nov 2023 13:49:41 -0400 Subject: [PATCH 01/13] Use separate eventstream per namespace Signed-off-by: Nicko Guyer --- .env | 6 +- .vscode/launch.json | 33 +++++ .vscode/settings.json | 4 +- Dockerfile | 8 +- src/event-stream/event-stream.service.ts | 18 ++- .../eventstream-proxy.base.ts | 133 +++++++++++------- .../eventstream-proxy.interfaces.ts | 1 + src/main.ts | 4 - src/tokens/tokens.controller.ts | 2 +- src/tokens/tokens.interfaces.ts | 28 ++++ src/tokens/tokens.service.spec.ts | 38 +++++ src/tokens/tokens.service.ts | 40 ++---- src/websocket-events/websocket-events.base.ts | 8 ++ test/app.e2e-context.ts | 13 ++ test/suites/erc20.ts | 107 +++----------- test/suites/erc721.ts | 97 ++----------- test/suites/websocket.ts | 129 +++++++++++------ 17 files changed, 363 insertions(+), 306 deletions(-) create mode 100644 .vscode/launch.json diff --git a/.env b/.env index 2bbb638..5e4d496 100644 --- a/.env +++ b/.env @@ -1,7 +1,7 @@ PORT=3000 ETHCONNECT_URL=http://127.0.0.1:5102 -ETHCONNECT_TOPIC=token -FACTORY_CONTRACT_ADDRESS= -AUTO_INIT=true +ETHCONNECT_TOPIC=tokens_local +FACTORY_CONTRACT_ADDRESS="0xd85b3fba5552c48389607954e042e7313a9aec6e" +AUTO_INIT=false USE_LEGACY_ERC20_SAMPLE=false USE_LEGACY_ERC721_SAMPLE=false diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..1377495 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,33 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Run Tests", + "runtimeExecutable": "npm", + "args": ["run", "test"], + "request": "launch", + "type": "node", + "outputCapture": "std" + }, + { + "name": "Run E2E Tests", + "runtimeExecutable": "npm", + "args": ["run", "test:e2e"], + "request": "launch", + "type": "node", + "outputCapture": "std" + }, + { + "type": "node", + "request": "launch", + "name": "Launch Program", + "skipFiles": ["/**"], + "program": "${file}", + "preLaunchTask": "tsc: build - tsconfig.json", + "outFiles": ["${workspaceFolder}/dist/**/*.js"] + } + ] +} diff --git a/.vscode/settings.json b/.vscode/settings.json index 108b279..4fb545a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,7 +5,5 @@ }, "eslint.validate": ["javascript"], "solidity.defaultCompiler": "remote", - "cSpell.words": [ - "fftm" - ] + "cSpell.words": ["eventstream", "fftm"] } diff --git a/Dockerfile b/Dockerfile index 2129032..126786d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:16-alpine3.15 as build +FROM node:20-alpine3.17 as build USER node WORKDIR /home/node ADD --chown=node:node package*.json ./ @@ -6,7 +6,7 @@ RUN npm install ADD --chown=node:node . . RUN npm run build -FROM node:16-alpine3.15 as solidity-build +FROM node:20-alpine3.17 as solidity-build RUN apk add python3 alpine-sdk USER node WORKDIR /home/node @@ -15,7 +15,7 @@ RUN npm install ADD --chown=node:node ./samples/solidity . RUN npx hardhat compile -FROM node:16-alpine3.15 +FROM node:20-alpine3.17 RUN apk add curl jq RUN mkdir -p /app/contracts/source \ && chgrp -R 0 /app/ \ @@ -31,6 +31,8 @@ COPY --from=solidity-build --chown=1001:0 /home/node/contracts /home/node/packag RUN npm install --production WORKDIR /app/contracts COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json ./ +# We also need to keep copying it to the old location to maintain compatibility with the FireFly CLI +COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json /home/node/contracts/ WORKDIR /app COPY --from=build --chown=1001:0 /home/node/dist ./dist COPY --from=build --chown=1001:0 /home/node/package.json /home/node/package-lock.json ./ diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 45a8cc1..0917e02 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -20,7 +20,7 @@ import { AxiosRequestConfig } from 'axios'; import { lastValueFrom } from 'rxjs'; import WebSocket from 'ws'; import { FFRequestIDHeader } from '../request-context/constants'; -import { Context } from '../request-context/request-context.decorator'; +import { Context, newContext } from '../request-context/request-context.decorator'; import { IAbiMethod } from '../tokens/tokens.interfaces'; import { getHttpRequestOptions, getWebsocketOptions } from '../utils'; import { @@ -46,6 +46,7 @@ export class EventStreamSocket { constructor( private url: string, private topic: string, + private namespace: string, private username: string, private password: string, private handleEvents: (events: EventBatch) => void, @@ -67,7 +68,7 @@ export class EventStreamSocket { } else { this.logger.log('Event stream websocket connected'); } - this.produce({ type: 'listen', topic: this.topic }); + this.produce({ type: 'listen', topic: `${this.topic}/${this.namespace}` }); this.produce({ type: 'listenreplies' }); this.ping(); }) @@ -109,7 +110,11 @@ export class EventStreamSocket { } ack(batchNumber: number | undefined) { - this.produce({ type: 'ack', topic: this.topic, batchNumber }); + this.produce({ type: 'ack', topic: `${this.topic}/${this.namespace}`, batchNumber }); + } + + nack(batchNumber: number | undefined) { + this.produce({ type: 'nack', topic: `${this.topic}/${this.namespace}`, batchNumber }); } close() { @@ -331,15 +336,20 @@ export class EventStreamService { return true; } - connect( + async connect( url: string, topic: string, + namespace: string, handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) { + const name = `${topic}/${namespace}`; + await this.createOrUpdateStream(newContext(), name, topic); + return new EventStreamSocket( url, topic, + namespace, this.username, this.password, handleEvents, diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index e708fbd..2d3aeec 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -18,12 +18,14 @@ import { Logger } from '@nestjs/common'; import { MessageBody, SubscribeMessage } from '@nestjs/websockets'; import { v4 as uuidv4 } from 'uuid'; import { Context, newContext } from '../request-context/request-context.decorator'; -import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces'; +import { EventBatch, EventStream, EventStreamReply } from '../event-stream/event-stream.interfaces'; import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service'; import { + WebSocketActionBase, WebSocketEventsBase, WebSocketEx, WebSocketMessage, + WebSocketStart, } from '../websocket-events/websocket-events.base'; import { AckMessageData, @@ -40,20 +42,20 @@ import { * @WebSocketGateway({ path: '/api/stream' }) */ export abstract class EventStreamProxyBase extends WebSocketEventsBase { - socket?: EventStreamSocket; + namespaceClients: Map> = new Map(); + namespaceEventStreamSocket: Map = new Map(); url?: string; topic?: string; private connectListeners: ConnectionListener[] = []; private eventListeners: EventListener[] = []; private awaitingAck: WebSocketMessageWithId[] = []; - private currentClient: WebSocketEx | undefined; private subscriptionNames = new Map(); private queue = Promise.resolve(); constructor( protected readonly logger: Logger, - protected eventstream: EventStreamService, + protected eventStreamService: EventStreamService, requireAuth = false, ) { super(logger, requireAuth); @@ -66,55 +68,75 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { handleConnection(client: WebSocketEx) { super.handleConnection(client); - if (this.server.clients.size === 1) { - this.logger.log(`Initializing event stream proxy`); - Promise.all(this.connectListeners.map(l => l.onConnect())) - .then(() => { - this.setCurrentClient(client); - this.startListening(); - }) - .catch(err => { - this.logger.error(`Error initializing event stream proxy: ${err}`); - }); - } + client.on('message', (message: string) => { + const action = JSON.parse(message) as WebSocketActionBase; + switch (action.type) { + case 'start': + const startAction = action as WebSocketStart; + this.startListening(client, startAction.namespace); + break; + } + }); } private queueTask(task: () => void) { this.queue = this.queue.finally(task); } - private startListening() { + private async startListening(client: WebSocketEx, namespace: string) { if (this.url === undefined || this.topic === undefined) { return; } - this.socket = this.eventstream.connect( - this.url, - this.topic, - events => { - this.queueTask(() => this.processEvents(events)); - }, - receipt => { - this.broadcast('receipt', receipt); - }, - ); + try { + if (!this.namespaceEventStreamSocket.has(namespace)) { + const eventStreamSocket = await this.eventStreamService.connect( + this.url, + this.topic, + namespace, + events => { + this.queueTask(() => this.processEvents(events, namespace)); + }, + receipt => { + this.broadcast('receipt', receipt); + }, + ); + this.namespaceEventStreamSocket.set(namespace, eventStreamSocket); + } + let clientSet = this.namespaceClients.get(namespace); + if (!clientSet) { + clientSet = new Set(); + } + clientSet.add(client); + this.namespaceClients.set(namespace, clientSet); + } catch (e) { + this.logger.error(`Error connecting to event stream websocket: ${e.message}`); + } } handleDisconnect(client: WebSocketEx) { super.handleDisconnect(client); - if (this.server.clients.size === 0) { - this.stopListening(); - } else if (client.id === this.currentClient?.id) { - for (const newClient of this.server.clients) { - this.setCurrentClient(newClient as WebSocketEx); - break; - } - } - } - private stopListening() { - this.socket?.close(); - this.socket = undefined; - this.currentClient = undefined; + // Iterate over all the namespaces this client was subscribed to + this.namespaceClients.forEach((clientSet, namespace) => { + clientSet.delete(client); + + // Nack any messages that are inflight for that namespace + const nackedMessageIds: Set = new Set(); + this.awaitingAck + .filter(msg => msg.namespace === namespace) + .map(msg => { + this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber); + nackedMessageIds.add(msg.id); + }); + this.awaitingAck = this.awaitingAck.filter(msg => nackedMessageIds.has(msg.id)); + + // If all clients for this namespace have disconnected, also close the connection to EVMConnect + if (clientSet.size == 0) { + this.namespaceEventStreamSocket.get(namespace)?.close(); + this.namespaceEventStreamSocket.delete(namespace); + this.namespaceClients.delete(namespace); + } + }); } addConnectionListener(listener: ConnectionListener) { @@ -125,7 +147,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { this.eventListeners.push(listener); } - private async processEvents(batch: EventBatch) { + private async processEvents(batch: EventBatch, namespace: string) { const messages: WebSocketMessage[] = []; const eventHandlers: Promise[] = []; for (const event of batch.events) { @@ -156,6 +178,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } } const message: WebSocketMessageWithId = { + namespace: namespace, id: uuidv4(), event: 'batch', data: { @@ -164,7 +187,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { batchNumber: batch.batchNumber, }; this.awaitingAck.push(message); - this.currentClient?.send(JSON.stringify(message)); + this.send(namespace, JSON.stringify(message)); } private async getSubscriptionName(ctx: Context, subId: string) { @@ -174,7 +197,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } try { - const sub = await this.eventstream.getSubscription(ctx, subId); + const sub = await this.eventStreamService.getSubscription(ctx, subId); if (sub !== undefined) { this.subscriptionNames.set(subId, sub.name); return sub.name; @@ -185,13 +208,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { return undefined; } - private setCurrentClient(client: WebSocketEx) { - this.currentClient = client; - for (const message of this.awaitingAck) { - this.currentClient.send(JSON.stringify(message)); - } - } - @SubscribeMessage('ack') handleAck(@MessageBody() data: AckMessageData) { if (data.id === undefined) { @@ -201,7 +217,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { const inflight = this.awaitingAck.find(msg => msg.id === data.id); this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); - if (this.socket !== undefined && inflight !== undefined) { + if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) { this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id); if ( // If nothing is left awaiting an ack - then we clearly need to ack @@ -212,7 +228,22 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { !this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber)) ) { this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); - this.socket.ack(inflight.batchNumber); + this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber); + } + } + } + + send(namespace, payload: string) { + const clients = this.namespaceClients.get(namespace); + if (clients) { + // Randomly select a connected client for this namespace to distribute load + const selected = Math.floor(Math.random() * clients.size); + let i = 0; + for (let client of clients.keys()) { + if (i++ == selected) { + client.send(payload); + return; + } } } } diff --git a/src/eventstream-proxy/eventstream-proxy.interfaces.ts b/src/eventstream-proxy/eventstream-proxy.interfaces.ts index 20bfca3..8588292 100644 --- a/src/eventstream-proxy/eventstream-proxy.interfaces.ts +++ b/src/eventstream-proxy/eventstream-proxy.interfaces.ts @@ -30,6 +30,7 @@ export interface EventListener { } export interface WebSocketMessageWithId extends WebSocketMessage { + namespace: string; id: string; batchNumber: number | undefined; } diff --git a/src/main.ts b/src/main.ts index 549da08..6af6237 100644 --- a/src/main.ts +++ b/src/main.ts @@ -107,10 +107,6 @@ async function bootstrap() { .configure(ethConnectUrl, fftmUrl, username, password, passthroughHeaders, blockchainRetryCfg); app.get(AbiMapperService).configure(legacyERC20, legacyERC721); - if (autoInit.toLowerCase() !== 'false') { - await app.get(TokensService).init(newContext()); - } - const port = config.get('PORT', 3000); console.log(`Listening on port ${port}`); await app.listen(port); diff --git a/src/tokens/tokens.controller.ts b/src/tokens/tokens.controller.ts index 2daeeb2..356e96b 100644 --- a/src/tokens/tokens.controller.ts +++ b/src/tokens/tokens.controller.ts @@ -42,7 +42,7 @@ export class TokensController { @HttpCode(204) @ApiOperation({ summary: 'Perform one-time initialization (if not auto-initialized)' }) init(@RequestContext() ctx: Context) { - return this.service.init(ctx); + // Do nothing. Endpoint retained for backwards compatibility with older tooling. } @Post('createpool') diff --git a/src/tokens/tokens.interfaces.ts b/src/tokens/tokens.interfaces.ts index d174053..8d9fa99 100644 --- a/src/tokens/tokens.interfaces.ts +++ b/src/tokens/tokens.interfaces.ts @@ -139,6 +139,10 @@ export class TokenPoolConfig { } export class TokenPool { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty({ enum: TokenType }) @IsEnum(TokenType) type: TokenType; @@ -213,6 +217,10 @@ export class BlockchainEvent { } export class TokenPoolActivate { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -227,6 +235,10 @@ export class TokenPoolActivate { } export class TokenPoolDeactivate { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -271,6 +283,10 @@ export class CheckInterfaceResponse implements TokenAbi { } export class TokenTransfer { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -330,6 +346,10 @@ export class TokenApprovalConfig { } export class TokenApproval { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -366,6 +386,10 @@ export class TokenApproval { // Websocket notifications class tokenEventBase { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() poolLocator: string; @@ -391,6 +415,10 @@ export class TokenPoolEventInfo { } export class TokenPoolEvent extends tokenEventBase { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() type: TokenType; diff --git a/src/tokens/tokens.service.spec.ts b/src/tokens/tokens.service.spec.ts index 8f70a35..54e888e 100644 --- a/src/tokens/tokens.service.spec.ts +++ b/src/tokens/tokens.service.spec.ts @@ -230,6 +230,7 @@ describe('TokensService', () => { useValue: { addConnectionListener: jest.fn(), addEventListener: jest.fn(), + configure: jest.fn(), }, }, ], @@ -263,6 +264,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -276,6 +278,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_NO_DATA_POOL_ID, standard: 'ERC20', @@ -296,6 +299,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, poolData: 'ns1', }; @@ -306,6 +310,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, standard: 'ERC20', @@ -350,6 +355,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -385,6 +391,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -421,6 +428,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -455,6 +463,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -468,6 +477,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -488,6 +498,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -501,6 +512,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -521,6 +533,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, poolData: 'ns1', }; @@ -531,6 +544,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -575,6 +589,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -610,6 +625,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -646,6 +662,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -680,6 +697,7 @@ describe('TokensService', () => { it('should return ERC721NoData pool details successfully', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -695,6 +713,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC721_NO_DATA_POOL_ID, standard: 'ERC721', @@ -715,6 +734,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, poolData: 'ns1', }; @@ -725,6 +745,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, standard: 'ERC721', @@ -767,6 +788,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenMint = { + namespace: 'ns1', amount: '2', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -784,6 +806,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', @@ -818,6 +841,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -856,6 +880,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -890,6 +915,7 @@ describe('TokensService', () => { it('should return ERC721WithData pool details successfully - implicit withData config', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -904,6 +930,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC721_WITH_DATA_POOL_ID, standard: 'ERC721', @@ -924,6 +951,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, poolData: 'ns1', }; @@ -934,6 +962,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, standard: 'ERC721', @@ -975,6 +1004,7 @@ describe('TokensService', () => { it('should not mint ERC721WithData token due to invalid amount', async () => { const ctx = newContext(); const request: TokenMint = { + namespace: 'ns1', amount: '2', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -992,6 +1022,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1030,6 +1061,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1069,6 +1101,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1112,6 +1145,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', @@ -1149,6 +1183,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -1185,6 +1220,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -1219,6 +1255,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: 'address=0x123&standard=notAStandard&type=fungible', poolData: 'ns1', }; @@ -1231,6 +1268,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: 'address=0x123&type=fungible', poolData: 'ns1', }; diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index b82c961..d033351 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -71,7 +71,6 @@ export class TokensService { baseUrl: string; topic: string; - stream: EventStream; factoryAddress = ''; constructor( @@ -87,33 +86,25 @@ export class TokensService { this.factoryAddress = factoryAddress.toLowerCase(); this.proxy.addConnectionListener(this); this.proxy.addEventListener(new TokenListener(this.mapper, this.blockchain)); + const wsUrl = new URL('/ws', this.baseUrl.replace('http', 'ws')).href; + this.proxy.configure(wsUrl, this.topic); } async onConnect() { const wsUrl = new URL('/ws', this.baseUrl.replace('http', 'ws')).href; - const stream = await this.getStream(newContext()); - this.proxy.configure(wsUrl, stream.name); - } - - /** - * One-time initialization of event stream and base subscription. - */ - async init(ctx: Context) { - this.stream = await this.getStream(ctx); - if (this.factoryAddress !== '') { - await this.createFactorySubscription(ctx, this.factoryAddress); - } + this.proxy.configure(wsUrl, this.topic); } - private async createFactorySubscription(ctx: Context, address: string) { + private async getOrCreateFactorySubscription(ctx: Context, address: string, namespace) { const eventABI = this.mapper.getCreateEvent(); const methodABI = this.mapper.getCreateMethod(); - if (eventABI !== undefined && methodABI !== undefined) { + const stream = await this.getStream(ctx, namespace); + if (eventABI !== undefined && methodABI !== undefined && stream !== undefined) { await this.eventstream.getOrCreateSubscription( ctx, this.baseUrl, eventABI, - this.stream.id, + stream.id, packSubscriptionName(address, eventABI.name), address, [methodABI], @@ -122,15 +113,10 @@ export class TokensService { } } - private async getStream(ctx: Context) { - const stream = this.stream; - if (stream !== undefined) { - return stream; - } + private async getStream(ctx: Context, namespace: string) { await this.migrationCheck(ctx); this.logger.log('Creating stream with name ' + this.topic); - this.stream = await this.eventstream.createOrUpdateStream(ctx, this.topic, this.topic); - return this.stream; + return this.eventstream.createOrUpdateStream(ctx, `${this.topic}/${namespace}`, this.topic); } /** @@ -276,6 +262,7 @@ export class TokensService { } return { + namespace: dto.namespace, data: dto.data, poolLocator: packPoolLocator(poolLocator), standard: dto.type === TokenType.FUNGIBLE ? 'ERC20' : 'ERC721', @@ -296,7 +283,7 @@ export class TokensService { address: string, dto: TokenPool, ): Promise { - await this.createFactorySubscription(ctx, address); + await this.getOrCreateFactorySubscription(ctx, address, dto.namespace); const { method, params } = await this.mapper.getCreateMethodAndParams(ctx, address, dto); const response = await this.blockchain.sendTransaction( ctx, @@ -347,7 +334,7 @@ export class TokensService { poolLocator.type === TokenType.FUNGIBLE, ); const eventAbis = this.getEventAbis(poolLocator); - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, dto.namespace); const promises = [ this.eventstream.getOrCreateSubscription( @@ -389,6 +376,7 @@ export class TokensService { const poolInfo = await this.queryPool(ctx, poolLocator); const tokenPoolEvent: TokenPoolEvent = { + namespace: dto.namespace, poolData: dto.poolData, poolLocator: dto.poolLocator, standard: poolLocator.type === TokenType.FUNGIBLE ? 'ERC20' : 'ERC721', @@ -412,7 +400,7 @@ export class TokensService { throw new BadRequestException('Invalid pool locator'); } - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, dto.namespace); const eventAbis = this.getEventAbis(poolLocator); const promises = [ this.eventstream.deleteSubscriptionByName( diff --git a/src/websocket-events/websocket-events.base.ts b/src/websocket-events/websocket-events.base.ts index b25efa7..8b71689 100644 --- a/src/websocket-events/websocket-events.base.ts +++ b/src/websocket-events/websocket-events.base.ts @@ -50,6 +50,14 @@ export interface WebSocketMessage { data: any; } +export interface WebSocketActionBase { + type: 'start' | 'ack' | 'nack' | 'protocol_error'; +} + +export interface WebSocketStart extends WebSocketActionBase { + namespace: string; +} + /** * Base class for websocket gateways. * diff --git a/test/app.e2e-context.ts b/test/app.e2e-context.ts index 49a1f52..d17b7f4 100644 --- a/test/app.e2e-context.ts +++ b/test/app.e2e-context.ts @@ -27,16 +27,28 @@ export class TestContext { }; eventHandler: (events: EventBatch) => void; receiptHandler: (receipt: EventStreamReply) => void; + connected: Promise; + private resolveConnected: () => void; + private rejectConnected: () => void; + + resetConnectedPromise() { + this.connected = new Promise((resolve, reject) => { + this.resolveConnected = resolve; + this.rejectConnected = reject; + }); + } eventstream = { connect: ( url: string, topic: string, + namespace: string, handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) => { this.eventHandler = handleEvents; this.receiptHandler = handleReceipt; + this.resolveConnected(); }, getStreams: jest.fn(), @@ -85,6 +97,7 @@ export class TestContext { (this.app.getHttpServer() as Server).listen(); this.server = request(this.app.getHttpServer()); + this.resetConnectedPromise(); } async end() { diff --git a/test/suites/erc20.ts b/test/suites/erc20.ts index 65568ee..0ff305c 100644 --- a/test/suites/erc20.ts +++ b/test/suites/erc20.ts @@ -89,10 +89,10 @@ export default (context: TestContext) => { }), ); }; - describe('ERC20WithData', () => { it('Create pool - unrecognized fields', async () => { const request = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -102,8 +102,8 @@ export default (context: TestContext) => { symbol: SYMBOL, isBestPool: true, // will be stripped but will not cause an error }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_WITH_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -116,16 +116,14 @@ export default (context: TestContext) => { schema: ERC20_WITH_DATA_SCHEMA, }, }); - mockPoolQuery(true); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - invalid type', async () => { const request: TokenPool = { + namespace: 'ns1', type: 'funkible' as TokenType, requestId: REQUEST, signer: IDENTITY, @@ -134,19 +132,17 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const response = { statusCode: 400, message: ['type must be a valid enum value'], error: 'Bad Request', }; - context.http.post = jest.fn(() => new FakeObservable(response)); await context.server.post('/createpool').send(request).expect(400).expect(response); }); - it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -155,8 +151,8 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_WITH_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -169,22 +165,19 @@ export default (context: TestContext) => { schema: ERC20_WITH_DATA_SCHEMA, }, }); - mockPoolQuery(true); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -194,29 +187,24 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === MINT_WITH_DATA) as IAbiMethod, params: ['0x123', '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -226,28 +214,23 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === TRANSFER_WITH_DATA) as IAbiMethod, params: [IDENTITY, '0x123', '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -257,29 +240,24 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === BURN_WITH_DATA) as IAbiMethod, params: [IDENTITY, '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { allowance: '100' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -289,24 +267,20 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === APPROVE_WITH_DATA) as IAbiMethod, params: ['2', '100', '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); }); - describe('ERC20NoData', () => { it('Create pool - unrecognized fields', async () => { const request = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -316,8 +290,8 @@ export default (context: TestContext) => { symbol: SYMBOL, isBestPool: true, // will be stripped but will not cause an error }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_NO_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -329,16 +303,14 @@ export default (context: TestContext) => { schema: ERC20_NO_DATA_SCHEMA, }, }); - mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - invalid type', async () => { const request: TokenPool = { + namespace: 'ns1', type: 'funkible' as TokenType, requestId: REQUEST, signer: IDENTITY, @@ -347,18 +319,16 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const response = { statusCode: 400, message: ['type must be a valid enum value'], error: 'Bad Request', }; - await context.server.post('/createpool').send(request).expect(400).expect(response); }); - it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -367,8 +337,8 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_NO_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -380,22 +350,19 @@ export default (context: TestContext) => { schema: ERC20_NO_DATA_SCHEMA, }, }); - mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -405,29 +372,24 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === MINT_NO_DATA) as IAbiMethod, params: ['0x123', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -437,28 +399,23 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === TRANSFER_NO_DATA) as IAbiMethod, params: ['0x123', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -468,29 +425,24 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === BURN_NO_DATA) as IAbiMethod, params: ['20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { allowance: '100' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -500,20 +452,15 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === APPROVE_NO_DATA) as IAbiMethod, params: ['2', '100'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token - custom ABI', async () => { const burnMethods = [ { @@ -548,8 +495,8 @@ export default (context: TestContext) => { outputs: [], }, ]; - const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -559,7 +506,6 @@ export default (context: TestContext) => { methods: burnMethods, }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -569,20 +515,15 @@ export default (context: TestContext) => { method: burnMethods[0], params: ['20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token - custom ABI, burn from other', async () => { const burnMethods = [ { @@ -617,8 +558,8 @@ export default (context: TestContext) => { outputs: [], }, ]; - const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -628,7 +569,6 @@ export default (context: TestContext) => { methods: burnMethods, }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -638,27 +578,21 @@ export default (context: TestContext) => { method: burnMethods[1], params: ['0x2', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Check interface', async () => { const request: CheckInterfaceRequest = { poolLocator: ERC20_NO_DATA_POOL_ID, format: InterfaceFormat.ABI, methods: ERC20NoDataABI.abi, }; - const response: CheckInterfaceResponse = { approval: { format: InterfaceFormat.ABI, @@ -682,7 +616,6 @@ export default (context: TestContext) => { ], }, }; - await context.server.post('/checkinterface').send(request).expect(200).expect(response); }); }); diff --git a/test/suites/erc721.ts b/test/suites/erc721.ts index bf41034..fa592df 100644 --- a/test/suites/erc721.ts +++ b/test/suites/erc721.ts @@ -83,7 +83,6 @@ export default (context: TestContext) => { }), ); }; - const mockURIQuery = (withURI: boolean) => { context.http.post.mockReturnValueOnce( new FakeObservable({ @@ -91,10 +90,10 @@ export default (context: TestContext) => { }), ); }; - describe('ERC721WithData', () => { it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -103,7 +102,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_WITH_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -116,17 +114,15 @@ export default (context: TestContext) => { schema: ERC721_WITH_DATA_SCHEMA, }, }); - mockURIQuery(true); mockPoolQuery(undefined); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - base URI', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -135,7 +131,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_WITH_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -148,22 +143,19 @@ export default (context: TestContext) => { schema: ERC721_WITH_DATA_SCHEMA, }, }); - mockURIQuery(true); mockPoolQuery(undefined); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -173,29 +165,24 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === MINT_WITH_URI) as IAbiMethod, params: ['0x123', '0x00', ''], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -205,28 +192,23 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === TRANSFER_WITH_DATA) as IAbiMethod, params: [IDENTITY, '0x123', '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -236,29 +218,24 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === BURN_WITH_DATA) as IAbiMethod, params: [IDENTITY, '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for all', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: {}, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -270,29 +247,24 @@ export default (context: TestContext) => { ) as IAbiMethod, params: ['2', true, '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for one', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { tokenIndex: '5' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -302,24 +274,20 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === APPROVE_WITH_DATA) as IAbiMethod, params: ['2', '5', '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); }); - describe('ERC721NoData', () => { it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -328,7 +296,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_NO_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -341,23 +308,20 @@ export default (context: TestContext) => { schema: ERC721_NO_DATA_SCHEMA, }, }); - mockURIQuery(false); mockURIQuery(false); mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -367,29 +331,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === MINT_NO_DATA) as IAbiMethod, params: ['0x123'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -401,28 +360,23 @@ export default (context: TestContext) => { ) as IAbiMethod, params: [IDENTITY, '0x123', '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -432,29 +386,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === BURN_NO_DATA) as IAbiMethod, params: ['721'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for all', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: {}, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -464,29 +413,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === APPROVE_FOR_ALL_NO_DATA) as IAbiMethod, params: ['2', true], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for one', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { tokenIndex: '5' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -496,20 +440,15 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === APPROVE_NO_DATA) as IAbiMethod, params: ['2', '5'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Mint token - custom ABI', async () => { const safeMintAutoIndex = { name: 'safeMint', @@ -524,8 +463,8 @@ export default (context: TestContext) => { ], outputs: [], }; - const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', @@ -534,7 +473,6 @@ export default (context: TestContext) => { methods: [safeMintAutoIndex], }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -544,27 +482,21 @@ export default (context: TestContext) => { method: safeMintAutoIndex, params: ['0x123'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Check interface', async () => { const request: CheckInterfaceRequest = { poolLocator: ERC721_NO_DATA_POOL_ID, format: InterfaceFormat.ABI, methods: ERC721NoDataABI.abi, }; - const response: CheckInterfaceResponse = { approval: { format: InterfaceFormat.ABI, @@ -593,7 +525,6 @@ export default (context: TestContext) => { ], }, }; - await context.server.post('/checkinterface').send(request).expect(200).expect(response); }); }); diff --git a/test/suites/websocket.ts b/test/suites/websocket.ts index 01c2bca..55e05d7 100644 --- a/test/suites/websocket.ts +++ b/test/suites/websocket.ts @@ -234,7 +234,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent], batchNumber: 12345 }); }) @@ -290,7 +295,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) @@ -375,7 +385,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721MintTransferEvent] }); }) @@ -429,7 +444,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20TransferEvent] }); }) @@ -516,7 +536,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721TransferEvent] }); }) @@ -569,7 +594,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20BurnEvent] }); }) @@ -654,7 +684,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721BurnEvent] }); }) @@ -713,7 +748,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20ApprovalEvent] }); }) @@ -772,7 +812,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721ApprovalEvent] }); }) @@ -831,7 +876,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockApprovalForAllEvent] }); }) @@ -847,7 +897,12 @@ export default (context: TestContext) => { it('Success receipt', () => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.receiptHandler).toBeDefined(); context.receiptHandler({ headers: { @@ -873,7 +928,12 @@ export default (context: TestContext) => { it('Error receipt', () => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.receiptHandler).toBeDefined(); context.receiptHandler({ headers: { @@ -905,7 +965,12 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) @@ -918,25 +983,16 @@ export default (context: TestContext) => { }) .close(); - await context.server.ws('/api/ws').expectJson(message => { - expect(message.id).toBeDefined(); - expect(message.event).toEqual('batch'); - expect(message.data.events).toHaveLength(1); - expect(message.data.events[0].event).toEqual('token-mint'); - return true; - }); - }); - - it('Client switchover', async () => { - context.eventstream.getSubscription.mockReturnValueOnce({ - name: packSubscriptionName(CONTRACT_ADDRESS, '', 'default'), - }); - - const ws1 = context.server.ws('/api/ws'); - const ws2 = context.server.ws('/api/ws'); + context.resetConnectedPromise(); - await ws1 - .exec(() => { + await context.server + .ws('/api/ws') + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) @@ -946,15 +1002,6 @@ export default (context: TestContext) => { expect(message.data.events).toHaveLength(1); expect(message.data.events[0].event).toEqual('token-mint'); return true; - }) - .close(); - - await ws2.expectJson(message => { - expect(message.id).toBeDefined(); - expect(message.event).toEqual('batch'); - expect(message.data.events).toHaveLength(1); - expect(message.data.events[0].event).toEqual('token-mint'); - return true; - }); + }); }); }; From 77ed3fee1b8a5b61f53eb7f845b102f7684275f8 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Mon, 6 Nov 2023 11:20:23 -0500 Subject: [PATCH 02/13] Remove unused imports Signed-off-by: Nicko Guyer --- src/eventstream-proxy/eventstream-proxy.base.ts | 2 +- src/tokens/tokens.interfaces.ts | 4 ---- src/tokens/tokens.service.ts | 3 +-- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index 2d3aeec..1c08cab 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -18,7 +18,7 @@ import { Logger } from '@nestjs/common'; import { MessageBody, SubscribeMessage } from '@nestjs/websockets'; import { v4 as uuidv4 } from 'uuid'; import { Context, newContext } from '../request-context/request-context.decorator'; -import { EventBatch, EventStream, EventStreamReply } from '../event-stream/event-stream.interfaces'; +import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces'; import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service'; import { WebSocketActionBase, diff --git a/src/tokens/tokens.interfaces.ts b/src/tokens/tokens.interfaces.ts index 8d9fa99..aa4bb78 100644 --- a/src/tokens/tokens.interfaces.ts +++ b/src/tokens/tokens.interfaces.ts @@ -415,10 +415,6 @@ export class TokenPoolEventInfo { } export class TokenPoolEvent extends tokenEventBase { - @ApiProperty() - @IsNotEmpty() - namespace: string; - @ApiProperty() type: TokenType; diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index d033351..2609677 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -15,10 +15,9 @@ // limitations under the License. import { BadRequestException, Injectable, Logger, NotFoundException } from '@nestjs/common'; -import { EventStream } from '../event-stream/event-stream.interfaces'; import { EventStreamService } from '../event-stream/event-stream.service'; import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway'; -import { Context, newContext } from '../request-context/request-context.decorator'; +import { Context } from '../request-context/request-context.decorator'; import { AsyncResponse, CheckInterfaceRequest, From 4f3468130483796abc766e393951cb500dde8b3a Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Mon, 6 Nov 2023 15:17:47 -0500 Subject: [PATCH 03/13] Adjust test timing Signed-off-by: Nicko Guyer --- test/suites/websocket.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/suites/websocket.ts b/test/suites/websocket.ts index 55e05d7..805aea3 100644 --- a/test/suites/websocket.ts +++ b/test/suites/websocket.ts @@ -985,6 +985,10 @@ export default (context: TestContext) => { context.resetConnectedPromise(); + await new Promise(resolve => { + setTimeout(resolve, 500); + }); + await context.server .ws('/api/ws') .sendJson({ From d96aa08be8e111577060b5744c9b6b903d04b882 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Mon, 6 Nov 2023 16:08:48 -0500 Subject: [PATCH 04/13] Update node version Signed-off-by: Nicko Guyer --- .github/workflows/docker_main.yml | 2 +- .github/workflows/docker_release.yml | 4 ++-- .github/workflows/test.yml | 16 ++++++++-------- test/suites/websocket.ts | 4 ---- 4 files changed, 11 insertions(+), 15 deletions(-) diff --git a/.github/workflows/docker_main.yml b/.github/workflows/docker_main.yml index 4ee11c9..b0e34a5 100644 --- a/.github/workflows/docker_main.yml +++ b/.github/workflows/docker_main.yml @@ -9,7 +9,7 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set build tag id: build_tag_generator diff --git a/.github/workflows/docker_release.yml b/.github/workflows/docker_release.yml index 185e1c4..d372750 100644 --- a/.github/workflows/docker_release.yml +++ b/.github/workflows/docker_release.yml @@ -8,7 +8,7 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: fetch-depth: 0 @@ -30,7 +30,7 @@ jobs: run: | echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin docker push ghcr.io/hyperledger/firefly-tokens-erc20-erc721:${GITHUB_REF##*/} - + - name: Push head tag run: | echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d695676..993d271 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,22 +8,22 @@ jobs: test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Use Node.js - uses: actions/setup-node@v2 + uses: actions/setup-node@v3 with: - node-version: '16.x' + node-version: '20.9.0' - run: npm ci - run: npm run test - run: npm run test:e2e solidity-test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Use Node.js - uses: actions/setup-node@v2 + uses: actions/setup-node@v3 with: - node-version: '16.x' + node-version: '20.9.0' - run: npm ci working-directory: ./samples/solidity - run: npm run compile @@ -33,6 +33,6 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Docker build - run: docker build --tag ghcr.io/hyperledger/firefly-tokens-erc20-erc721 . \ No newline at end of file + run: docker build --tag ghcr.io/hyperledger/firefly-tokens-erc20-erc721 . diff --git a/test/suites/websocket.ts b/test/suites/websocket.ts index 805aea3..55e05d7 100644 --- a/test/suites/websocket.ts +++ b/test/suites/websocket.ts @@ -985,10 +985,6 @@ export default (context: TestContext) => { context.resetConnectedPromise(); - await new Promise(resolve => { - setTimeout(resolve, 500); - }); - await context.server .ws('/api/ws') .sendJson({ From e58f3170a46f9baa2d8d8465d25728219f38f16a Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Fri, 10 Nov 2023 16:28:57 -0500 Subject: [PATCH 05/13] Allow re-activation of token pools Signed-off-by: Nicko Guyer --- .../eventstream-proxy.base.ts | 14 +++++- .../eventstream-proxy.gateway.ts | 1 + src/websocket-events/websocket-events.base.ts | 5 ++ test/suites/websocket.ts | 48 +++++++++++++++++++ 4 files changed, 67 insertions(+), 1 deletion(-) diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index 1c08cab..339ad9d 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -68,7 +68,8 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { handleConnection(client: WebSocketEx) { super.handleConnection(client); - client.on('message', (message: string) => { + client.on('message', async (message: string) => { + this.logger.debug(`WS => ${message}`); const action = JSON.parse(message) as WebSocketActionBase; switch (action.type) { case 'start': @@ -108,6 +109,17 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } clientSet.add(client); this.namespaceClients.set(namespace, clientSet); + + // ack the start command + client.send( + JSON.stringify({ + event: 'started', + data: { + namespace: namespace, + }, + }), + ); + this.logger.debug(`Started namespace '${namespace}'`); } catch (e) { this.logger.error(`Error connecting to event stream websocket: ${e.message}`); } diff --git a/src/eventstream-proxy/eventstream-proxy.gateway.ts b/src/eventstream-proxy/eventstream-proxy.gateway.ts index 90e1a2b..38ea447 100644 --- a/src/eventstream-proxy/eventstream-proxy.gateway.ts +++ b/src/eventstream-proxy/eventstream-proxy.gateway.ts @@ -18,6 +18,7 @@ import { Logger } from '@nestjs/common'; import { WebSocketGateway } from '@nestjs/websockets'; import { EventStreamService } from '../event-stream/event-stream.service'; import { EventStreamProxyBase } from './eventstream-proxy.base'; +import { TokensService } from 'src/tokens/tokens.service'; @WebSocketGateway({ path: '/api/ws' }) export class EventStreamProxyGateway extends EventStreamProxyBase { diff --git a/src/websocket-events/websocket-events.base.ts b/src/websocket-events/websocket-events.base.ts index 8b71689..a0961ad 100644 --- a/src/websocket-events/websocket-events.base.ts +++ b/src/websocket-events/websocket-events.base.ts @@ -24,6 +24,7 @@ import { } from '@nestjs/websockets'; import { nanoid } from 'nanoid'; import WebSocket, { Server } from 'ws'; +import { TokenPoolActivate } from 'src/tokens/tokens.interfaces'; const PING_INTERVAL = 5000; @@ -58,6 +59,10 @@ export interface WebSocketStart extends WebSocketActionBase { namespace: string; } +export interface WebSocketActivate extends WebSocketActionBase { + pool: TokenPoolActivate; +} + /** * Base class for websocket gateways. * diff --git a/test/suites/websocket.ts b/test/suites/websocket.ts index 55e05d7..8a13452 100644 --- a/test/suites/websocket.ts +++ b/test/suites/websocket.ts @@ -243,6 +243,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent], batchNumber: 12345 }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -304,6 +308,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -394,6 +402,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721MintTransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -453,6 +465,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20TransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -545,6 +561,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721TransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -603,6 +623,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20BurnEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -693,6 +717,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721BurnEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -757,6 +785,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20ApprovalEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -821,6 +853,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721ApprovalEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -885,6 +921,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockApprovalForAllEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -974,6 +1014,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -996,6 +1040,10 @@ export default (context: TestContext) => { expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); From fe873eab083b7e68cea878e510123fa9162fed96 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Tue, 14 Nov 2023 13:39:05 -0500 Subject: [PATCH 06/13] Remove unused import Signed-off-by: Nicko Guyer --- src/eventstream-proxy/eventstream-proxy.gateway.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/eventstream-proxy/eventstream-proxy.gateway.ts b/src/eventstream-proxy/eventstream-proxy.gateway.ts index 38ea447..90e1a2b 100644 --- a/src/eventstream-proxy/eventstream-proxy.gateway.ts +++ b/src/eventstream-proxy/eventstream-proxy.gateway.ts @@ -18,7 +18,6 @@ import { Logger } from '@nestjs/common'; import { WebSocketGateway } from '@nestjs/websockets'; import { EventStreamService } from '../event-stream/event-stream.service'; import { EventStreamProxyBase } from './eventstream-proxy.base'; -import { TokensService } from 'src/tokens/tokens.service'; @WebSocketGateway({ path: '/api/ws' }) export class EventStreamProxyGateway extends EventStreamProxyBase { From f62c69d7e5bcce3669600d860fbc28f500d09b1c Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Tue, 14 Nov 2023 16:06:57 -0500 Subject: [PATCH 07/13] Remove deprecated eventstreams Signed-off-by: Nicko Guyer --- src/event-stream/event-stream.service.ts | 16 +++++++++++++++- src/websocket-events/websocket-events.base.ts | 5 ----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 0917e02..609fbdf 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -198,13 +198,27 @@ export class EventStreamService { batchSize: 50, batchTimeoutMS: 500, type: 'websocket', - websocket: { topic }, + websocket: { topic: name }, blockedReryDelaySec: 30, // intentional due to spelling error in ethconnect inputs: true, timestamps: true, }; const existingStreams = await this.getStreams(ctx); + + // Check to see if there is a deprecated stream that we should remove + this.logger.debug(`Checking for deprecated event steam with topic '${topic}'`); + const deprecatedStream = existingStreams.find(s => s.name === topic); + if (deprecatedStream) { + this.logger.debug(`Purging deprecated eventstream '${deprecatedStream.id}'`); + await lastValueFrom( + this.http.delete( + new URL(`/eventstreams/${deprecatedStream.id}`, this.baseUrl).href, + this.requestOptions(ctx), + ), + ); + } + const stream = existingStreams.find(s => s.name === streamDetails.name); if (stream) { const patchedStreamRes = await lastValueFrom( diff --git a/src/websocket-events/websocket-events.base.ts b/src/websocket-events/websocket-events.base.ts index a0961ad..8b71689 100644 --- a/src/websocket-events/websocket-events.base.ts +++ b/src/websocket-events/websocket-events.base.ts @@ -24,7 +24,6 @@ import { } from '@nestjs/websockets'; import { nanoid } from 'nanoid'; import WebSocket, { Server } from 'ws'; -import { TokenPoolActivate } from 'src/tokens/tokens.interfaces'; const PING_INTERVAL = 5000; @@ -59,10 +58,6 @@ export interface WebSocketStart extends WebSocketActionBase { namespace: string; } -export interface WebSocketActivate extends WebSocketActionBase { - pool: TokenPoolActivate; -} - /** * Base class for websocket gateways. * From 916d4e76351edf646774a77e592c25f322548189 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Wed, 10 Jan 2024 14:36:38 -0500 Subject: [PATCH 08/13] Updates Signed-off-by: Nicko Guyer --- .env | 2 +- .vscode/settings.json | 2 +- src/tokens/tokens.service.ts | 5 +---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/.env b/.env index 5e4d496..177a6c2 100644 --- a/.env +++ b/.env @@ -1,6 +1,6 @@ PORT=3000 ETHCONNECT_URL=http://127.0.0.1:5102 -ETHCONNECT_TOPIC=tokens_local +ETHCONNECT_TOPIC=tokens_0_0 FACTORY_CONTRACT_ADDRESS="0xd85b3fba5552c48389607954e042e7313a9aec6e" AUTO_INIT=false USE_LEGACY_ERC20_SAMPLE=false diff --git a/.vscode/settings.json b/.vscode/settings.json index 4fb545a..b273c98 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,7 +1,7 @@ { "solidity.compileUsingRemoteVersion": "v0.6.12+commit.27d51765", "editor.codeActionsOnSave": { - "source.fixAll.eslint": true + "source.fixAll.eslint": "explicit" }, "eslint.validate": ["javascript"], "solidity.defaultCompiler": "remote", diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index 2609677..16ee935 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -89,10 +89,7 @@ export class TokensService { this.proxy.configure(wsUrl, this.topic); } - async onConnect() { - const wsUrl = new URL('/ws', this.baseUrl.replace('http', 'ws')).href; - this.proxy.configure(wsUrl, this.topic); - } + async onConnect() {} private async getOrCreateFactorySubscription(ctx: Context, address: string, namespace) { const eventABI = this.mapper.getCreateEvent(); From 352668ef12dfffe5ad684178bc4a8284d4123bc0 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Tue, 30 Jan 2024 16:24:26 -0500 Subject: [PATCH 09/13] Add WS log messages Signed-off-by: Nicko Guyer --- src/event-stream/event-stream.service.ts | 1 + src/eventstream-proxy/eventstream-proxy.base.ts | 2 +- src/websocket-events/websocket-events.base.ts | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 609fbdf..405418a 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -84,6 +84,7 @@ export class EventStreamSocket { } }) .on('message', (message: string) => { + this.logger.debug(`WS => ${message}`); this.handleMessage(JSON.parse(message)); }) .on('pong', () => { diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index 339ad9d..9886355 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -69,7 +69,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { handleConnection(client: WebSocketEx) { super.handleConnection(client); client.on('message', async (message: string) => { - this.logger.debug(`WS => ${message}`); const action = JSON.parse(message) as WebSocketActionBase; switch (action.type) { case 'start': @@ -253,6 +252,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { let i = 0; for (let client of clients.keys()) { if (i++ == selected) { + this.logger.debug(`WS <= ${payload}`); client.send(payload); return; } diff --git a/src/websocket-events/websocket-events.base.ts b/src/websocket-events/websocket-events.base.ts index 8b71689..1908cc1 100644 --- a/src/websocket-events/websocket-events.base.ts +++ b/src/websocket-events/websocket-events.base.ts @@ -94,6 +94,9 @@ export abstract class WebSocketEventsBase client.on('error', err => { this.logger.log(`WebSocket ${client.id}: error: ${err}`); }); + client.on('message', msg => { + this.logger.debug(`WS => ${msg}`); + }); } handleDisconnect(client: WebSocketEx) { From 4ebae6b6fc6ca5ce0410512112738e8580a3b8d5 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Mon, 26 Feb 2024 10:06:30 -0500 Subject: [PATCH 10/13] PR feedback Signed-off-by: Nicko Guyer --- .env | 1 - src/app.module.ts | 2 +- src/event-stream/event-stream.interfaces.ts | 2 +- src/event-stream/event-stream.module.ts | 2 +- src/event-stream/event-stream.service.spec.ts | 2 +- src/event-stream/event-stream.service.ts | 16 +++++++-------- .../eventstream-proxy.base.ts | 20 ++++++++----------- .../eventstream-proxy.gateway.spec.ts | 2 +- .../eventstream-proxy.gateway.ts | 2 +- .../eventstream-proxy.interfaces.ts | 6 +----- .../eventstream-proxy.module.ts | 2 +- src/health/health.module.ts | 2 +- src/main.ts | 3 +-- src/request-context/request-id.middleware.ts | 2 +- src/request-logging.interceptor.spec.ts | 2 +- src/request-logging.interceptor.ts | 2 +- src/tokens/abimapper.service.ts | 2 +- src/tokens/blockchain.service.ts | 2 +- src/tokens/erc165.ts | 2 +- src/tokens/erc20.ts | 2 +- src/tokens/erc721.ts | 2 +- src/tokens/tokens.controller.spec.ts | 2 +- src/tokens/tokens.controller.ts | 7 +++++-- src/tokens/tokens.interfaces.ts | 2 +- src/tokens/tokens.listener.ts | 7 +++++-- src/tokens/tokens.module.ts | 2 +- src/tokens/tokens.service.spec.ts | 2 +- src/tokens/tokens.service.ts | 20 +++++++++++++------ src/tokens/tokens.util.spec.ts | 2 +- src/tokens/tokens.util.ts | 2 +- src/utils.ts | 4 ++++ src/websocket-events/websocket-events.base.ts | 13 +++++++++--- test/app.e2e-spec.ts | 2 +- test/suites/erc20.ts | 2 +- test/suites/erc721.ts | 2 +- test/suites/websocket.ts | 2 +- 36 files changed, 82 insertions(+), 67 deletions(-) diff --git a/.env b/.env index 177a6c2..4c89bcc 100644 --- a/.env +++ b/.env @@ -2,6 +2,5 @@ PORT=3000 ETHCONNECT_URL=http://127.0.0.1:5102 ETHCONNECT_TOPIC=tokens_0_0 FACTORY_CONTRACT_ADDRESS="0xd85b3fba5552c48389607954e042e7313a9aec6e" -AUTO_INIT=false USE_LEGACY_ERC20_SAMPLE=false USE_LEGACY_ERC721_SAMPLE=false diff --git a/src/app.module.ts b/src/app.module.ts index 898a236..c3411f2 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.interfaces.ts b/src/event-stream/event-stream.interfaces.ts index 01cd022..8b981b4 100644 --- a/src/event-stream/event-stream.interfaces.ts +++ b/src/event-stream/event-stream.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.module.ts b/src/event-stream/event-stream.module.ts index 9087296..e677c8d 100644 --- a/src/event-stream/event-stream.module.ts +++ b/src/event-stream/event-stream.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.service.spec.ts b/src/event-stream/event-stream.service.spec.ts index 5266f16..ffe98dc 100644 --- a/src/event-stream/event-stream.service.spec.ts +++ b/src/event-stream/event-stream.service.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 405418a..9db31b1 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -22,7 +22,7 @@ import WebSocket from 'ws'; import { FFRequestIDHeader } from '../request-context/constants'; import { Context, newContext } from '../request-context/request-context.decorator'; import { IAbiMethod } from '../tokens/tokens.interfaces'; -import { getHttpRequestOptions, getWebsocketOptions } from '../utils'; +import { eventStreamName, getHttpRequestOptions, getWebsocketOptions } from '../utils'; import { Event, EventBatch, @@ -68,7 +68,7 @@ export class EventStreamSocket { } else { this.logger.log('Event stream websocket connected'); } - this.produce({ type: 'listen', topic: `${this.topic}/${this.namespace}` }); + this.produce({ type: 'listen', topic: eventStreamName(this.topic, this.namespace) }); this.produce({ type: 'listenreplies' }); this.ping(); }) @@ -84,7 +84,7 @@ export class EventStreamSocket { } }) .on('message', (message: string) => { - this.logger.debug(`WS => ${message}`); + this.logger.verbose(`WS => ${message}`); this.handleMessage(JSON.parse(message)); }) .on('pong', () => { @@ -111,11 +111,11 @@ export class EventStreamSocket { } ack(batchNumber: number | undefined) { - this.produce({ type: 'ack', topic: `${this.topic}/${this.namespace}`, batchNumber }); + this.produce({ type: 'ack', topic: eventStreamName(this.topic, this.namespace), batchNumber }); } nack(batchNumber: number | undefined) { - this.produce({ type: 'nack', topic: `${this.topic}/${this.namespace}`, batchNumber }); + this.produce({ type: 'nack', topic: eventStreamName(this.topic, this.namespace), batchNumber }); } close() { @@ -211,7 +211,7 @@ export class EventStreamService { this.logger.debug(`Checking for deprecated event steam with topic '${topic}'`); const deprecatedStream = existingStreams.find(s => s.name === topic); if (deprecatedStream) { - this.logger.debug(`Purging deprecated eventstream '${deprecatedStream.id}'`); + this.logger.log(`Purging deprecated eventstream '${deprecatedStream.id}'`); await lastValueFrom( this.http.delete( new URL(`/eventstreams/${deprecatedStream.id}`, this.baseUrl).href, @@ -358,7 +358,7 @@ export class EventStreamService { handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) { - const name = `${topic}/${namespace}`; + const name = eventStreamName(topic, namespace); await this.createOrUpdateStream(newContext(), name, topic); return new EventStreamSocket( diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index 9886355..54e8915 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -15,12 +15,13 @@ // limitations under the License. import { Logger } from '@nestjs/common'; -import { MessageBody, SubscribeMessage } from '@nestjs/websockets'; +import { ConnectedSocket, MessageBody, SubscribeMessage } from '@nestjs/websockets'; import { v4 as uuidv4 } from 'uuid'; import { Context, newContext } from '../request-context/request-context.decorator'; import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces'; import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service'; import { + WebSocketAck, WebSocketActionBase, WebSocketEventsBase, WebSocketEx, @@ -28,8 +29,6 @@ import { WebSocketStart, } from '../websocket-events/websocket-events.base'; import { - AckMessageData, - ConnectionListener, EventListener, WebSocketMessageBatchData, WebSocketMessageWithId, @@ -47,7 +46,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { url?: string; topic?: string; - private connectListeners: ConnectionListener[] = []; private eventListeners: EventListener[] = []; private awaitingAck: WebSocketMessageWithId[] = []; private subscriptionNames = new Map(); @@ -75,6 +73,9 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { const startAction = action as WebSocketStart; this.startListening(client, startAction.namespace); break; + case 'ack': + const ackAction = action as WebSocketAck; + this.handleAck(ackAction); } }); } @@ -150,10 +151,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { }); } - addConnectionListener(listener: ConnectionListener) { - this.connectListeners.push(listener); - } - addEventListener(listener: EventListener) { this.eventListeners.push(listener); } @@ -219,8 +216,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { return undefined; } - @SubscribeMessage('ack') - handleAck(@MessageBody() data: AckMessageData) { + handleAck(data: WebSocketAck) { if (data.id === undefined) { this.logger.error('Received malformed ack'); return; @@ -252,7 +248,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { let i = 0; for (let client of clients.keys()) { if (i++ == selected) { - this.logger.debug(`WS <= ${payload}`); + this.logger.verbose(`WS <= ${payload}`); client.send(payload); return; } diff --git a/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts b/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts index becfe11..a650b4a 100644 --- a/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts +++ b/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/eventstream-proxy/eventstream-proxy.gateway.ts b/src/eventstream-proxy/eventstream-proxy.gateway.ts index 90e1a2b..c4e302d 100644 --- a/src/eventstream-proxy/eventstream-proxy.gateway.ts +++ b/src/eventstream-proxy/eventstream-proxy.gateway.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/eventstream-proxy/eventstream-proxy.interfaces.ts b/src/eventstream-proxy/eventstream-proxy.interfaces.ts index 8588292..42d0b70 100644 --- a/src/eventstream-proxy/eventstream-proxy.interfaces.ts +++ b/src/eventstream-proxy/eventstream-proxy.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -35,10 +35,6 @@ export interface WebSocketMessageWithId extends WebSocketMessage { batchNumber: number | undefined; } -export interface AckMessageData { - id?: string; -} - export interface WebSocketMessageBatchData { events: WebSocketMessage[]; } diff --git a/src/eventstream-proxy/eventstream-proxy.module.ts b/src/eventstream-proxy/eventstream-proxy.module.ts index 7a96d27..158a5e5 100644 --- a/src/eventstream-proxy/eventstream-proxy.module.ts +++ b/src/eventstream-proxy/eventstream-proxy.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/health/health.module.ts b/src/health/health.module.ts index 299008e..ba2d9e5 100644 --- a/src/health/health.module.ts +++ b/src/health/health.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/main.ts b/src/main.ts index 6af6237..162fc50 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -78,7 +78,6 @@ async function bootstrap() { const ethConnectUrl = config.get('ETHCONNECT_URL', ''); const fftmUrl = config.get('FFTM_URL', ''); // Optional. Currently used only for SendTransaction API calls when set const topic = config.get('ETHCONNECT_TOPIC', 'tokenERC20ERC721'); - const autoInit = config.get('AUTO_INIT', 'true'); const username = config.get('ETHCONNECT_USERNAME', ''); const password = config.get('ETHCONNECT_PASSWORD', ''); const factoryAddress = config.get('FACTORY_CONTRACT_ADDRESS', ''); diff --git a/src/request-context/request-id.middleware.ts b/src/request-context/request-id.middleware.ts index 7ad509b..a614f68 100644 --- a/src/request-context/request-id.middleware.ts +++ b/src/request-context/request-id.middleware.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/request-logging.interceptor.spec.ts b/src/request-logging.interceptor.spec.ts index fe6bd53..31a8c6a 100644 --- a/src/request-logging.interceptor.spec.ts +++ b/src/request-logging.interceptor.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/request-logging.interceptor.ts b/src/request-logging.interceptor.ts index fdbe0a5..e78c7ec 100644 --- a/src/request-logging.interceptor.ts +++ b/src/request-logging.interceptor.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/abimapper.service.ts b/src/tokens/abimapper.service.ts index b5d0340..84903f2 100644 --- a/src/tokens/abimapper.service.ts +++ b/src/tokens/abimapper.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/blockchain.service.ts b/src/tokens/blockchain.service.ts index e622dab..505a6a0 100644 --- a/src/tokens/blockchain.service.ts +++ b/src/tokens/blockchain.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/erc165.ts b/src/tokens/erc165.ts index 62f4915..086d056 100644 --- a/src/tokens/erc165.ts +++ b/src/tokens/erc165.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/erc20.ts b/src/tokens/erc20.ts index 7905152..995aa61 100644 --- a/src/tokens/erc20.ts +++ b/src/tokens/erc20.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/erc721.ts b/src/tokens/erc721.ts index de28215..bbb5195 100644 --- a/src/tokens/erc721.ts +++ b/src/tokens/erc721.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.controller.spec.ts b/src/tokens/tokens.controller.spec.ts index 0435811..df698c7 100644 --- a/src/tokens/tokens.controller.spec.ts +++ b/src/tokens/tokens.controller.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.controller.ts b/src/tokens/tokens.controller.ts index 356e96b..493a3ce 100644 --- a/src/tokens/tokens.controller.ts +++ b/src/tokens/tokens.controller.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -36,7 +36,10 @@ import { TokensService } from './tokens.service'; @Controller() export class TokensController { - constructor(private service: TokensService, private blockchain: BlockchainConnectorService) {} + constructor( + private service: TokensService, + private blockchain: BlockchainConnectorService, + ) {} @Post('init') @HttpCode(204) diff --git a/src/tokens/tokens.interfaces.ts b/src/tokens/tokens.interfaces.ts index aa4bb78..ea6a248 100644 --- a/src/tokens/tokens.interfaces.ts +++ b/src/tokens/tokens.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.listener.ts b/src/tokens/tokens.listener.ts index 46d19de..c8d847c 100644 --- a/src/tokens/tokens.listener.ts +++ b/src/tokens/tokens.listener.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -55,7 +55,10 @@ const approvalForAllEventSignature = 'ApprovalForAll(address,address,bool)'; export class TokenListener implements EventListener { private readonly logger = new Logger(TokenListener.name); - constructor(private mapper: AbiMapperService, private blockchain: BlockchainConnectorService) {} + constructor( + private mapper: AbiMapperService, + private blockchain: BlockchainConnectorService, + ) {} async onEvent(subName: string, event: Event) { const signature = this.trimEventSignature(event.signature); diff --git a/src/tokens/tokens.module.ts b/src/tokens/tokens.module.ts index e6094b6..58a8a2d 100644 --- a/src/tokens/tokens.module.ts +++ b/src/tokens/tokens.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.service.spec.ts b/src/tokens/tokens.service.spec.ts index 54e888e..18dc5d2 100644 --- a/src/tokens/tokens.service.spec.ts +++ b/src/tokens/tokens.service.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index 16ee935..7d907be 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -15,6 +15,7 @@ // limitations under the License. import { BadRequestException, Injectable, Logger, NotFoundException } from '@nestjs/common'; +import { EventStream } from '../event-stream/event-stream.interfaces'; import { EventStreamService } from '../event-stream/event-stream.service'; import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway'; import { Context } from '../request-context/request-context.decorator'; @@ -63,6 +64,7 @@ import { Symbol as ERC721Symbol, DynamicMethods as ERC721Methods, } from './erc721'; +import { eventStreamName } from '../utils'; @Injectable() export class TokensService { @@ -70,6 +72,7 @@ export class TokensService { baseUrl: string; topic: string; + stream: EventStream; factoryAddress = ''; constructor( @@ -83,15 +86,12 @@ export class TokensService { this.baseUrl = baseUrl; this.topic = topic; this.factoryAddress = factoryAddress.toLowerCase(); - this.proxy.addConnectionListener(this); this.proxy.addEventListener(new TokenListener(this.mapper, this.blockchain)); const wsUrl = new URL('/ws', this.baseUrl.replace('http', 'ws')).href; this.proxy.configure(wsUrl, this.topic); } - async onConnect() {} - - private async getOrCreateFactorySubscription(ctx: Context, address: string, namespace) { + private async getOrCreateFactorySubscription(ctx: Context, address: string, namespace: string) { const eventABI = this.mapper.getCreateEvent(); const methodABI = this.mapper.getCreateMethod(); const stream = await this.getStream(ctx, namespace); @@ -110,9 +110,17 @@ export class TokensService { } private async getStream(ctx: Context, namespace: string) { + const stream = this.stream; + if (stream !== undefined) { + return stream; + } await this.migrationCheck(ctx); this.logger.log('Creating stream with name ' + this.topic); - return this.eventstream.createOrUpdateStream(ctx, `${this.topic}/${namespace}`, this.topic); + return this.eventstream.createOrUpdateStream( + ctx, + eventStreamName(this.topic, namespace), + this.topic, + ); } /** diff --git a/src/tokens/tokens.util.spec.ts b/src/tokens/tokens.util.spec.ts index c4d6edc..47cffcf 100644 --- a/src/tokens/tokens.util.spec.ts +++ b/src/tokens/tokens.util.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.util.ts b/src/tokens/tokens.util.ts index 7bbb56d..9836cd4 100644 --- a/src/tokens/tokens.util.ts +++ b/src/tokens/tokens.util.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/utils.ts b/src/utils.ts index 2aa5db6..fbcadbc 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -69,3 +69,7 @@ export const getNestOptions = (): NestApplicationOptions => { } return options; }; + +export const eventStreamName = (topic: string, namespace: string) => { + return `${topic}/${namespace}`; +}; diff --git a/src/websocket-events/websocket-events.base.ts b/src/websocket-events/websocket-events.base.ts index 1908cc1..ebc5bfe 100644 --- a/src/websocket-events/websocket-events.base.ts +++ b/src/websocket-events/websocket-events.base.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -58,6 +58,10 @@ export interface WebSocketStart extends WebSocketActionBase { namespace: string; } +export interface WebSocketAck extends WebSocketActionBase { + id: string; +} + /** * Base class for websocket gateways. * @@ -69,7 +73,10 @@ export abstract class WebSocketEventsBase { @WebSocketServer() server: Server; - constructor(protected readonly logger: Logger, private requireAuth = false) {} + constructor( + protected readonly logger: Logger, + private requireAuth = false, + ) {} afterInit(server: Server) { const interval = setInterval(() => this.ping(), PING_INTERVAL); @@ -95,7 +102,7 @@ export abstract class WebSocketEventsBase this.logger.log(`WebSocket ${client.id}: error: ${err}`); }); client.on('message', msg => { - this.logger.debug(`WS => ${msg}`); + this.logger.verbose(`WS => ${msg}`); }); } diff --git a/test/app.e2e-spec.ts b/test/app.e2e-spec.ts index bd733db..f2523bc 100644 --- a/test/app.e2e-spec.ts +++ b/test/app.e2e-spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/test/suites/erc20.ts b/test/suites/erc20.ts index 0ff305c..8c0ef83 100644 --- a/test/suites/erc20.ts +++ b/test/suites/erc20.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/test/suites/erc721.ts b/test/suites/erc721.ts index fa592df..dfd2789 100644 --- a/test/suites/erc721.ts +++ b/test/suites/erc721.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/test/suites/websocket.ts b/test/suites/websocket.ts index 8a13452..dda80ad 100644 --- a/test/suites/websocket.ts +++ b/test/suites/websocket.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // From 4d85ab6206ff0464e43fcdb5311d7b8cf309123d Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Mon, 26 Feb 2024 11:42:51 -0500 Subject: [PATCH 11/13] Scope acks to recipient client Signed-off-by: Nicko Guyer --- .../eventstream-proxy.base.ts | 65 ++++++++++++------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index 54e8915..0f95a69 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -15,7 +15,6 @@ // limitations under the License. import { Logger } from '@nestjs/common'; -import { ConnectedSocket, MessageBody, SubscribeMessage } from '@nestjs/websockets'; import { v4 as uuidv4 } from 'uuid'; import { Context, newContext } from '../request-context/request-context.decorator'; import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces'; @@ -47,7 +46,8 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { topic?: string; private eventListeners: EventListener[] = []; - private awaitingAck: WebSocketMessageWithId[] = []; + // Map of client IDs to all the messages for which we are awaiting an ack + private awaitingAck: Map = new Map(); private subscriptionNames = new Map(); private queue = Promise.resolve(); @@ -66,6 +66,11 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { handleConnection(client: WebSocketEx) { super.handleConnection(client); + + if (!this.awaitingAck.get(client.id)) { + this.awaitingAck.set(client.id, []); + } + client.on('message', async (message: string) => { const action = JSON.parse(message) as WebSocketActionBase; switch (action.type) { @@ -75,7 +80,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { break; case 'ack': const ackAction = action as WebSocketAck; - this.handleAck(ackAction); + this.handleAck(client, ackAction); } }); } @@ -135,12 +140,13 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { // Nack any messages that are inflight for that namespace const nackedMessageIds: Set = new Set(); this.awaitingAck - .filter(msg => msg.namespace === namespace) + ?.get(client.id) + ?.filter(msg => msg.namespace === namespace) .map(msg => { this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber); nackedMessageIds.add(msg.id); }); - this.awaitingAck = this.awaitingAck.filter(msg => nackedMessageIds.has(msg.id)); + this.awaitingAck.delete(client.id); // If all clients for this namespace have disconnected, also close the connection to EVMConnect if (clientSet.size == 0) { @@ -194,8 +200,8 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { }, batchNumber: batch.batchNumber, }; - this.awaitingAck.push(message); - this.send(namespace, JSON.stringify(message)); + // this.awaitingAck.get(client.id)push(message); + this.send(namespace, message); } private async getSubscriptionName(ctx: Context, subId: string) { @@ -216,40 +222,49 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { return undefined; } - handleAck(data: WebSocketAck) { + handleAck(client: WebSocketEx, data: WebSocketAck) { if (data.id === undefined) { this.logger.error('Received malformed ack'); return; } - const inflight = this.awaitingAck.find(msg => msg.id === data.id); - this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); - if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) { - this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id); - if ( - // If nothing is left awaiting an ack - then we clearly need to ack - this.awaitingAck.length === 0 || - // Or if we have a batch number associated with this ID, then we can only ack if there - // are no other messages in-flight with the same batch number. - (inflight.batchNumber !== undefined && - !this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber)) - ) { - this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); - this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber); + let awaitingAck = this.awaitingAck.get(client.id); + + if (awaitingAck) { + const inflight = awaitingAck.find(msg => msg.id === data.id); + this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); + if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) { + // Remove the acked message id from the queue + awaitingAck = awaitingAck.filter(msg => msg.id !== data.id); + this.awaitingAck.set(client.id, awaitingAck); + if ( + // If nothing is left awaiting an ack - then we clearly need to ack + awaitingAck.length === 0 || + // Or if we have a batch number associated with this ID, then we can only ack if there + // are no other messages in-flight with the same batch number. + (inflight.batchNumber !== undefined && + !awaitingAck.filter(msg => msg.batchNumber === inflight.batchNumber)) + ) { + this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); + this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber); + } } + } else { + this.logger.warn(`Received unrecognized ack from client ${client.id} for message ${data.id}`); } } - send(namespace, payload: string) { + send(namespace, payload: WebSocketMessageWithId) { const clients = this.namespaceClients.get(namespace); if (clients) { // Randomly select a connected client for this namespace to distribute load const selected = Math.floor(Math.random() * clients.size); let i = 0; - for (let client of clients.keys()) { + for (const client of clients.keys()) { if (i++ == selected) { + this.awaitingAck.get(client.id)?.push(payload); this.logger.verbose(`WS <= ${payload}`); - client.send(payload); + client.send(JSON.stringify(payload)); return; } } From 4d59b01fb4076318ee52b09276e42c9fe1137e56 Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Mon, 26 Feb 2024 13:55:47 -0500 Subject: [PATCH 12/13] Fix eventstream cache Signed-off-by: Nicko Guyer --- src/eventstream-proxy/eventstream-proxy.base.ts | 1 - src/tokens/tokens.service.ts | 10 ++++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index 0f95a69..0f0eb10 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -200,7 +200,6 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { }, batchNumber: batch.batchNumber, }; - // this.awaitingAck.get(client.id)push(message); this.send(namespace, message); } diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index 7d907be..3340967 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -72,7 +72,7 @@ export class TokensService { baseUrl: string; topic: string; - stream: EventStream; + streamCache: Map = new Map(); factoryAddress = ''; constructor( @@ -110,17 +110,19 @@ export class TokensService { } private async getStream(ctx: Context, namespace: string) { - const stream = this.stream; + let stream = this.streamCache.get(namespace); if (stream !== undefined) { return stream; } await this.migrationCheck(ctx); - this.logger.log('Creating stream with name ' + this.topic); - return this.eventstream.createOrUpdateStream( + this.logger.log('Creating stream with name ' + eventStreamName(this.topic, namespace)); + stream = await this.eventstream.createOrUpdateStream( ctx, eventStreamName(this.topic, namespace), this.topic, ); + this.streamCache.set(namespace, stream); + return stream; } /** From d6cbee7f260b3c50343d4c48fb4626e3d2a00fcf Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Wed, 28 Feb 2024 15:49:16 -0500 Subject: [PATCH 13/13] PR feedback Signed-off-by: Nicko Guyer --- src/eventstream-proxy/eventstream-proxy.base.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index 0f0eb10..2a772e2 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -74,13 +74,15 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { client.on('message', async (message: string) => { const action = JSON.parse(message) as WebSocketActionBase; switch (action.type) { - case 'start': + case 'start': { const startAction = action as WebSocketStart; this.startListening(client, startAction.namespace); break; - case 'ack': + } + case 'ack': { const ackAction = action as WebSocketAck; this.handleAck(client, ackAction); + } } }); }