Skip to content

Commit

Permalink
Merge pull request #61 from kaleido-io/subs
Browse files Browse the repository at this point in the history
Fix bugs with event streams
  • Loading branch information
nguyer authored Mar 15, 2022
2 parents 31668a0 + d0cee54 commit 80492e9
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 52 deletions.
4 changes: 2 additions & 2 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,9 @@ export class EventStreamService {
return response.data;
}

async createOrUpdateStream(topic: string): Promise<EventStream> {
async createOrUpdateStream(name: string, topic: string): Promise<EventStream> {
const streamDetails = {
name: topic,
name,
errorHandling: 'block',
batchSize: 50,
batchTimeoutMS: 500,
Expand Down
44 changes: 31 additions & 13 deletions src/tokens/tokens.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
encodeHex,
encodeHexIDForURI,
isFungible,
packStreamName,
packSubscriptionName,
packTokenId,
unpackSubscriptionName,
Expand Down Expand Up @@ -86,7 +87,7 @@ export class TokensService {
instanceUrl: string;
topic: string;
shortPrefix: string;
stream: EventStream;
stream: EventStream | undefined;
username: string;
password: string;

Expand Down Expand Up @@ -120,15 +121,23 @@ export class TokensService {
* One-time initialization of event stream and base subscription.
*/
async init() {
this.stream = await this.eventstream.createOrUpdateStream(this.topic);
this.stream = await this.getStream();
await this.eventstream.getOrCreateSubscription(
this.instancePath,
this.stream.id,
tokenCreateEvent,
packSubscriptionName(this.topic, BASE_SUBSCRIPTION_NAME),
packSubscriptionName(this.topic, this.instancePath, BASE_SUBSCRIPTION_NAME, tokenCreateEvent),
);
}

private async getStream() {
if (this.stream === undefined) {
const name = packStreamName(this.topic, this.instancePath);
this.stream = await this.eventstream.createOrUpdateStream(name, this.topic);
}
return this.stream;
}

/**
* If there is an existing event stream whose subscriptions don't match the current
* events and naming format, delete the stream so we'll start over.
Expand All @@ -138,9 +147,17 @@ export class TokensService {
* TODO: eventually this migration logic can be pruned
*/
async migrate() {
const name = packStreamName(this.topic, this.instancePath);
const streams = await this.eventstream.getStreams();
const existingStream = streams.find(s => s.name === this.topic);
const existingStream = streams.find(s => s.name === name);
if (existingStream === undefined) {
// Look for the old stream name (topic alone)
const oldStream = streams.find(s => s.name === this.topic);
if (oldStream !== undefined) {
this.logger.warn('Old event stream found - deleting and recreating');
await this.eventstream.deleteStream(oldStream.id);
await this.init();
}
return;
}
const subscriptions = await this.eventstream.getSubscriptions();
Expand Down Expand Up @@ -168,7 +185,7 @@ export class TokensService {
this.logger.warn('Incorrect event stream subscriptions found - deleting and recreating');
await this.eventstream.deleteStream(existingStream.id);
await this.init();
break;
return;
}
}
}
Expand Down Expand Up @@ -218,33 +235,34 @@ export class TokensService {
}

async activatePool(dto: TokenPoolActivate) {
const stream = await this.getStream();
await Promise.all([
this.eventstream.getOrCreateSubscription(
this.instancePath,
this.stream.id,
stream.id,
tokenCreateEvent,
packSubscriptionName(this.topic, dto.poolId, tokenCreateEvent),
packSubscriptionName(this.topic, this.instancePath, dto.poolId, tokenCreateEvent),
dto.transaction?.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
this.stream.id,
stream.id,
transferSingleEvent,
packSubscriptionName(this.topic, dto.poolId, transferSingleEvent),
packSubscriptionName(this.topic, this.instancePath, dto.poolId, transferSingleEvent),
dto.transaction?.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
this.stream.id,
stream.id,
transferBatchEvent,
packSubscriptionName(this.topic, dto.poolId, transferBatchEvent),
packSubscriptionName(this.topic, this.instancePath, dto.poolId, transferBatchEvent),
dto.transaction?.blockNumber ?? '0',
),
this.eventstream.getOrCreateSubscription(
this.instancePath,
this.stream.id,
stream.id,
approvalForAllEvent,
packSubscriptionName(this.topic, dto.poolId, approvalForAllEvent),
packSubscriptionName(this.topic, this.instancePath, dto.poolId, approvalForAllEvent),
// Block number is 0 because it is important to receive all approval events,
// so existing approvals will be reflected in the newly created pool
'0',
Expand Down
26 changes: 14 additions & 12 deletions src/tokens/tokens.util.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
decodeHex,
encodeHex,
encodeHexIDForURI,
packStreamName,
packSubscriptionName,
packTokenId,
unpackSubscriptionName,
Expand Down Expand Up @@ -67,29 +68,30 @@ describe('Util', () => {
});
});

it('packStreamName', () => {
expect(packStreamName('token', '0x123')).toEqual('token:0x123');
});

it('packSubscriptionName', () => {
expect(packSubscriptionName('token', 'F1')).toEqual('token:F1');
expect(packSubscriptionName('token', 'N1', 'create')).toEqual('token:N1:create');
expect(packSubscriptionName('tok:en', 'N1', 'create')).toEqual('tok:en:N1:create');
expect(packSubscriptionName('token', '0x123', 'F1', 'create')).toEqual('token:0x123:F1:create');
expect(packSubscriptionName('tok:en', '0x123', 'N1', 'create')).toEqual(
'tok:en:0x123:N1:create',
);
});

it('unpackSubscriptionName', () => {
expect(unpackSubscriptionName('token', 'token:F1')).toEqual({
expect(unpackSubscriptionName('token', 'token:0x123:F1:create')).toEqual({
prefix: 'token',
instancePath: '0x123',
poolId: 'F1',
});
expect(unpackSubscriptionName('token', 'token:N1:create')).toEqual({
prefix: 'token',
poolId: 'N1',
event: 'create',
});
expect(unpackSubscriptionName('tok:en', 'tok:en:N1:create')).toEqual({
expect(unpackSubscriptionName('tok:en', 'tok:en:0x123:N1:create')).toEqual({
prefix: 'tok:en',
instancePath: '0x123',
poolId: 'N1',
event: 'create',
});
expect(unpackSubscriptionName('token', 'bad:N1:create')).toEqual({
prefix: 'token',
});
expect(unpackSubscriptionName('token', 'bad:N1:create')).toEqual({});
});
});
31 changes: 21 additions & 10 deletions src/tokens/tokens.util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,31 @@ export function unpackTokenId(id: string) {
};
}

export function packSubscriptionName(prefix: string, poolId: string, event?: string) {
if (event === undefined) {
return [prefix, poolId].join(':');
}
return [prefix, poolId, event].join(':');
export function packStreamName(prefix: string, instancePath: string) {
return [prefix, instancePath].join(':');
}

export function packSubscriptionName(
prefix: string,
instancePath: string,
poolId: string,
event: string,
) {
return [prefix, instancePath, poolId, event].join(':');
}

export function unpackSubscriptionName(prefix: string, data: string) {
const parts = data.startsWith(prefix + ':')
? data.slice(prefix.length + 1).split(':', 2)
: undefined;
if (!data.startsWith(prefix + ':')) {
return {};
}
const parts = data.slice(prefix.length + 1).split(':');
if (parts.length !== 3) {
return {};
}
return {
prefix,
poolId: parts?.[0],
event: parts?.[1],
instancePath: parts[0],
poolId: parts[1],
event: parts[2],
};
}
29 changes: 14 additions & 15 deletions test/app.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import {
} from '../src/tokens/tokens.interfaces';
import { TokensService } from '../src/tokens/tokens.service';
import { WebSocketMessage } from '../src/websocket-events/websocket-events.base';
import { packSubscriptionName } from '../src/tokens/tokens.util';
import { AppModule } from './../src/app.module';

const BASE_URL = 'http://eth';
Expand Down Expand Up @@ -403,7 +404,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token pool event', () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':F1',
name: packSubscriptionName(TOPIC, '0x123', 'F1', ''),
});

return server
Expand Down Expand Up @@ -461,7 +462,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token pool event from base subscription', () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':base',
name: packSubscriptionName(TOPIC, '0x123', 'base', ''),
});

return server
Expand Down Expand Up @@ -519,7 +520,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token mint event', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':F1',
name: packSubscriptionName(TOPIC, '0x123', 'F1', ''),
});

http.get = jest.fn(
Expand Down Expand Up @@ -611,7 +612,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token burn event', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':N1',
name: packSubscriptionName(TOPIC, '0x123', 'N1', ''),
});

http.get = jest.fn(
Expand Down Expand Up @@ -702,7 +703,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token transfer event', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':N1',
name: packSubscriptionName(TOPIC, '0x123', 'N1', ''),
});

http.get = jest.fn(
Expand Down Expand Up @@ -780,7 +781,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token approval event', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':N1',
name: packSubscriptionName(TOPIC, '0x123', 'N1', ''),
});

await server
Expand Down Expand Up @@ -841,9 +842,8 @@ describe('AppController (e2e)', () => {
});

it('Websocket: token transfer event from wrong pool', () => {
eventstream.getSubscription
.mockReturnValueOnce(<EventStreamSubscription>{ name: TOPIC + ':N1' })
.mockReturnValueOnce(<EventStreamSubscription>{ name: TOPIC + ':N1' });
const sub = <EventStreamSubscription>{ name: packSubscriptionName(TOPIC, '0x123', 'N1', '') };
eventstream.getSubscription.mockReturnValueOnce(sub).mockReturnValueOnce(sub);

return server
.ws('/api/ws')
Expand Down Expand Up @@ -893,7 +893,7 @@ describe('AppController (e2e)', () => {

it('Websocket: token batch transfer', async () => {
eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':N1',
name: packSubscriptionName(TOPIC, '0x123', 'N1', ''),
});

http.get = jest.fn(
Expand Down Expand Up @@ -1077,7 +1077,7 @@ describe('AppController (e2e)', () => {
};

eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':F1',
name: packSubscriptionName(TOPIC, '0x123', 'F1', ''),
});

await server
Expand Down Expand Up @@ -1116,7 +1116,7 @@ describe('AppController (e2e)', () => {
};

eventstream.getSubscription.mockReturnValueOnce(<EventStreamSubscription>{
name: TOPIC + ':F1',
name: packSubscriptionName(TOPIC, '0x123', 'F1', ''),
});

const ws1 = server.ws('/api/ws');
Expand Down Expand Up @@ -1173,9 +1173,8 @@ describe('AppController (e2e)', () => {
},
};

eventstream.getSubscription
.mockReturnValueOnce(<EventStreamSubscription>{ name: TOPIC + ':F1' })
.mockReturnValueOnce(<EventStreamSubscription>{ name: TOPIC + ':F1' });
const sub = <EventStreamSubscription>{ name: packSubscriptionName(TOPIC, '0x123', 'F1', '') };
eventstream.getSubscription.mockReturnValueOnce(sub).mockReturnValueOnce(sub);

const ws1 = server.ws('/api/ws');
const ws2 = server.ws('/api/ws');
Expand Down

0 comments on commit 80492e9

Please sign in to comment.