Skip to content

Commit

Permalink
Merge pull request #35 from pinax-network/update-webhook-signature
Browse files Browse the repository at this point in the history
Update to latest `substreams-sink-webhook` signature
  • Loading branch information
DenisCarriere authored Nov 29, 2023
2 parents 34dec0f + 08e7e46 commit f65c2f1
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 93 deletions.
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,14 @@ Usage: substreams-sink-websockets [options]
Substreams Sink Websockets

Options:
--public-key <string> (required) Ed25519 public key (env: PUBLIC_KEY)
--port <int> Server listen on HTTP port (default: 3000, env: PORT)
--hostname <string> Server listen on HTTP hostname (default: "0.0.0.0", env: HOSTNAME)
--sqlite-filename <string> SQLite database filename (default: "db.sqlite", env: SQLITE_FILENAME)
--verbose <boolean> Enable verbose logging (default: false, env: VERBOSE)
-V, --version output the version number
-h, --help display help for command
--public-key <string> (required) Ed25519 public key (comma-separated for multiple public keys) (env: PUBLIC_KEY)
--port <int> Server listen on HTTP port (default: 3000, env: PORT)
--hostname <string> Server listen on HTTP hostname (default: "0.0.0.0", env: HOSTNAME)
--sqlite-filename <string> SQLite database filename (default: "db.sqlite", env: SQLITE_FILENAME)
--verbose <boolean> Enable verbose logging (default: false, env: VERBOSE)
--recent-messages-limit <int> Limit recent messages (default: 50, env: RECENT_MESSAGES_LIMIT)
-V, --version output the version number
-h, --help display help for command
```

## Docker environment
Expand Down
4 changes: 2 additions & 2 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import GET from "./src/fetch/GET.js";
import POST from "./src/fetch/POST.js";
import * as sqlite from "./src/sqlite.js";
import * as websocket from "./src/websocket/index.js";
import { HOSTNAME, PORT, PUBLIC_KEY, SQLITE_FILENAME } from "./src/config.js";
import { HOSTNAME, PORT, PUBLIC_KEYS, SQLITE_FILENAME } from "./src/config.js";
import { logger } from "./src/logger.js";

export const db = sqlite.createDb(SQLITE_FILENAME);

logger.info(`Server listening on http://${HOSTNAME}:${PORT}`);
logger.info("Verifying with PUBLIC_KEY", PUBLIC_KEY);
logger.info("Verifying with PUBLIC_KEYS", PUBLIC_KEYS.join(","));
logger.info("Reading SQLITE_FILENAME", SQLITE_FILENAME);

export interface ServerWebSocketData {
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
{
"private": true,
"version": "0.1.9",
"version": "0.1.10",
"description": "Substreams Sink Websockets",
"name": "substreams-sink-websockets",
"homepage": "https://github.com/pinax-network/substreams-sink-websockets",
"type": "module",
"scripts": {
"start": "bun run index.ts",
"start": "bun run index.ts --help",
"test": "bun test",
"build": "bun build --compile ./index.ts --outfile substreams-sink-websockets",
"dev": "bun run --watch index.ts"
Expand All @@ -16,6 +16,7 @@
"dotenv": "latest",
"openapi3-ts": "latest",
"prom-client": "latest",
"substreams-sink-webhook": "^0.7.2",
"tslog": "latest",
"tweetnacl": "latest"
},
Expand Down
10 changes: 6 additions & 4 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const opts = new Command()
.name(pkg.name)
.description(pkg.description)
.showHelpAfterError()
.addOption(new Option("--public-key <string>", "(required) Ed25519 public key").env("PUBLIC_KEY"))
.addOption(new Option("--public-key <string>", "(required) Ed25519 public key (comma-separated for multiple public keys)").env("PUBLIC_KEY"))
.addOption(new Option("--port <int>", "Server listen on HTTP port").default(DEFAULT_PORT).env("PORT"))
.addOption(new Option("--hostname <string>", "Server listen on HTTP hostname").default(DEFAULT_HOSTNAME).env("HOSTNAME"))
.addOption(new Option("--sqlite-filename <string>", "SQLite database filename").default(DEFAULT_SQLITE_FILENAME).env("SQLITE_FILENAME"))
Expand All @@ -25,14 +25,16 @@ const opts = new Command()
.parse(process.argv).opts();

// export options
export const PUBLIC_KEY: string = opts.publicKey;
export const PUBLIC_KEYS: string[] = opts.publicKey?.split(",");
export const PORT = Number(opts.port);
export const HOSTNAME: string = opts.hostname
export const SQLITE_FILENAME: string = opts.sqliteFilename;
export const VERBOSE: boolean = opts.verbose === "true" ? true : false;
export const RECENT_MESSAGES_LIMIT: number = Number(opts.recentMessagesLimit);

// validate required options
if (!PUBLIC_KEY) throw new Error("PUBLIC_KEY is required");
if (Buffer.from(PUBLIC_KEY, "hex").length !== 32) throw new Error("PUBLIC_KEY must be a 32 byte hex string");
if (!PUBLIC_KEYS.length) throw new Error("PUBLIC_KEY is required");
for ( const publicKey of PUBLIC_KEYS ) {
if (Buffer.from(publicKey, "hex").length !== 32) throw new Error("PUBLIC_KEY must be a 32 byte hex string");
}
if (!Number.isInteger(PORT)) throw new Error("PORT must be an integer");
124 changes: 52 additions & 72 deletions src/fetch/POST.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,92 +2,72 @@ import * as prometheus from "../prometheus.js";
import * as sqlite from "../sqlite.js";
import { db } from "../../index.js";
import { logger } from "../logger.js";
import { verify } from "../verify.js";
import { PUBLIC_KEY } from "../config.js";
import { Server } from "bun";
import { toText } from "./cors.js";
import { insertMessages } from "./messages.js";
import { signatureEd25519 } from "../webhook/singatureEd25519.js";
import { BodySchema } from "substreams-sink-webhook/auth";

export default async function (req: Request, server: Server) {
// get headers and body from POST request
const timestamp = req.headers.get("x-signature-timestamp");
const signature = req.headers.get("x-signature-ed25519");
const body = await req.text();
logger.info('POST', {timestamp, signature, body});
// validate Ed25519 signature
const text = await req.text();
const signatureResult = await signatureEd25519(req, text);
if ("error" in signatureResult) return signatureResult.error;

// validate request
try {
if (!timestamp) throw new Error("missing required \'timestamp\' in headers");
if (!signature) throw new Error("missing required \'signature\' in headers");
if (!body) throw new Error("missing body");
} catch (e) {
logger.error(e);
prometheus.webhook_message_received_errors.inc(1);
return toText(e.message, 400 );
}

// verify request
const msg = Buffer.from(timestamp + body);
const isVerified = verify(msg, signature, PUBLIC_KEY);
if (!isVerified) {
prometheus.webhook_message_received_errors.inc(1);
return toText("invalid request signature", 401 );
}
const json = JSON.parse(body);
// parse POST body payload
try {
// prometheus.requests.inc();
const body = BodySchema.parse(JSON.parse(text));

// Webhook handshake (not WebSocket related)
if (json?.message == "PING") {
const message = JSON.parse(body).message;
logger.info('PING WebHook handshake', {message});
return toText("OK");
// PING
if ("message" in body) {
if (body.message === "PING") return toText("OK");
return toText("invalid body", 400);
}
// Get data from Substreams metadata
const { clock, manifest, session } = json;
const { moduleHash, chain } = manifest ?? {};
const { traceId } = session ?? {};

// validate POST request
try {
if (!clock) throw new Error("missing required \'clock\' in body");
if (!manifest) throw new Error("missing required \'manifest\' in body");
if (!session) throw new Error("missing required \'session\' in body");
if (!chain) throw new Error("missing required \'chain\' in body.manifest");
if (!moduleHash) throw new Error("missing required \'moduleHash\' in body.manifest");
if (!traceId) throw new Error("missing required \'traceId\' in body.session");
} catch (e) {
logger.error(e);
prometheus.webhook_message_received_errors.inc(1);
return toText(e.message, 400 );
}
if ("data" in body) {
// Get data from Substreams metadata
const { clock, manifest, session } = body;
const { moduleHash, chain } = manifest ?? {};
const { traceId } = session ?? {};
const { timestamp } = clock;

// publish message to subscribers
const bytes = server.publish(moduleHash, body);
logger.info('server.publish', {bytes, block: clock.number, timestamp: clock.timestamp, moduleHash});
// publish message to subscribers
const bytes = server.publish(moduleHash, text);
logger.info('server.publish', {bytes, block: clock.number, timestamp: clock.timestamp, moduleHash});

// additional publish message specified by chain
server.publish(`${chain}:${moduleHash}`, body)
// additional publish message specified by chain
server.publish(`${chain}:${moduleHash}`, text)

// Metrics for published messages
// response is:
// 0 if the message was dropped
// -1 if backpressure was applied
// or the number of bytes sent.
if ( bytes > 0 ) {
prometheus.publish_message_bytes.inc(bytes);
prometheus.publish_message.inc(1);
}
// Metrics for incoming WebHook
prometheus.webhook_message_received.labels({moduleHash, chain}).inc(1);
prometheus.webhook_trace_id.labels({traceId, chain}).inc(1);
// Metrics for published messages
// response is:
// 0 if the message was dropped
// -1 if backpressure was applied
// or the number of bytes sent.
if ( bytes > 0 ) {
prometheus.publish_message_bytes.inc(bytes);
prometheus.publish_message.inc(1);
}
// Metrics for incoming WebHook
prometheus.webhook_message_received.labels({moduleHash, chain}).inc(1);
prometheus.webhook_trace_id.labels({traceId, chain}).inc(1);

// Upsert moduleHash into SQLite DB
sqlite.replace(db, "chain", chain, timestamp);
sqlite.replace(db, "moduleHash", moduleHash, timestamp);
sqlite.replace(db, "moduleHashByChain", `${chain}:${moduleHash}`, timestamp);
sqlite.replace(db, "traceId", `${chain}:${traceId}`, timestamp);
//Set timestamp as key to filter recent messages
// Upsert moduleHash into SQLite DB
sqlite.replace(db, "chain", chain, timestamp);
sqlite.replace(db, "moduleHash", moduleHash, timestamp);
sqlite.replace(db, "moduleHashByChain", `${chain}:${moduleHash}`, timestamp);
sqlite.replace(db, "traceId", `${chain}:${traceId}`, timestamp);

insertMessages( db, traceId, JSON.stringify(json), chain );
// Set timestamp as key to filter recent messages
insertMessages( db, traceId, text, chain );

return toText("OK");
return toText("OK");
}
} catch (err) {
logger.error(err);
// prometheus.request_errors?.inc();
prometheus.webhook_message_received_errors.inc(1);
return toText("invalid request", 400);
}
}
12 changes: 6 additions & 6 deletions src/fetch/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as sqlite from "../sqlite.js";
import { DEFAULT_RECENT_MESSAGES_LIMIT, RECENT_MESSAGES_LIMIT } from "../config.js";
import Database from "bun:sqlite";
import { db } from "../../index.js";
import { toJSON, toText } from "./cors.js";
import { toJSON } from "./cors.js";

export function parseLimit(searchParams: URLSearchParams) {
const value = searchParams.get("limit");
Expand Down Expand Up @@ -34,28 +34,28 @@ export function handleMessages(req: Request) {
//console.log(messages)
}

export function insertMessages(db: Database, traceId: string, timestamp: string, chain?: string) {
export function insertMessages(db: Database, traceId: string, text: string, chain?: string) {
const dbLength = sqlite.count(db, "messages");

if (dbLength >= RECENT_MESSAGES_LIMIT) {
let oldest = sqlite.selectAll(db, "messages").sort((a: any, b: any) => a.timestamp - b.timestamp)[0];

// update messages
sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp);
sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, text);
sqlite.deleteRow(db, "messages", oldest.key);

// update messagesByChain
if (chain) {
oldest = sqlite.selectAll(db, "messagesByChain").sort((a: any, b: any) => a.timestamp - b.timestamp)[0];
sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp );
sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, text );
sqlite.deleteRow(db, "messagesByChain", `${oldest.key}`);
}
return;
}
// add messages if tables not full
sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, timestamp);
sqlite.replaceRecent(db, "messages", String(Date.now()), `${traceId}`, text);

if (chain) sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, timestamp );
if (chain) sqlite.replaceRecent(db, "messagesByChain", String(Date.now()), `${chain}:${traceId}`, text );
}

export function selectMessages(db: Database, limit: number, sortBy: string, chain?: string, moduleHash?: string,) {
Expand Down
28 changes: 28 additions & 0 deletions src/result.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
export type Result<T = undefined, E = Error> = OkResult<T> | ErrResult<E>;

type OkResult<P = undefined> = { success: true } & (undefined extends P ? {} : { payload: P });
type ErrResult<E = Error> = { success: false; error: E };

export function Ok<
P extends string | number | object | undefined = undefined,
R = P extends undefined ? OkResult : OkResult<P>
>(payload?: P): R {
if (payload !== undefined) {
return { success: true, payload } as R;
}
return { success: true } as R;
}

export function Err<E = Error>(error: E): ErrResult<E> {
return { success: false, error };
}

export function UnknownErr(err: unknown): ErrResult {
if (err instanceof Error) {
return Err(err);
} else if (typeof err === "string") {
return Err(new Error(err));
} else {
return Err(new Error(JSON.stringify(err)));
}
}
25 changes: 25 additions & 0 deletions src/webhook/singatureEd25519.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { PUBLIC_KEYS } from "../config.js";
import { toText } from "../fetch/cors.js";
import { Err, Ok, Result } from "../result.js";
import { cachedVerify } from "substreams-sink-webhook/auth";

export async function signatureEd25519(req: Request, text: string): Promise<Result<undefined, Response>> {
const signature = req.headers.get("x-signature-ed25519");
const expiry = req.headers.get("x-signature-ed25519-expiry");
const publicKey = req.headers.get("x-signature-ed25519-public-key");

if (!signature) return Err(toText("missing required signature in headers", 400));
if (!expiry) return Err(toText("missing required expiry in headers", 400));
if (!publicKey) return Err(toText("missing required public key in headers", 400));
if (!text) return Err(toText("missing body", 400));

if (!PUBLIC_KEYS.includes(publicKey)) {
return Err(toText("invalid public key", 401));
}

if (!cachedVerify(signature, Number(expiry), publicKey)) {
return Err(toText("invalid request signature", 401));
}

return Ok();
}

0 comments on commit f65c2f1

Please sign in to comment.