Skip to content

Commit

Permalink
Merge pull request #56 from kaleido-io/readahead
Browse files Browse the repository at this point in the history
Update event queue to support readahead to FireFly core
  • Loading branch information
gabriel-indik authored Apr 1, 2022
2 parents adeab1f + 9bb8118 commit 10b23db
Show file tree
Hide file tree
Showing 13 changed files with 815 additions and 576 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ This will make it possible for the organizations to establish MTLS communication
|message-delivered| Emitted to the sender when a message has been delivered | recipient, message, requestId (optional)
|message-failed | Emitted to the sender when a message could not be delivered| recipient, message, requestId (optional)

- After receiving a websocket message, a commit must be sent in order to receive the next one:
- After receiving a websocket message, an ack must be sent ("commit" is a synonym for "ack"):
```
{ "action": "commit" }
{ "action": "ack", "id": "<ID_FROM_EVENT>" }
```
- Messages arrive in the same order they were sent
- Up to 1,000 messages will be queued
Expand Down
936 changes: 438 additions & 498 deletions package-lock.json

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,40 @@
"author": "",
"license": "Apache-2.0",
"dependencies": {
"ajv": "^8.8.2",
"axios": "^0.24.0",
"busboy": "^0.3.1",
"express": "^4.17.2",
"ajv": "^8.11.0",
"axios": "^0.26.1",
"busboy": "^1.5.0",
"express": "^4.17.3",
"form-data": "^4.0.0",
"jsrsasign": "^10.5.1",
"jsrsasign": "^10.5.14",
"swagger-ui-express": "^4.3.0",
"uuid": "^8.3.2",
"ws": "^8.4.0",
"ws": "^8.5.0",
"yamljs": "^0.3.0"
},
"devDependencies": {
"@types/bunyan": "^1.8.8",
"@types/busboy": "^0.3.1",
"@types/busboy": "^1.5.0",
"@types/chai": "^4.3.0",
"@types/express": "^4.17.13",
"@types/jsrsasign": "^9.0.3",
"@types/mocha": "^9.0.0",
"@types/node": "^17.0.8",
"@types/jsrsasign": "^10.2.1",
"@types/mocha": "^9.1.0",
"@types/node": "^17.0.23",
"@types/swagger-ui-express": "^4.1.3",
"@types/uuid": "^8.3.3",
"@types/ws": "^8.2.2",
"@types/uuid": "^8.3.4",
"@types/ws": "^8.5.3",
"@types/yamljs": "^0.2.31",
"chai": "^4.3.4",
"mocha": "^9.1.3",
"chai": "^4.3.6",
"mocha": "^9.2.2",
"moment": "^2.29.1",
"nyc": "^15.1.0",
"rimraf": "^3.0.2",
"sinon": "^12.0.1",
"sinon": "^13.0.1",
"sinon-chai": "^3.7.0",
"source-map-support": "^0.5.21",
"ts-node": "^10.4.0",
"ts-node": "^10.7.0",
"ts-sinon": "^2.0.2",
"typescript": "^4.5.4"
"typescript": "^4.6.3"
},
"nyc": {
"extension": [
Expand Down
24 changes: 9 additions & 15 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import path from 'path';
import swaggerUi from 'swagger-ui-express';
import WebSocket from 'ws';
import YAML from 'yamljs';
import { eventEmitter as blobsEventEmitter } from './handlers/blobs';
import * as eventsHandler from './handlers/events';
import { eventEmitter as messagesEventEmitter } from './handlers/messages';
import { genTLSContext, init as initCert, loadPeerCAs } from './lib/cert';
import { config, init as initConfig } from './lib/config';
import { IAckEvent } from './lib/interfaces';
import { Logger } from './lib/logger';
import RequestError, { errorHandler } from './lib/request-error';
import * as utils from './lib/utils';
import { router as apiRouter, setRefreshCACerts } from './routers/api';
import { eventEmitter as p2pEventEmitter, router as p2pRouter } from './routers/p2p';
import { router as p2pRouter } from './routers/p2p';
import { init as initEvents } from './handlers/events';

const log = new Logger("app.ts");

Expand All @@ -51,6 +51,7 @@ setRefreshCACerts(refreshCACerts)
export const start = async () => {
await initConfig();
await initCert();
await initEvents(config);

const apiApp = express();
apiServer = http.createServer(apiApp);
Expand All @@ -68,11 +69,7 @@ export const start = async () => {
}
});

p2pEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
blobsEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
messagesEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));

eventsHandler.eventEmitter.addListener('event', event => {
eventsHandler.getEmitter().addListener('event', event => {
log.info(`Event emitted ${event.type}/${event.id}`)
if (delegatedWebSocket !== undefined) {
delegatedWebSocket.send(JSON.stringify(event));
Expand All @@ -82,21 +79,16 @@ export const start = async () => {
const assignWebSocketDelegate = (webSocket: WebSocket) => {
log.info('New WebSocket delegate assigned');
delegatedWebSocket = webSocket;
const event = eventsHandler.getCurrentEvent();
webSocket.on('message', async message => {
try {
const messageContent = JSON.parse(message.toLocaleString());
if (messageContent.action === 'commit') {
log.info(`Event comitted ${event?`${event.type}/${event.id}`:`[no event in flight]`}`)
eventsHandler.handleCommit();
if (messageContent.action === 'ack' || messageContent.action == 'commit') {
eventsHandler.handleAck(messageContent as IAckEvent);
}
} catch (err) {
log.error(`Failed to process websocket message ${err}`);
}
});
if (event !== undefined) {
webSocket.send(JSON.stringify(event));
}
webSocket.on('close', () => {
log.info('WebSocket delegate disconnected');
const nextDelegatedWebSocket = wss.clients.values().next().value;
Expand All @@ -106,6 +98,8 @@ export const start = async () => {
delegatedWebSocket = undefined;
}
});
// Anything that's in-flight needs to be sent again
eventsHandler.reDispatchInFlight();
};

wss.on('connection', (webSocket: WebSocket) => {
Expand Down
7 changes: 3 additions & 4 deletions src/handlers/blobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// limitations under the License.

import crypto from 'crypto';
import EventEmitter from 'events';
import FormData from 'form-data';
import { createReadStream, createWriteStream, promises as fs } from 'fs';
import https from 'https';
Expand All @@ -27,12 +26,12 @@ import { BlobTask, IBlobDeliveredEvent, IBlobFailedEvent, IFile, IMetadata } fro
import { Logger } from '../lib/logger';
import RequestError from '../lib/request-error';
import * as utils from '../lib/utils';
import { queueEvent } from './events';

const log = new Logger("handlers/blobs.ts")

let blobQueue: BlobTask[] = [];
let sending = false;
export const eventEmitter = new EventEmitter();

export const retreiveBlob = async (filePath: string) => {
const resolvedFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.BLOBS_SUBDIRECTORY, filePath);
Expand Down Expand Up @@ -100,7 +99,7 @@ export const deliverBlob = async ({ blobPath, recipient, recipientURL, requestId
timeout: utils.constants.REST_API_CALL_BLOB_REQUEST_TIMEOUT,
httpsAgent
});
eventEmitter.emit('event', {
await queueEvent({
id: uuidV4(),
type: 'blob-delivered',
path: blobPath,
Expand All @@ -109,7 +108,7 @@ export const deliverBlob = async ({ blobPath, recipient, recipientURL, requestId
} as IBlobDeliveredEvent);
log.trace(`Blob delivered`);
} catch (err: any) {
eventEmitter.emit('event', {
await queueEvent({
id: uuidV4(),
type: 'blob-failed',
path: blobPath,
Expand Down
123 changes: 102 additions & 21 deletions src/handlers/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,39 +15,120 @@
// limitations under the License.

import EventEmitter from "events";
import { OutboundEvent } from "../lib/interfaces";
import { IAckEvent, IConfig, OutboundEvent } from "../lib/interfaces";
import { Logger } from "../lib/logger";
import * as utils from '../lib/utils';

const log = new Logger("handlers/events.ts")

let eventQueue: OutboundEvent[] = [];
export const eventEmitter = new EventEmitter();
let maxInflight = utils.constants.DEFAULT_MAX_INFLIGHT;
let maxEventQueueSize = utils.constants.MAX_EVENT_QUEUE_SIZE;
let eventQueue: OutboundEvent[];
let inFlight: OutboundEvent[];

export const queueEvent = (socketEvent: OutboundEvent) => {
if(eventQueue.length < utils.constants.MAX_EVENT_QUEUE_SIZE) {
eventQueue.push(socketEvent);
if(eventQueue.length === 1) {
eventEmitter.emit('event', eventQueue[0]);
let eventEmitter: EventEmitter;
let unblockPromise: Promise<void> | undefined;
let unblock: (() => void) | undefined;

export const init = async (config: IConfig) => {
eventEmitter = new EventEmitter();
eventQueue = [];
inFlight = [];
unblockPromise = undefined;
unblock = undefined;
if (config.events?.maxInflight !== undefined) {
maxInflight = config.events.maxInflight;
}
if (config.events?.queueSize !== undefined) {
maxEventQueueSize = config.events.queueSize;
}
}

const dispatchNext = () => {
if (inFlight.length < maxInflight) {
const event = eventQueue.shift();
if (event) {
inFlight.push(event)
log.debug(`${event.id}: dispatched`);
eventEmitter.emit('event', event);
}
} else {
log.warn('Max queue size reached');
}
};

export const handleCommit = () => {
eventQueue.shift();
if(eventQueue.length > 0) {
eventEmitter.emit('event', eventQueue[0]);
if (eventQueue.length < maxEventQueueSize && unblock) {
unblock();
unblockPromise = undefined;
unblock = undefined;
log.info(`Event queue unblocked (length=${eventQueue.length})`);
}
}

export const getCurrentEvent = () => {
if(eventQueue.length > 0) {
return eventQueue[0];
export const queueEvent = async (socketEvent: OutboundEvent) => {

let currentUnblockPromise = unblockPromise;
if (currentUnblockPromise) {
let blockedTime = Date.now();
log.warn(`${socketEvent.id}: delaying receive due to full event queue (length=${eventQueue.length})`);
await currentUnblockPromise;
log.info(`${socketEvent.id}: unblocked receive after ${Date.now()-blockedTime}ms`);
}
};

export const getQueueSize = () => {
return eventQueue.length;
eventQueue.push(socketEvent);
if (eventQueue.length >= maxEventQueueSize && !unblockPromise) {
log.warn(`Event queue became full (length=${eventQueue.length})`);
unblockPromise = new Promise(resolve => {
unblock = resolve;
})
}

dispatchNext();
};

export const reDispatchInFlight = () => {
for (const event of inFlight) {
eventEmitter.emit('event', event)
}
}

export const handleAck = (ack: IAckEvent) => {

// Check we have something in-flight
if (inFlight.length <= 0) {
log.error(`Ack for ${ack.id} while no events in-flight`);
return
}

// If no ID supplied (back-level API) we
if (ack.id === undefined) {
log.warn(`FireFly core is back-level and did not supply an event ID`);
ack.id = inFlight[0].id;
}

// Remove from our in-flight map
let event;
for (let i = 0; i < inFlight.length; i++) {
const candidate = inFlight[i]
if (ack.id === candidate.id) {
event = candidate;
inFlight.splice(i, 1);
break;
}
}
if (!event) {
log.warn(`Ack received for ${ack.id} which is not in-flight`);
return
}
log.debug(`${ack.id}: acknowledged`);

dispatchNext();
}

export const getEmitter = () => {
return eventEmitter;
}

export const getStats = () => {
return {
messageQueueSize: eventQueue.length,
inFlightCount: inFlight.length,
}
}
7 changes: 3 additions & 4 deletions src/handlers/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import EventEmitter from 'events';
import FormData from 'form-data';
import https from 'https';
import { v4 as uuidV4 } from 'uuid';
import { ca, cert, key } from '../lib/cert';
import { IMessageDeliveredEvent, IMessageFailedEvent, MessageTask } from '../lib/interfaces';
import { Logger } from '../lib/logger';
import * as utils from '../lib/utils';
import { queueEvent } from './events';

const log = new Logger('handlers/messages.ts')

let messageQueue: MessageTask[] = [];
let sending = false;
export const eventEmitter = new EventEmitter();

export const sendMessage = async (message: string, recipient: string, recipientURL: string, requestId: string | undefined) => {
if (sending) {
Expand Down Expand Up @@ -55,7 +54,7 @@ export const deliverMessage = async ({ message, recipient, recipientURL, request
headers: formData.getHeaders(),
httpsAgent
});
eventEmitter.emit('event', {
await queueEvent({
id: uuidV4(),
type: 'message-delivered',
message,
Expand All @@ -64,7 +63,7 @@ export const deliverMessage = async ({ message, recipient, recipientURL, request
} as IMessageDeliveredEvent);
log.trace(`Message delivered`);
} catch(err: any) {
eventEmitter.emit('event', {
await queueEvent({
id: uuidV4(),
type: 'message-failed',
message,
Expand Down
2 changes: 1 addition & 1 deletion src/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ const loadConfig = async () => {
throw err;
}
}
config = data as IConfig;
if(validateConfig(data)) {
config = data as IConfig;
for(const peer of config.peers) {
if(peer.endpoint.endsWith('/')) {
peer.endpoint = peer.endpoint.slice(-0, -1);
Expand Down
Loading

0 comments on commit 10b23db

Please sign in to comment.