From 58de1338854956bf929b9db2e23a8f080a0f2991 Mon Sep 17 00:00:00 2001 From: Andrew Richardson Date: Tue, 23 May 2023 23:09:13 -0400 Subject: [PATCH] Add /deactivatepool API for deleting listeners Signed-off-by: Andrew Richardson --- src/event-stream/event-stream.service.ts | 16 ++++++ src/main.ts | 2 +- src/tokens/tokens.controller.ts | 33 ++++++++---- src/tokens/tokens.interfaces.ts | 12 +++-- src/tokens/tokens.service.ts | 64 +++++++++++++++++++++++- 5 files changed, 111 insertions(+), 16 deletions(-) diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 7932f0b..42ec157 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -321,6 +321,22 @@ export class EventStreamService { ); } + async deleteSubscriptionByName(ctx: Context, streamId: string, name: string) { + const existingSubscriptions = await this.getSubscriptions(ctx); + const sub = existingSubscriptions.find(s => s.name === name && s.stream === streamId); + if (!sub) { + this.logger.log(`No subscription found for ${name}`); + return false; + } + await lastValueFrom( + this.http.delete( + new URL(`/subscriptions/${sub.id}`, this.baseUrl).href, + this.requestOptions(ctx), + ), + ); + return true; + } + connect( url: string, topic: string, diff --git a/src/main.ts b/src/main.ts index c2e2b31..95ff1c3 100644 --- a/src/main.ts +++ b/src/main.ts @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { NestApplicationOptions, ShutdownSignal, ValidationPipe } from '@nestjs/common'; +import { ShutdownSignal, ValidationPipe } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { NestFactory } from '@nestjs/core'; import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger'; diff --git a/src/tokens/tokens.controller.ts b/src/tokens/tokens.controller.ts index 871ef77..ff51c7f 100644 --- a/src/tokens/tokens.controller.ts +++ b/src/tokens/tokens.controller.ts @@ -28,10 +28,11 @@ import { TokenMint, TokenPool, TokenPoolActivate, + TokenPoolDeactivate, TokenTransfer, } from './tokens.interfaces'; import { TokensService } from './tokens.service'; -import { RequestContext } from '../request-context/request-context.decorator'; +import { Context, RequestContext } from '../request-context/request-context.decorator'; @Controller() export class TokensController { @@ -40,7 +41,7 @@ export class TokensController { @Post('init') @HttpCode(204) @ApiOperation({ summary: 'Perform one-time initialization (if not auto-initialized)' }) - async init(@RequestContext() ctx) { + async init(@RequestContext() ctx: Context) { await this.service.init(ctx); } @@ -53,21 +54,31 @@ export class TokensController { }) @ApiBody({ type: TokenPool }) @ApiResponse({ status: 202, type: AsyncResponse }) - createPool(@RequestContext() ctx, @Body() dto: TokenPool) { + createPool(@RequestContext() ctx: Context, @Body() dto: TokenPool) { return this.service.createPool(ctx, dto); } @Post('activatepool') @HttpCode(204) @ApiOperation({ - summary: 'Activate a token pool to begin receiving transfer events', + summary: 'Activate a token pool to begin receiving transfer and approval events', description: 'Will retrigger the token-pool event for this pool as a side-effect', }) @ApiBody({ type: TokenPoolActivate }) - async activatePool(@RequestContext() ctx, @Body() dto: TokenPoolActivate) { + async activatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolActivate) { await this.service.activatePool(ctx, dto); } + @Post('deactivatepool') + @HttpCode(204) + @ApiOperation({ + summary: 'Deactivate a token pool to delete all listeners and stop receiving events', + }) + @ApiBody({ type: TokenPoolDeactivate }) + async deactivatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolDeactivate) { + await this.service.deactivatePool(ctx, dto); + } + @Post('mint') @HttpCode(202) @ApiOperation({ @@ -77,7 +88,7 @@ export class TokensController { }) @ApiBody({ type: TokenMint }) @ApiResponse({ status: 202, type: AsyncResponse }) - mint(@RequestContext() ctx, @Body() dto: TokenMint) { + mint(@RequestContext() ctx: Context, @Body() dto: TokenMint) { return this.service.mint(ctx, dto); } @@ -99,7 +110,7 @@ export class TokensController { }) @ApiBody({ type: TokenApproval }) @ApiResponse({ status: 202, type: AsyncResponse }) - approve(@RequestContext() ctx, @Body() dto: TokenApproval) { + approve(@RequestContext() ctx: Context, @Body() dto: TokenApproval) { return this.service.approval(ctx, dto); } @@ -112,7 +123,7 @@ export class TokensController { }) @ApiBody({ type: TokenBurn }) @ApiResponse({ status: 202, type: AsyncResponse }) - burn(@RequestContext() ctx, @Body() dto: TokenBurn) { + burn(@RequestContext() ctx: Context, @Body() dto: TokenBurn) { return this.service.burn(ctx, dto); } @@ -125,21 +136,21 @@ export class TokensController { }) @ApiBody({ type: TokenTransfer }) @ApiResponse({ status: 202, type: AsyncResponse }) - transfer(@RequestContext() ctx, @Body() dto: TokenTransfer) { + transfer(@RequestContext() ctx: Context, @Body() dto: TokenTransfer) { return this.service.transfer(ctx, dto); } @Get('balance') @ApiOperation({ summary: 'Retrieve a token balance' }) @ApiResponse({ status: 200, type: TokenBalance }) - balance(@RequestContext() ctx, @Query() query: TokenBalanceQuery) { + balance(@RequestContext() ctx: Context, @Query() query: TokenBalanceQuery) { return this.service.balance(ctx, query); } @Get('receipt/:id') @ApiOperation({ summary: 'Retrieve the result of an async operation' }) @ApiResponse({ status: 200, type: EventStreamReply }) - getReceipt(@RequestContext() ctx, @Param('id') id: string) { + getReceipt(@RequestContext() ctx: Context, @Param('id') id: string) { return this.blockchain.getReceipt(ctx, id); } } diff --git a/src/tokens/tokens.interfaces.ts b/src/tokens/tokens.interfaces.ts index 5b91e18..b824afe 100644 --- a/src/tokens/tokens.interfaces.ts +++ b/src/tokens/tokens.interfaces.ts @@ -187,11 +187,17 @@ export class TokenPoolActivate { @ApiProperty() @IsOptional() - config?: any; + config?: TokenPoolConfig; - @ApiProperty({ description: requestIdDescription }) + @ApiProperty() @IsOptional() - requestId?: string; + poolData?: string; +} + +export class TokenPoolDeactivate { + @ApiProperty() + @IsNotEmpty() + poolLocator: string; @ApiProperty() @IsOptional() diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index 10e7f0f..248f7d3 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, NotFoundException } from '@nestjs/common'; import { EventStreamService } from '../event-stream/event-stream.service'; import { EventStream, EventStreamSubscription } from '../event-stream/event-stream.interfaces'; import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway'; @@ -33,6 +33,7 @@ import { TokenMint, TokenPool, TokenPoolActivate, + TokenPoolDeactivate, TokenTransfer, } from './tokens.interfaces'; import { @@ -325,6 +326,67 @@ export class TokensService { await Promise.all(promises); } + async deactivatePool(ctx: Context, dto: TokenPoolDeactivate) { + const tokenCreateEvent = this.mapper.getCreateEvent(); + const stream = await this.getStream(ctx); + + const promises: Promise[] = []; + if (tokenCreateEvent?.name !== undefined) { + promises.push( + this.eventstream.deleteSubscriptionByName( + ctx, + stream.id, + packSubscriptionName( + this.instancePath, + dto.poolLocator, + tokenCreateEvent.name, + dto.poolData, + ), + ), + ); + } + + promises.push( + ...[ + this.eventstream.deleteSubscriptionByName( + ctx, + stream.id, + packSubscriptionName( + this.instancePath, + dto.poolLocator, + TransferSingle.name, + dto.poolData, + ), + ), + this.eventstream.deleteSubscriptionByName( + ctx, + stream.id, + packSubscriptionName( + this.instancePath, + dto.poolLocator, + TransferBatch.name, + dto.poolData, + ), + ), + this.eventstream.deleteSubscriptionByName( + ctx, + stream.id, + packSubscriptionName( + this.instancePath, + dto.poolLocator, + ApprovalForAll.name, + dto.poolData, + ), + ), + ], + ); + + const results = await Promise.all(promises); + if (results.every(deleted => !deleted)) { + throw new NotFoundException('No listeners found'); + } + } + checkInterface(dto: CheckInterfaceRequest): CheckInterfaceResponse { const wrapMethods = (methods: IAbiMethod[]): TokenInterface => { return { format: InterfaceFormat.ABI, methods };