diff --git a/.env b/.env index 2bbb638..4c89bcc 100644 --- a/.env +++ b/.env @@ -1,7 +1,6 @@ PORT=3000 ETHCONNECT_URL=http://127.0.0.1:5102 -ETHCONNECT_TOPIC=token -FACTORY_CONTRACT_ADDRESS= -AUTO_INIT=true +ETHCONNECT_TOPIC=tokens_0_0 +FACTORY_CONTRACT_ADDRESS="0xd85b3fba5552c48389607954e042e7313a9aec6e" USE_LEGACY_ERC20_SAMPLE=false USE_LEGACY_ERC721_SAMPLE=false diff --git a/.github/workflows/docker_main.yml b/.github/workflows/docker_main.yml index 4ee11c9..b0e34a5 100644 --- a/.github/workflows/docker_main.yml +++ b/.github/workflows/docker_main.yml @@ -9,7 +9,7 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set build tag id: build_tag_generator diff --git a/.github/workflows/docker_release.yml b/.github/workflows/docker_release.yml index 185e1c4..d372750 100644 --- a/.github/workflows/docker_release.yml +++ b/.github/workflows/docker_release.yml @@ -8,7 +8,7 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 with: fetch-depth: 0 @@ -30,7 +30,7 @@ jobs: run: | echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin docker push ghcr.io/hyperledger/firefly-tokens-erc20-erc721:${GITHUB_REF##*/} - + - name: Push head tag run: | echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d695676..993d271 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,22 +8,22 @@ jobs: test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Use Node.js - uses: actions/setup-node@v2 + uses: actions/setup-node@v3 with: - node-version: '16.x' + node-version: '20.9.0' - run: npm ci - run: npm run test - run: npm run test:e2e solidity-test: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Use Node.js - uses: actions/setup-node@v2 + uses: actions/setup-node@v3 with: - node-version: '16.x' + node-version: '20.9.0' - run: npm ci working-directory: ./samples/solidity - run: npm run compile @@ -33,6 +33,6 @@ jobs: docker: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Docker build - run: docker build --tag ghcr.io/hyperledger/firefly-tokens-erc20-erc721 . \ No newline at end of file + run: docker build --tag ghcr.io/hyperledger/firefly-tokens-erc20-erc721 . diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..1377495 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,33 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Run Tests", + "runtimeExecutable": "npm", + "args": ["run", "test"], + "request": "launch", + "type": "node", + "outputCapture": "std" + }, + { + "name": "Run E2E Tests", + "runtimeExecutable": "npm", + "args": ["run", "test:e2e"], + "request": "launch", + "type": "node", + "outputCapture": "std" + }, + { + "type": "node", + "request": "launch", + "name": "Launch Program", + "skipFiles": ["/**"], + "program": "${file}", + "preLaunchTask": "tsc: build - tsconfig.json", + "outFiles": ["${workspaceFolder}/dist/**/*.js"] + } + ] +} diff --git a/.vscode/settings.json b/.vscode/settings.json index 108b279..b273c98 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,11 +1,9 @@ { "solidity.compileUsingRemoteVersion": "v0.6.12+commit.27d51765", "editor.codeActionsOnSave": { - "source.fixAll.eslint": true + "source.fixAll.eslint": "explicit" }, "eslint.validate": ["javascript"], "solidity.defaultCompiler": "remote", - "cSpell.words": [ - "fftm" - ] + "cSpell.words": ["eventstream", "fftm"] } diff --git a/Dockerfile b/Dockerfile index 2129032..126786d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM node:16-alpine3.15 as build +FROM node:20-alpine3.17 as build USER node WORKDIR /home/node ADD --chown=node:node package*.json ./ @@ -6,7 +6,7 @@ RUN npm install ADD --chown=node:node . . RUN npm run build -FROM node:16-alpine3.15 as solidity-build +FROM node:20-alpine3.17 as solidity-build RUN apk add python3 alpine-sdk USER node WORKDIR /home/node @@ -15,7 +15,7 @@ RUN npm install ADD --chown=node:node ./samples/solidity . RUN npx hardhat compile -FROM node:16-alpine3.15 +FROM node:20-alpine3.17 RUN apk add curl jq RUN mkdir -p /app/contracts/source \ && chgrp -R 0 /app/ \ @@ -31,6 +31,8 @@ COPY --from=solidity-build --chown=1001:0 /home/node/contracts /home/node/packag RUN npm install --production WORKDIR /app/contracts COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json ./ +# We also need to keep copying it to the old location to maintain compatibility with the FireFly CLI +COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/TokenFactory.sol/TokenFactory.json /home/node/contracts/ WORKDIR /app COPY --from=build --chown=1001:0 /home/node/dist ./dist COPY --from=build --chown=1001:0 /home/node/package.json /home/node/package-lock.json ./ diff --git a/src/app.module.ts b/src/app.module.ts index 898a236..c3411f2 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.interfaces.ts b/src/event-stream/event-stream.interfaces.ts index 01cd022..8b981b4 100644 --- a/src/event-stream/event-stream.interfaces.ts +++ b/src/event-stream/event-stream.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.module.ts b/src/event-stream/event-stream.module.ts index 9087296..e677c8d 100644 --- a/src/event-stream/event-stream.module.ts +++ b/src/event-stream/event-stream.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.service.spec.ts b/src/event-stream/event-stream.service.spec.ts index 5266f16..ffe98dc 100644 --- a/src/event-stream/event-stream.service.spec.ts +++ b/src/event-stream/event-stream.service.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 45a8cc1..9db31b1 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -20,9 +20,9 @@ import { AxiosRequestConfig } from 'axios'; import { lastValueFrom } from 'rxjs'; import WebSocket from 'ws'; import { FFRequestIDHeader } from '../request-context/constants'; -import { Context } from '../request-context/request-context.decorator'; +import { Context, newContext } from '../request-context/request-context.decorator'; import { IAbiMethod } from '../tokens/tokens.interfaces'; -import { getHttpRequestOptions, getWebsocketOptions } from '../utils'; +import { eventStreamName, getHttpRequestOptions, getWebsocketOptions } from '../utils'; import { Event, EventBatch, @@ -46,6 +46,7 @@ export class EventStreamSocket { constructor( private url: string, private topic: string, + private namespace: string, private username: string, private password: string, private handleEvents: (events: EventBatch) => void, @@ -67,7 +68,7 @@ export class EventStreamSocket { } else { this.logger.log('Event stream websocket connected'); } - this.produce({ type: 'listen', topic: this.topic }); + this.produce({ type: 'listen', topic: eventStreamName(this.topic, this.namespace) }); this.produce({ type: 'listenreplies' }); this.ping(); }) @@ -83,6 +84,7 @@ export class EventStreamSocket { } }) .on('message', (message: string) => { + this.logger.verbose(`WS => ${message}`); this.handleMessage(JSON.parse(message)); }) .on('pong', () => { @@ -109,7 +111,11 @@ export class EventStreamSocket { } ack(batchNumber: number | undefined) { - this.produce({ type: 'ack', topic: this.topic, batchNumber }); + this.produce({ type: 'ack', topic: eventStreamName(this.topic, this.namespace), batchNumber }); + } + + nack(batchNumber: number | undefined) { + this.produce({ type: 'nack', topic: eventStreamName(this.topic, this.namespace), batchNumber }); } close() { @@ -193,13 +199,27 @@ export class EventStreamService { batchSize: 50, batchTimeoutMS: 500, type: 'websocket', - websocket: { topic }, + websocket: { topic: name }, blockedReryDelaySec: 30, // intentional due to spelling error in ethconnect inputs: true, timestamps: true, }; const existingStreams = await this.getStreams(ctx); + + // Check to see if there is a deprecated stream that we should remove + this.logger.debug(`Checking for deprecated event steam with topic '${topic}'`); + const deprecatedStream = existingStreams.find(s => s.name === topic); + if (deprecatedStream) { + this.logger.log(`Purging deprecated eventstream '${deprecatedStream.id}'`); + await lastValueFrom( + this.http.delete( + new URL(`/eventstreams/${deprecatedStream.id}`, this.baseUrl).href, + this.requestOptions(ctx), + ), + ); + } + const stream = existingStreams.find(s => s.name === streamDetails.name); if (stream) { const patchedStreamRes = await lastValueFrom( @@ -331,15 +351,20 @@ export class EventStreamService { return true; } - connect( + async connect( url: string, topic: string, + namespace: string, handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) { + const name = eventStreamName(topic, namespace); + await this.createOrUpdateStream(newContext(), name, topic); + return new EventStreamSocket( url, topic, + namespace, this.username, this.password, handleEvents, diff --git a/src/eventstream-proxy/eventstream-proxy.base.ts b/src/eventstream-proxy/eventstream-proxy.base.ts index e708fbd..2a772e2 100644 --- a/src/eventstream-proxy/eventstream-proxy.base.ts +++ b/src/eventstream-proxy/eventstream-proxy.base.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -15,19 +15,19 @@ // limitations under the License. import { Logger } from '@nestjs/common'; -import { MessageBody, SubscribeMessage } from '@nestjs/websockets'; import { v4 as uuidv4 } from 'uuid'; import { Context, newContext } from '../request-context/request-context.decorator'; import { EventBatch, EventStreamReply } from '../event-stream/event-stream.interfaces'; import { EventStreamService, EventStreamSocket } from '../event-stream/event-stream.service'; import { + WebSocketAck, + WebSocketActionBase, WebSocketEventsBase, WebSocketEx, WebSocketMessage, + WebSocketStart, } from '../websocket-events/websocket-events.base'; import { - AckMessageData, - ConnectionListener, EventListener, WebSocketMessageBatchData, WebSocketMessageWithId, @@ -40,20 +40,20 @@ import { * @WebSocketGateway({ path: '/api/stream' }) */ export abstract class EventStreamProxyBase extends WebSocketEventsBase { - socket?: EventStreamSocket; + namespaceClients: Map> = new Map(); + namespaceEventStreamSocket: Map = new Map(); url?: string; topic?: string; - private connectListeners: ConnectionListener[] = []; private eventListeners: EventListener[] = []; - private awaitingAck: WebSocketMessageWithId[] = []; - private currentClient: WebSocketEx | undefined; + // Map of client IDs to all the messages for which we are awaiting an ack + private awaitingAck: Map = new Map(); private subscriptionNames = new Map(); private queue = Promise.resolve(); constructor( protected readonly logger: Logger, - protected eventstream: EventStreamService, + protected eventStreamService: EventStreamService, requireAuth = false, ) { super(logger, requireAuth); @@ -66,66 +66,104 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { handleConnection(client: WebSocketEx) { super.handleConnection(client); - if (this.server.clients.size === 1) { - this.logger.log(`Initializing event stream proxy`); - Promise.all(this.connectListeners.map(l => l.onConnect())) - .then(() => { - this.setCurrentClient(client); - this.startListening(); - }) - .catch(err => { - this.logger.error(`Error initializing event stream proxy: ${err}`); - }); + + if (!this.awaitingAck.get(client.id)) { + this.awaitingAck.set(client.id, []); } + + client.on('message', async (message: string) => { + const action = JSON.parse(message) as WebSocketActionBase; + switch (action.type) { + case 'start': { + const startAction = action as WebSocketStart; + this.startListening(client, startAction.namespace); + break; + } + case 'ack': { + const ackAction = action as WebSocketAck; + this.handleAck(client, ackAction); + } + } + }); } private queueTask(task: () => void) { this.queue = this.queue.finally(task); } - private startListening() { + private async startListening(client: WebSocketEx, namespace: string) { if (this.url === undefined || this.topic === undefined) { return; } - this.socket = this.eventstream.connect( - this.url, - this.topic, - events => { - this.queueTask(() => this.processEvents(events)); - }, - receipt => { - this.broadcast('receipt', receipt); - }, - ); + try { + if (!this.namespaceEventStreamSocket.has(namespace)) { + const eventStreamSocket = await this.eventStreamService.connect( + this.url, + this.topic, + namespace, + events => { + this.queueTask(() => this.processEvents(events, namespace)); + }, + receipt => { + this.broadcast('receipt', receipt); + }, + ); + this.namespaceEventStreamSocket.set(namespace, eventStreamSocket); + } + let clientSet = this.namespaceClients.get(namespace); + if (!clientSet) { + clientSet = new Set(); + } + clientSet.add(client); + this.namespaceClients.set(namespace, clientSet); + + // ack the start command + client.send( + JSON.stringify({ + event: 'started', + data: { + namespace: namespace, + }, + }), + ); + this.logger.debug(`Started namespace '${namespace}'`); + } catch (e) { + this.logger.error(`Error connecting to event stream websocket: ${e.message}`); + } } handleDisconnect(client: WebSocketEx) { super.handleDisconnect(client); - if (this.server.clients.size === 0) { - this.stopListening(); - } else if (client.id === this.currentClient?.id) { - for (const newClient of this.server.clients) { - this.setCurrentClient(newClient as WebSocketEx); - break; - } - } - } - private stopListening() { - this.socket?.close(); - this.socket = undefined; - this.currentClient = undefined; - } + // Iterate over all the namespaces this client was subscribed to + this.namespaceClients.forEach((clientSet, namespace) => { + clientSet.delete(client); + + // Nack any messages that are inflight for that namespace + const nackedMessageIds: Set = new Set(); + this.awaitingAck + ?.get(client.id) + ?.filter(msg => msg.namespace === namespace) + .map(msg => { + this.namespaceEventStreamSocket.get(namespace)?.nack(msg.batchNumber); + nackedMessageIds.add(msg.id); + }); + this.awaitingAck.delete(client.id); - addConnectionListener(listener: ConnectionListener) { - this.connectListeners.push(listener); + // If all clients for this namespace have disconnected, also close the connection to EVMConnect + if (clientSet.size == 0) { + this.namespaceEventStreamSocket.get(namespace)?.close(); + this.namespaceEventStreamSocket.delete(namespace); + this.namespaceClients.delete(namespace); + } + }); } addEventListener(listener: EventListener) { this.eventListeners.push(listener); } - private async processEvents(batch: EventBatch) { + private async processEvents(batch: EventBatch, namespace: string) { const messages: WebSocketMessage[] = []; const eventHandlers: Promise[] = []; for (const event of batch.events) { @@ -156,6 +194,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } } const message: WebSocketMessageWithId = { + namespace: namespace, id: uuidv4(), event: 'batch', data: { @@ -163,8 +202,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { }, batchNumber: batch.batchNumber, }; - this.awaitingAck.push(message); - this.currentClient?.send(JSON.stringify(message)); + this.send(namespace, message); } private async getSubscriptionName(ctx: Context, subId: string) { @@ -174,7 +212,7 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { } try { - const sub = await this.eventstream.getSubscription(ctx, subId); + const sub = await this.eventStreamService.getSubscription(ctx, subId); if (sub !== undefined) { this.subscriptionNames.set(subId, sub.name); return sub.name; @@ -185,34 +223,51 @@ export abstract class EventStreamProxyBase extends WebSocketEventsBase { return undefined; } - private setCurrentClient(client: WebSocketEx) { - this.currentClient = client; - for (const message of this.awaitingAck) { - this.currentClient.send(JSON.stringify(message)); - } - } - - @SubscribeMessage('ack') - handleAck(@MessageBody() data: AckMessageData) { + handleAck(client: WebSocketEx, data: WebSocketAck) { if (data.id === undefined) { this.logger.error('Received malformed ack'); return; } - const inflight = this.awaitingAck.find(msg => msg.id === data.id); - this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); - if (this.socket !== undefined && inflight !== undefined) { - this.awaitingAck = this.awaitingAck.filter(msg => msg.id !== data.id); - if ( - // If nothing is left awaiting an ack - then we clearly need to ack - this.awaitingAck.length === 0 || - // Or if we have a batch number associated with this ID, then we can only ack if there - // are no other messages in-flight with the same batch number. - (inflight.batchNumber !== undefined && - !this.awaitingAck.find(msg => msg.batchNumber === inflight.batchNumber)) - ) { - this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); - this.socket.ack(inflight.batchNumber); + let awaitingAck = this.awaitingAck.get(client.id); + + if (awaitingAck) { + const inflight = awaitingAck.find(msg => msg.id === data.id); + this.logger.log(`Received ack ${data.id} inflight=${!!inflight}`); + if (this.namespaceEventStreamSocket !== undefined && inflight !== undefined) { + // Remove the acked message id from the queue + awaitingAck = awaitingAck.filter(msg => msg.id !== data.id); + this.awaitingAck.set(client.id, awaitingAck); + if ( + // If nothing is left awaiting an ack - then we clearly need to ack + awaitingAck.length === 0 || + // Or if we have a batch number associated with this ID, then we can only ack if there + // are no other messages in-flight with the same batch number. + (inflight.batchNumber !== undefined && + !awaitingAck.filter(msg => msg.batchNumber === inflight.batchNumber)) + ) { + this.logger.log(`In-flight batch complete (batchNumber=${inflight.batchNumber})`); + this.namespaceEventStreamSocket.get(inflight.namespace)?.ack(inflight.batchNumber); + } + } + } else { + this.logger.warn(`Received unrecognized ack from client ${client.id} for message ${data.id}`); + } + } + + send(namespace, payload: WebSocketMessageWithId) { + const clients = this.namespaceClients.get(namespace); + if (clients) { + // Randomly select a connected client for this namespace to distribute load + const selected = Math.floor(Math.random() * clients.size); + let i = 0; + for (const client of clients.keys()) { + if (i++ == selected) { + this.awaitingAck.get(client.id)?.push(payload); + this.logger.verbose(`WS <= ${payload}`); + client.send(JSON.stringify(payload)); + return; + } } } } diff --git a/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts b/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts index becfe11..a650b4a 100644 --- a/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts +++ b/src/eventstream-proxy/eventstream-proxy.gateway.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/eventstream-proxy/eventstream-proxy.gateway.ts b/src/eventstream-proxy/eventstream-proxy.gateway.ts index 90e1a2b..c4e302d 100644 --- a/src/eventstream-proxy/eventstream-proxy.gateway.ts +++ b/src/eventstream-proxy/eventstream-proxy.gateway.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/eventstream-proxy/eventstream-proxy.interfaces.ts b/src/eventstream-proxy/eventstream-proxy.interfaces.ts index 20bfca3..42d0b70 100644 --- a/src/eventstream-proxy/eventstream-proxy.interfaces.ts +++ b/src/eventstream-proxy/eventstream-proxy.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -30,14 +30,11 @@ export interface EventListener { } export interface WebSocketMessageWithId extends WebSocketMessage { + namespace: string; id: string; batchNumber: number | undefined; } -export interface AckMessageData { - id?: string; -} - export interface WebSocketMessageBatchData { events: WebSocketMessage[]; } diff --git a/src/eventstream-proxy/eventstream-proxy.module.ts b/src/eventstream-proxy/eventstream-proxy.module.ts index 7a96d27..158a5e5 100644 --- a/src/eventstream-proxy/eventstream-proxy.module.ts +++ b/src/eventstream-proxy/eventstream-proxy.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/health/health.module.ts b/src/health/health.module.ts index 299008e..ba2d9e5 100644 --- a/src/health/health.module.ts +++ b/src/health/health.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/main.ts b/src/main.ts index 549da08..162fc50 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -78,7 +78,6 @@ async function bootstrap() { const ethConnectUrl = config.get('ETHCONNECT_URL', ''); const fftmUrl = config.get('FFTM_URL', ''); // Optional. Currently used only for SendTransaction API calls when set const topic = config.get('ETHCONNECT_TOPIC', 'tokenERC20ERC721'); - const autoInit = config.get('AUTO_INIT', 'true'); const username = config.get('ETHCONNECT_USERNAME', ''); const password = config.get('ETHCONNECT_PASSWORD', ''); const factoryAddress = config.get('FACTORY_CONTRACT_ADDRESS', ''); @@ -107,10 +106,6 @@ async function bootstrap() { .configure(ethConnectUrl, fftmUrl, username, password, passthroughHeaders, blockchainRetryCfg); app.get(AbiMapperService).configure(legacyERC20, legacyERC721); - if (autoInit.toLowerCase() !== 'false') { - await app.get(TokensService).init(newContext()); - } - const port = config.get('PORT', 3000); console.log(`Listening on port ${port}`); await app.listen(port); diff --git a/src/request-context/request-id.middleware.ts b/src/request-context/request-id.middleware.ts index 7ad509b..a614f68 100644 --- a/src/request-context/request-id.middleware.ts +++ b/src/request-context/request-id.middleware.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/request-logging.interceptor.spec.ts b/src/request-logging.interceptor.spec.ts index fe6bd53..31a8c6a 100644 --- a/src/request-logging.interceptor.spec.ts +++ b/src/request-logging.interceptor.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/request-logging.interceptor.ts b/src/request-logging.interceptor.ts index fdbe0a5..e78c7ec 100644 --- a/src/request-logging.interceptor.ts +++ b/src/request-logging.interceptor.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/abimapper.service.ts b/src/tokens/abimapper.service.ts index b5d0340..84903f2 100644 --- a/src/tokens/abimapper.service.ts +++ b/src/tokens/abimapper.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/blockchain.service.ts b/src/tokens/blockchain.service.ts index e622dab..505a6a0 100644 --- a/src/tokens/blockchain.service.ts +++ b/src/tokens/blockchain.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/erc165.ts b/src/tokens/erc165.ts index 62f4915..086d056 100644 --- a/src/tokens/erc165.ts +++ b/src/tokens/erc165.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/erc20.ts b/src/tokens/erc20.ts index 7905152..995aa61 100644 --- a/src/tokens/erc20.ts +++ b/src/tokens/erc20.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/erc721.ts b/src/tokens/erc721.ts index de28215..bbb5195 100644 --- a/src/tokens/erc721.ts +++ b/src/tokens/erc721.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.controller.spec.ts b/src/tokens/tokens.controller.spec.ts index 0435811..df698c7 100644 --- a/src/tokens/tokens.controller.spec.ts +++ b/src/tokens/tokens.controller.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.controller.ts b/src/tokens/tokens.controller.ts index 2daeeb2..493a3ce 100644 --- a/src/tokens/tokens.controller.ts +++ b/src/tokens/tokens.controller.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -36,13 +36,16 @@ import { TokensService } from './tokens.service'; @Controller() export class TokensController { - constructor(private service: TokensService, private blockchain: BlockchainConnectorService) {} + constructor( + private service: TokensService, + private blockchain: BlockchainConnectorService, + ) {} @Post('init') @HttpCode(204) @ApiOperation({ summary: 'Perform one-time initialization (if not auto-initialized)' }) init(@RequestContext() ctx: Context) { - return this.service.init(ctx); + // Do nothing. Endpoint retained for backwards compatibility with older tooling. } @Post('createpool') diff --git a/src/tokens/tokens.interfaces.ts b/src/tokens/tokens.interfaces.ts index d174053..ea6a248 100644 --- a/src/tokens/tokens.interfaces.ts +++ b/src/tokens/tokens.interfaces.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -139,6 +139,10 @@ export class TokenPoolConfig { } export class TokenPool { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty({ enum: TokenType }) @IsEnum(TokenType) type: TokenType; @@ -213,6 +217,10 @@ export class BlockchainEvent { } export class TokenPoolActivate { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -227,6 +235,10 @@ export class TokenPoolActivate { } export class TokenPoolDeactivate { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -271,6 +283,10 @@ export class CheckInterfaceResponse implements TokenAbi { } export class TokenTransfer { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -330,6 +346,10 @@ export class TokenApprovalConfig { } export class TokenApproval { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() @IsNotEmpty() poolLocator: string; @@ -366,6 +386,10 @@ export class TokenApproval { // Websocket notifications class tokenEventBase { + @ApiProperty() + @IsNotEmpty() + namespace: string; + @ApiProperty() poolLocator: string; diff --git a/src/tokens/tokens.listener.ts b/src/tokens/tokens.listener.ts index 46d19de..c8d847c 100644 --- a/src/tokens/tokens.listener.ts +++ b/src/tokens/tokens.listener.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -55,7 +55,10 @@ const approvalForAllEventSignature = 'ApprovalForAll(address,address,bool)'; export class TokenListener implements EventListener { private readonly logger = new Logger(TokenListener.name); - constructor(private mapper: AbiMapperService, private blockchain: BlockchainConnectorService) {} + constructor( + private mapper: AbiMapperService, + private blockchain: BlockchainConnectorService, + ) {} async onEvent(subName: string, event: Event) { const signature = this.trimEventSignature(event.signature); diff --git a/src/tokens/tokens.module.ts b/src/tokens/tokens.module.ts index e6094b6..58a8a2d 100644 --- a/src/tokens/tokens.module.ts +++ b/src/tokens/tokens.module.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.service.spec.ts b/src/tokens/tokens.service.spec.ts index 8f70a35..18dc5d2 100644 --- a/src/tokens/tokens.service.spec.ts +++ b/src/tokens/tokens.service.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -230,6 +230,7 @@ describe('TokensService', () => { useValue: { addConnectionListener: jest.fn(), addEventListener: jest.fn(), + configure: jest.fn(), }, }, ], @@ -263,6 +264,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -276,6 +278,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_NO_DATA_POOL_ID, standard: 'ERC20', @@ -296,6 +299,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, poolData: 'ns1', }; @@ -306,6 +310,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, standard: 'ERC20', @@ -350,6 +355,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -385,6 +391,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -421,6 +428,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -455,6 +463,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -468,6 +477,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -488,6 +498,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -501,6 +512,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -521,6 +533,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, poolData: 'ns1', }; @@ -531,6 +544,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, standard: 'ERC20', @@ -575,6 +589,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -610,6 +625,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -646,6 +662,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, @@ -680,6 +697,7 @@ describe('TokensService', () => { it('should return ERC721NoData pool details successfully', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -695,6 +713,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC721_NO_DATA_POOL_ID, standard: 'ERC721', @@ -715,6 +734,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, poolData: 'ns1', }; @@ -725,6 +745,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, standard: 'ERC721', @@ -767,6 +788,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenMint = { + namespace: 'ns1', amount: '2', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -784,6 +806,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', @@ -818,6 +841,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -856,6 +880,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, @@ -890,6 +915,7 @@ describe('TokensService', () => { it('should return ERC721WithData pool details successfully - implicit withData config', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -904,6 +930,7 @@ describe('TokensService', () => { await service.createPool(ctx, request).then(resp => { expect(resp).toEqual({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: ERC721_WITH_DATA_POOL_ID, standard: 'ERC721', @@ -924,6 +951,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, poolData: 'ns1', }; @@ -934,6 +962,7 @@ describe('TokensService', () => { }; const response: TokenPoolEvent = { + namespace: 'ns1', poolData: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, standard: 'ERC721', @@ -975,6 +1004,7 @@ describe('TokensService', () => { it('should not mint ERC721WithData token due to invalid amount', async () => { const ctx = newContext(); const request: TokenMint = { + namespace: 'ns1', amount: '2', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -992,6 +1022,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1030,6 +1061,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1069,6 +1101,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_V1_POOL_ID, @@ -1112,6 +1145,7 @@ describe('TokensService', () => { }; const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', @@ -1149,6 +1183,7 @@ describe('TokensService', () => { }; const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -1185,6 +1220,7 @@ describe('TokensService', () => { }; const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, @@ -1219,6 +1255,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: 'address=0x123&standard=notAStandard&type=fungible', poolData: 'ns1', }; @@ -1231,6 +1268,7 @@ describe('TokensService', () => { const ctx = newContext(); const request: TokenPoolActivate = { + namespace: 'ns1', poolLocator: 'address=0x123&type=fungible', poolData: 'ns1', }; diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index b82c961..3340967 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,7 +18,7 @@ import { BadRequestException, Injectable, Logger, NotFoundException } from '@nes import { EventStream } from '../event-stream/event-stream.interfaces'; import { EventStreamService } from '../event-stream/event-stream.service'; import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway'; -import { Context, newContext } from '../request-context/request-context.decorator'; +import { Context } from '../request-context/request-context.decorator'; import { AsyncResponse, CheckInterfaceRequest, @@ -64,6 +64,7 @@ import { Symbol as ERC721Symbol, DynamicMethods as ERC721Methods, } from './erc721'; +import { eventStreamName } from '../utils'; @Injectable() export class TokensService { @@ -71,7 +72,7 @@ export class TokensService { baseUrl: string; topic: string; - stream: EventStream; + streamCache: Map = new Map(); factoryAddress = ''; constructor( @@ -85,35 +86,21 @@ export class TokensService { this.baseUrl = baseUrl; this.topic = topic; this.factoryAddress = factoryAddress.toLowerCase(); - this.proxy.addConnectionListener(this); this.proxy.addEventListener(new TokenListener(this.mapper, this.blockchain)); - } - - async onConnect() { const wsUrl = new URL('/ws', this.baseUrl.replace('http', 'ws')).href; - const stream = await this.getStream(newContext()); - this.proxy.configure(wsUrl, stream.name); - } - - /** - * One-time initialization of event stream and base subscription. - */ - async init(ctx: Context) { - this.stream = await this.getStream(ctx); - if (this.factoryAddress !== '') { - await this.createFactorySubscription(ctx, this.factoryAddress); - } + this.proxy.configure(wsUrl, this.topic); } - private async createFactorySubscription(ctx: Context, address: string) { + private async getOrCreateFactorySubscription(ctx: Context, address: string, namespace: string) { const eventABI = this.mapper.getCreateEvent(); const methodABI = this.mapper.getCreateMethod(); - if (eventABI !== undefined && methodABI !== undefined) { + const stream = await this.getStream(ctx, namespace); + if (eventABI !== undefined && methodABI !== undefined && stream !== undefined) { await this.eventstream.getOrCreateSubscription( ctx, this.baseUrl, eventABI, - this.stream.id, + stream.id, packSubscriptionName(address, eventABI.name), address, [methodABI], @@ -122,15 +109,20 @@ export class TokensService { } } - private async getStream(ctx: Context) { - const stream = this.stream; + private async getStream(ctx: Context, namespace: string) { + let stream = this.streamCache.get(namespace); if (stream !== undefined) { return stream; } await this.migrationCheck(ctx); - this.logger.log('Creating stream with name ' + this.topic); - this.stream = await this.eventstream.createOrUpdateStream(ctx, this.topic, this.topic); - return this.stream; + this.logger.log('Creating stream with name ' + eventStreamName(this.topic, namespace)); + stream = await this.eventstream.createOrUpdateStream( + ctx, + eventStreamName(this.topic, namespace), + this.topic, + ); + this.streamCache.set(namespace, stream); + return stream; } /** @@ -276,6 +268,7 @@ export class TokensService { } return { + namespace: dto.namespace, data: dto.data, poolLocator: packPoolLocator(poolLocator), standard: dto.type === TokenType.FUNGIBLE ? 'ERC20' : 'ERC721', @@ -296,7 +289,7 @@ export class TokensService { address: string, dto: TokenPool, ): Promise { - await this.createFactorySubscription(ctx, address); + await this.getOrCreateFactorySubscription(ctx, address, dto.namespace); const { method, params } = await this.mapper.getCreateMethodAndParams(ctx, address, dto); const response = await this.blockchain.sendTransaction( ctx, @@ -347,7 +340,7 @@ export class TokensService { poolLocator.type === TokenType.FUNGIBLE, ); const eventAbis = this.getEventAbis(poolLocator); - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, dto.namespace); const promises = [ this.eventstream.getOrCreateSubscription( @@ -389,6 +382,7 @@ export class TokensService { const poolInfo = await this.queryPool(ctx, poolLocator); const tokenPoolEvent: TokenPoolEvent = { + namespace: dto.namespace, poolData: dto.poolData, poolLocator: dto.poolLocator, standard: poolLocator.type === TokenType.FUNGIBLE ? 'ERC20' : 'ERC721', @@ -412,7 +406,7 @@ export class TokensService { throw new BadRequestException('Invalid pool locator'); } - const stream = await this.getStream(ctx); + const stream = await this.getStream(ctx, dto.namespace); const eventAbis = this.getEventAbis(poolLocator); const promises = [ this.eventstream.deleteSubscriptionByName( diff --git a/src/tokens/tokens.util.spec.ts b/src/tokens/tokens.util.spec.ts index c4d6edc..47cffcf 100644 --- a/src/tokens/tokens.util.spec.ts +++ b/src/tokens/tokens.util.spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/tokens/tokens.util.ts b/src/tokens/tokens.util.ts index 7bbb56d..9836cd4 100644 --- a/src/tokens/tokens.util.ts +++ b/src/tokens/tokens.util.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/src/utils.ts b/src/utils.ts index 2aa5db6..fbcadbc 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -69,3 +69,7 @@ export const getNestOptions = (): NestApplicationOptions => { } return options; }; + +export const eventStreamName = (topic: string, namespace: string) => { + return `${topic}/${namespace}`; +}; diff --git a/src/websocket-events/websocket-events.base.ts b/src/websocket-events/websocket-events.base.ts index b25efa7..ebc5bfe 100644 --- a/src/websocket-events/websocket-events.base.ts +++ b/src/websocket-events/websocket-events.base.ts @@ -1,4 +1,4 @@ -// Copyright © 2022 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -50,6 +50,18 @@ export interface WebSocketMessage { data: any; } +export interface WebSocketActionBase { + type: 'start' | 'ack' | 'nack' | 'protocol_error'; +} + +export interface WebSocketStart extends WebSocketActionBase { + namespace: string; +} + +export interface WebSocketAck extends WebSocketActionBase { + id: string; +} + /** * Base class for websocket gateways. * @@ -61,7 +73,10 @@ export abstract class WebSocketEventsBase { @WebSocketServer() server: Server; - constructor(protected readonly logger: Logger, private requireAuth = false) {} + constructor( + protected readonly logger: Logger, + private requireAuth = false, + ) {} afterInit(server: Server) { const interval = setInterval(() => this.ping(), PING_INTERVAL); @@ -86,6 +101,9 @@ export abstract class WebSocketEventsBase client.on('error', err => { this.logger.log(`WebSocket ${client.id}: error: ${err}`); }); + client.on('message', msg => { + this.logger.verbose(`WS => ${msg}`); + }); } handleDisconnect(client: WebSocketEx) { diff --git a/test/app.e2e-context.ts b/test/app.e2e-context.ts index 49a1f52..d17b7f4 100644 --- a/test/app.e2e-context.ts +++ b/test/app.e2e-context.ts @@ -27,16 +27,28 @@ export class TestContext { }; eventHandler: (events: EventBatch) => void; receiptHandler: (receipt: EventStreamReply) => void; + connected: Promise; + private resolveConnected: () => void; + private rejectConnected: () => void; + + resetConnectedPromise() { + this.connected = new Promise((resolve, reject) => { + this.resolveConnected = resolve; + this.rejectConnected = reject; + }); + } eventstream = { connect: ( url: string, topic: string, + namespace: string, handleEvents: (events: EventBatch) => void, handleReceipt: (receipt: EventStreamReply) => void, ) => { this.eventHandler = handleEvents; this.receiptHandler = handleReceipt; + this.resolveConnected(); }, getStreams: jest.fn(), @@ -85,6 +97,7 @@ export class TestContext { (this.app.getHttpServer() as Server).listen(); this.server = request(this.app.getHttpServer()); + this.resetConnectedPromise(); } async end() { diff --git a/test/app.e2e-spec.ts b/test/app.e2e-spec.ts index bd733db..f2523bc 100644 --- a/test/app.e2e-spec.ts +++ b/test/app.e2e-spec.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // diff --git a/test/suites/erc20.ts b/test/suites/erc20.ts index 65568ee..8c0ef83 100644 --- a/test/suites/erc20.ts +++ b/test/suites/erc20.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -89,10 +89,10 @@ export default (context: TestContext) => { }), ); }; - describe('ERC20WithData', () => { it('Create pool - unrecognized fields', async () => { const request = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -102,8 +102,8 @@ export default (context: TestContext) => { symbol: SYMBOL, isBestPool: true, // will be stripped but will not cause an error }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_WITH_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -116,16 +116,14 @@ export default (context: TestContext) => { schema: ERC20_WITH_DATA_SCHEMA, }, }); - mockPoolQuery(true); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - invalid type', async () => { const request: TokenPool = { + namespace: 'ns1', type: 'funkible' as TokenType, requestId: REQUEST, signer: IDENTITY, @@ -134,19 +132,17 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const response = { statusCode: 400, message: ['type must be a valid enum value'], error: 'Bad Request', }; - context.http.post = jest.fn(() => new FakeObservable(response)); await context.server.post('/createpool').send(request).expect(400).expect(response); }); - it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -155,8 +151,8 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_WITH_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -169,22 +165,19 @@ export default (context: TestContext) => { schema: ERC20_WITH_DATA_SCHEMA, }, }); - mockPoolQuery(true); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -194,29 +187,24 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === MINT_WITH_DATA) as IAbiMethod, params: ['0x123', '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -226,28 +214,23 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === TRANSFER_WITH_DATA) as IAbiMethod, params: [IDENTITY, '0x123', '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_WITH_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -257,29 +240,24 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === BURN_WITH_DATA) as IAbiMethod, params: [IDENTITY, '20', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC20_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { allowance: '100' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -289,24 +267,20 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === APPROVE_WITH_DATA) as IAbiMethod, params: ['2', '100', '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); }); - describe('ERC20NoData', () => { it('Create pool - unrecognized fields', async () => { const request = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -316,8 +290,8 @@ export default (context: TestContext) => { symbol: SYMBOL, isBestPool: true, // will be stripped but will not cause an error }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_NO_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -329,16 +303,14 @@ export default (context: TestContext) => { schema: ERC20_NO_DATA_SCHEMA, }, }); - mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - invalid type', async () => { const request: TokenPool = { + namespace: 'ns1', type: 'funkible' as TokenType, requestId: REQUEST, signer: IDENTITY, @@ -347,18 +319,16 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const response = { statusCode: 400, message: ['type must be a valid enum value'], error: 'Bad Request', }; - await context.server.post('/createpool').send(request).expect(400).expect(response); }); - it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.FUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -367,8 +337,8 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ + namespace: 'ns1', data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC20_NO_DATA_SCHEMA}&type=${TokenType.FUNGIBLE}`, standard: 'ERC20', @@ -380,22 +350,19 @@ export default (context: TestContext) => { schema: ERC20_NO_DATA_SCHEMA, }, }); - mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -405,29 +372,24 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === MINT_NO_DATA) as IAbiMethod, params: ['0x123', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -437,28 +399,23 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === TRANSFER_NO_DATA) as IAbiMethod, params: ['0x123', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -468,29 +425,24 @@ export default (context: TestContext) => { method: ERC20NoDataABI.abi.find(abi => abi.name === BURN_NO_DATA) as IAbiMethod, params: ['20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC20_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { allowance: '100' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -500,20 +452,15 @@ export default (context: TestContext) => { method: ERC20WithDataABI.abi.find(abi => abi.name === APPROVE_NO_DATA) as IAbiMethod, params: ['2', '100'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token - custom ABI', async () => { const burnMethods = [ { @@ -548,8 +495,8 @@ export default (context: TestContext) => { outputs: [], }, ]; - const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -559,7 +506,6 @@ export default (context: TestContext) => { methods: burnMethods, }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -569,20 +515,15 @@ export default (context: TestContext) => { method: burnMethods[0], params: ['20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token - custom ABI, burn from other', async () => { const burnMethods = [ { @@ -617,8 +558,8 @@ export default (context: TestContext) => { outputs: [], }, ]; - const request: TokenBurn = { + namespace: 'ns1', amount: '20', signer: IDENTITY, poolLocator: ERC20_NO_DATA_POOL_ID, @@ -628,7 +569,6 @@ export default (context: TestContext) => { methods: burnMethods, }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -638,27 +578,21 @@ export default (context: TestContext) => { method: burnMethods[1], params: ['0x2', '20'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Check interface', async () => { const request: CheckInterfaceRequest = { poolLocator: ERC20_NO_DATA_POOL_ID, format: InterfaceFormat.ABI, methods: ERC20NoDataABI.abi, }; - const response: CheckInterfaceResponse = { approval: { format: InterfaceFormat.ABI, @@ -682,7 +616,6 @@ export default (context: TestContext) => { ], }, }; - await context.server.post('/checkinterface').send(request).expect(200).expect(response); }); }); diff --git a/test/suites/erc721.ts b/test/suites/erc721.ts index bf41034..dfd2789 100644 --- a/test/suites/erc721.ts +++ b/test/suites/erc721.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -83,7 +83,6 @@ export default (context: TestContext) => { }), ); }; - const mockURIQuery = (withURI: boolean) => { context.http.post.mockReturnValueOnce( new FakeObservable({ @@ -91,10 +90,10 @@ export default (context: TestContext) => { }), ); }; - describe('ERC721WithData', () => { it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -103,7 +102,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_WITH_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -116,17 +114,15 @@ export default (context: TestContext) => { schema: ERC721_WITH_DATA_SCHEMA, }, }); - mockURIQuery(true); mockPoolQuery(undefined); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Create pool - base URI', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -135,7 +131,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_WITH_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -148,22 +143,19 @@ export default (context: TestContext) => { schema: ERC721_WITH_DATA_SCHEMA, }, }); - mockURIQuery(true); mockPoolQuery(undefined); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -173,29 +165,24 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === MINT_WITH_URI) as IAbiMethod, params: ['0x123', '0x00', ''], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -205,28 +192,23 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === TRANSFER_WITH_DATA) as IAbiMethod, params: [IDENTITY, '0x123', '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_WITH_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -236,29 +218,24 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === BURN_WITH_DATA) as IAbiMethod, params: [IDENTITY, '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for all', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: {}, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -270,29 +247,24 @@ export default (context: TestContext) => { ) as IAbiMethod, params: ['2', true, '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for one', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_WITH_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { tokenIndex: '5' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -302,24 +274,20 @@ export default (context: TestContext) => { method: ERC721WithDataV2ABI.abi.find(abi => abi.name === APPROVE_WITH_DATA) as IAbiMethod, params: ['2', '5', '0x00'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); }); - describe('ERC721NoData', () => { it('Create pool - correct fields', async () => { const request: TokenPool = { + namespace: 'ns1', type: TokenType.NONFUNGIBLE, requestId: REQUEST, signer: IDENTITY, @@ -328,7 +296,6 @@ export default (context: TestContext) => { name: NAME, symbol: SYMBOL, }; - const expectedResponse = expect.objectContaining({ data: `{"tx":${TX}}`, poolLocator: `address=${CONTRACT_ADDRESS}&schema=${ERC721_NO_DATA_SCHEMA}&type=${TokenType.NONFUNGIBLE}`, @@ -341,23 +308,20 @@ export default (context: TestContext) => { schema: ERC721_NO_DATA_SCHEMA, }, }); - mockURIQuery(false); mockURIQuery(false); mockPoolQuery(false); context.http.get = jest.fn(() => new FakeObservable(expectedResponse)); - const response = await context.server.post('/createpool').send(request).expect(200); expect(response.body).toEqual(expectedResponse); }); - it('Mint token', async () => { const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -367,29 +331,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === MINT_NO_DATA) as IAbiMethod, params: ['0x123'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Transfer token', async () => { const request: TokenTransfer = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -401,28 +360,23 @@ export default (context: TestContext) => { ) as IAbiMethod, params: [IDENTITY, '0x123', '721', '0x00'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/transfer').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Burn token', async () => { const request: TokenBurn = { + namespace: 'ns1', tokenIndex: '721', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, from: IDENTITY, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -432,29 +386,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === BURN_NO_DATA) as IAbiMethod, params: ['721'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/burn').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for all', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: {}, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -464,29 +413,24 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === APPROVE_FOR_ALL_NO_DATA) as IAbiMethod, params: ['2', true], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Token approval for one', async () => { const request: TokenApproval = { + namespace: 'ns1', poolLocator: ERC721_NO_DATA_POOL_ID, signer: IDENTITY, operator: '2', approved: true, config: { tokenIndex: '5' }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -496,20 +440,15 @@ export default (context: TestContext) => { method: ERC721NoDataABI.abi.find(abi => abi.name === APPROVE_NO_DATA) as IAbiMethod, params: ['2', '5'], }; - const response: EthConnectAsyncResponse = { id: '1', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/approval').send(request).expect(202).expect({ id: '1' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Mint token - custom ABI', async () => { const safeMintAutoIndex = { name: 'safeMint', @@ -524,8 +463,8 @@ export default (context: TestContext) => { ], outputs: [], }; - const request: TokenMint = { + namespace: 'ns1', signer: IDENTITY, poolLocator: ERC721_NO_DATA_POOL_ID, to: '0x123', @@ -534,7 +473,6 @@ export default (context: TestContext) => { methods: [safeMintAutoIndex], }, }; - const mockEthConnectRequest: EthConnectMsgRequest = { headers: { type: 'SendTransaction', @@ -544,27 +482,21 @@ export default (context: TestContext) => { method: safeMintAutoIndex, params: ['0x123'], }; - const response: EthConnectAsyncResponse = { id: 'responseId', sent: true, }; - context.http.post = jest.fn(() => new FakeObservable(response)); - await context.server.post('/mint').send(request).expect(202).expect({ id: 'responseId' }); - expect(context.http.post).toHaveBeenCalledTimes(1); expect(context.http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, OPTIONS); }); - it('Check interface', async () => { const request: CheckInterfaceRequest = { poolLocator: ERC721_NO_DATA_POOL_ID, format: InterfaceFormat.ABI, methods: ERC721NoDataABI.abi, }; - const response: CheckInterfaceResponse = { approval: { format: InterfaceFormat.ABI, @@ -593,7 +525,6 @@ export default (context: TestContext) => { ], }, }; - await context.server.post('/checkinterface').send(request).expect(200).expect(response); }); }); diff --git a/test/suites/websocket.ts b/test/suites/websocket.ts index 01c2bca..dda80ad 100644 --- a/test/suites/websocket.ts +++ b/test/suites/websocket.ts @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -234,10 +234,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent], batchNumber: 12345 }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -290,10 +299,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -375,10 +393,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721MintTransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -429,10 +456,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20TransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -516,10 +552,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721TransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -569,10 +614,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20BurnEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -654,10 +708,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721BurnEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -713,10 +776,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20ApprovalEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -772,10 +844,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC721ApprovalEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -831,10 +912,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockApprovalForAllEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -847,7 +937,12 @@ export default (context: TestContext) => { it('Success receipt', () => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.receiptHandler).toBeDefined(); context.receiptHandler({ headers: { @@ -873,7 +968,12 @@ export default (context: TestContext) => { it('Error receipt', () => { return context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.receiptHandler).toBeDefined(); context.receiptHandler({ headers: { @@ -905,10 +1005,19 @@ export default (context: TestContext) => { await context.server .ws('/api/ws') - .exec(() => { + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); @@ -918,43 +1027,29 @@ export default (context: TestContext) => { }) .close(); - await context.server.ws('/api/ws').expectJson(message => { - expect(message.id).toBeDefined(); - expect(message.event).toEqual('batch'); - expect(message.data.events).toHaveLength(1); - expect(message.data.events[0].event).toEqual('token-mint'); - return true; - }); - }); - - it('Client switchover', async () => { - context.eventstream.getSubscription.mockReturnValueOnce({ - name: packSubscriptionName(CONTRACT_ADDRESS, '', 'default'), - }); - - const ws1 = context.server.ws('/api/ws'); - const ws2 = context.server.ws('/api/ws'); + context.resetConnectedPromise(); - await ws1 - .exec(() => { + await context.server + .ws('/api/ws') + .sendJson({ + type: 'start', + namespace: 'ns1', + }) + .exec(async () => { + await context.connected; expect(context.eventHandler).toBeDefined(); context.eventHandler({ events: [mockERC20MintTransferEvent] }); }) + .expectJson(message => { + expect(message.event).toEqual('started'); + expect(message.data.namespace).toEqual('ns1'); + }) .expectJson(message => { expect(message.id).toBeDefined(); expect(message.event).toEqual('batch'); expect(message.data.events).toHaveLength(1); expect(message.data.events[0].event).toEqual('token-mint'); return true; - }) - .close(); - - await ws2.expectJson(message => { - expect(message.id).toBeDefined(); - expect(message.event).toEqual('batch'); - expect(message.data.events).toHaveLength(1); - expect(message.data.events[0].event).toEqual('token-mint'); - return true; - }); + }); }); };