Skip to content

Commit

Permalink
update onRestartInactivitySeconds
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Aug 2, 2023
1 parent 3274925 commit bba4e3f
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 18 deletions.
8 changes: 5 additions & 3 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import pkg from "./package.json" assert { type: "json" };
import { commander, setup, prometheus, http } from "./dist/index.js";
import { commander, setup, prometheus, http, logger } from "./dist/index.js";

// Setup CLI using Commander
const program = commander.program(pkg);
const command = commander.run(program, pkg);
logger.setName(pkg.name);

// Custom Prometheus Counters
const customCounter = prometheus.registerCounter("custom_counter");

command.action(async options => {
// Setup sink for Block Emitter
const { emitter } = await setup(options, pkg);
const {emitter} = await setup(options);
console.log("setup")

// Stream Blocks
Expand All @@ -26,6 +27,7 @@ command.action(async options => {

// Start streaming
await emitter.start();
console.log("finished");
http.server.close()
console.log("✅ finished");
})
program.parse();
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "substreams-sink",
"version": "0.8.2",
"version": "0.9.0",
"description": "Substreams Sink",
"type": "module",
"exports": "./dist/index.js",
Expand Down
16 changes: 5 additions & 11 deletions src/restartInactivitySeconds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,21 @@ const CHECK_INACTIVITY_INTERVAL = 1000;

export function onRestartInactivitySeconds(emitter: BlockEmitter, restartInactivitySeconds: number) {
let lastUpdate = now();
let isStarted = false;
let isFinished = false;

async function checkInactivity() {
if (now() - lastUpdate > restartInactivitySeconds) {
if (!isStarted) return;
logger.error(`Restarting due to inactivity for ${restartInactivitySeconds} seconds`);
process.exit(1);
process.exit(1); // force quit
}
if (isFinished) return;
if (isFinished) return; // exit out of the loop
await setTimeout(CHECK_INACTIVITY_INTERVAL);
checkInactivity();
}

emitter.on("cursor", () => {
isStarted = true;
emitter.on("cursor", (cursor, clock) => {
lastUpdate = now();
});

emitter.on("block", (block) => {
if (block.clock?.number === emitter.request.stopBlockNum - 1n) {
console.log(clock.number, emitter.request.stopBlockNum)
if (clock.number >= emitter.request.stopBlockNum - 1n) {
isFinished = true;
};
});
Expand Down
11 changes: 8 additions & 3 deletions src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import { logger } from "./logger.js";
import { onRestartInactivitySeconds } from "./restartInactivitySeconds.js";
import { applyParams } from "./applyParams.js";

export async function setup(options: RunOptions, pkg: { name: string }) {
export async function setup(options: RunOptions) {
// Configure logging with TSLog
if (options.verbose) logger.enable();
logger.setName(pkg.name);

// Download Substream package
const manifest = options.manifest;
Expand Down Expand Up @@ -75,7 +74,13 @@ export async function setup(options: RunOptions, pkg: { name: string }) {
await setTimeout(options.delayBeforeStart);

// Restart on inactivity
onRestartInactivitySeconds(emitter, options.restartInactivitySeconds);
// only activate once first cursor is received
let isStarted = false;
emitter.on("cursor", () => {
if ( isStarted ) return;
onRestartInactivitySeconds(emitter, options.restartInactivitySeconds);
isStarted = true;
});

return { emitter, substreamPackage, moduleHash, startCursor };
}

0 comments on commit bba4e3f

Please sign in to comment.