Skip to content

Commit

Permalink
feat: Develop (#35)
Browse files Browse the repository at this point in the history
* Add static checks

* fix: data actuality metric on app init (#29)

* fix: data actuality metric on app init

* fix: add `absent` to alert condition

* feat: operators 24h balance diff and new sync participation calc (#28)

* feat: operators 24h balance diff and new sync participation calc

* fix: typo

* fix: remarks

* fix: dashboard

* sync dashboards with infra

* feat: add `CriticalSlashing` (#30)

* feat: store Lido keys to sqlite db file (#32)

* feat: store Lido keys to sqlite db file

* fix: rename

* feat: add mkdir to Dockerfile

* fix: RUN

* feat: use fallback in non-finalized node case (#33)

* feat: use fallback in non-finalized node case

* fix: ordering `then` `catch` blocks

* feat: change default retries count (#34)
  • Loading branch information
vgorkavenko authored Oct 3, 2022
1 parent 3b5ce62 commit 45c77ed
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 72 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ WORKDIR /app
COPY --from=building /app/dist ./dist
COPY --from=building /app/node_modules ./node_modules
COPY ./package.json ./
RUN mkdir -p ./docker/validators/ && chown -R node:node ./docker/validators/

USER node

Expand Down
81 changes: 42 additions & 39 deletions README.md

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion src/common/config/env.validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ export class EnvironmentVariables {

@IsNumber()
@Transform(({ value }) => parseInt(value, 10), { toClassOnly: true })
public CL_API_GET_BLOCK_INFO_MAX_RETRIES = 5;
public CL_API_MAX_RETRIES = 1;

@IsNumber()
@Transform(({ value }) => parseInt(value, 10), { toClassOnly: true })
public CL_API_GET_BLOCK_INFO_MAX_RETRIES = 1;

@IsNumber()
@Min(18950)
Expand All @@ -173,6 +177,9 @@ export class EnvironmentVariables {
@IsString()
public VALIDATOR_REGISTRY_FILE_SOURCE_PATH = './docker/validators/custom_mainnet.yaml';

@IsString()
public VALIDATOR_REGISTRY_LIDO_SOURCE_SQLITE_CACHE_PATH = './docker/validators/lido_mainnet.db';

/**
* Distance (down) from Blockchain Sync Participation average after which we think that our sync participation is bad
* For example:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ import { NonEmptyArray } from 'fp-ts/NonEmptyArray';
interface RequestRetryOptions {
maxRetries?: number;
dataOnly?: boolean;
fallbackConditionCallback?: (e: any) => any;
useFallbackOnRejected?: (e: any) => boolean;
useFallbackOnResolved?: (r: any) => boolean;
}

@Injectable()
export class ConsensusProviderService {
protected apiUrls: string[];
protected version = '';
protected genesisTime = 0n;
protected defaultMaxSlotDeepCount = 32 * 5;
protected defaultMaxSlotDeepCount = 32;
protected lastFinalizedSlot = { slot: 0n, fetchTime: 0 };

protected endpoints = {
version: 'eth/v1/node/version',
Expand Down Expand Up @@ -89,10 +91,25 @@ export class ConsensusProviderService {
);
}

public async getBeaconBlockHeader(state: bigint | string, maxRetries = 3): Promise<ShortBeaconBlockHeader> {
public async getBeaconBlockHeader(state: bigint | string): Promise<ShortBeaconBlockHeader> {
const blockHeader = await this.retryRequest<BlockHeaderResponse>(
(apiURL: string) => this.apiGet(apiURL, this.endpoints.beaconHeaders(state)),
{ maxRetries, fallbackConditionCallback: (e) => 404 != e.$httpCode },
{
maxRetries: this.config.get('CL_API_GET_BLOCK_INFO_MAX_RETRIES'),
useFallbackOnResolved: (r) => {
if (state == 'finalized') {
if (BigInt(r.data.header.message.slot) > this.lastFinalizedSlot.slot) {
this.lastFinalizedSlot = { slot: BigInt(r.data.header.message.slot), fetchTime: Number(Date.now()) };
} else if (Number(Date.now()) - this.lastFinalizedSlot.fetchTime > 420 * 1000) {
// if 'finalized' slot doesn't change ~7m we must switch to fallback
this.logger.error("Finalized slot hasn't changed in ~7m");
return true;
}
}
// for other states don't use fallback on resolved
return false;
},
},
);

return {
Expand All @@ -111,14 +128,16 @@ export class ConsensusProviderService {
this.logger.error('Unexpected status code while fetching block header');
throw e;
}
const someNotMissedNextBlock = await this.getNextNotMissedBlockHeader(slot);
// if block 64 is missed, try to get next not missed block header
const someNotMissedNextBlock = await this.getNextNotMissedBlockHeader(slot + 1n);

this.logger.log(
`Found next not missed slot [${
someNotMissedNextBlock.slotNumber
}] root [${someNotMissedNextBlock.blockRoot.toString()}] after slot [${slot}]`,
);

// and get the closest finalized block header in epoch by root from next
const [isMissed, notMissedPreviousBlock] = await this.checkSlotIsMissed(slot, someNotMissedNextBlock);

if (isMissed) {
Expand Down Expand Up @@ -233,7 +252,7 @@ export class ConsensusProviderService {
public async getBlockInfo(block: string | bigint): Promise<ShortBeaconBlockInfo> {
return <ShortBeaconBlockInfo>await this.retryRequest((apiURL: string) => this.apiGet(apiURL, this.endpoints.blockInfo(block)), {
maxRetries: this.config.get('CL_API_GET_BLOCK_INFO_MAX_RETRIES'),
fallbackConditionCallback: (e) => 404 != e.$httpCode,
useFallbackOnRejected: (e) => 404 != e.$httpCode,
}).catch((e) => {
if (404 != e.$httpCode) {
this.logger.error('Unexpected status code while fetching block info');
Expand All @@ -250,9 +269,9 @@ export class ConsensusProviderService {
epoch: string | bigint,
dependentRoot: string,
indexes: string[],
maxRetries = 3,
maxRetriesForGetCanonical = 3,
): Promise<AttesterDutyInfo[]> {
const retry = retrier(this.logger, maxRetries, 100, 10000, true);
const retry = retrier(this.logger, maxRetriesForGetCanonical, 100, 10000, true);
const request = async () => {
const res = <{ dependent_root: string; data: AttesterDutyInfo[] }>(
await this.retryRequest(
Expand All @@ -269,7 +288,7 @@ export class ConsensusProviderService {
return await request()
.catch(() => retry(() => request()))
.catch(() => {
throw Error(`Failed to get canonical attester duty info after ${maxRetries} retries`);
throw Error(`Failed to get canonical attester duty info after ${maxRetriesForGetCanonical} retries`);
});
}

Expand Down Expand Up @@ -300,12 +319,16 @@ export class ConsensusProviderService {
return result;
}

public async getCanonicalProposerDuties(epoch: string | bigint, dependentRoot: string, maxRetries = 3): Promise<ProposerDutyInfo[]> {
const retry = retrier(this.logger, maxRetries, 100, 10000, true);
public async getCanonicalProposerDuties(
epoch: string | bigint,
dependentRoot: string,
maxRetriesForGetCanonical = 3,
): Promise<ProposerDutyInfo[]> {
const retry = retrier(this.logger, maxRetriesForGetCanonical, 100, 10000, true);
const request = async () => {
const res = <{ dependent_root: string; data: ProposerDutyInfo[] }>await this.retryRequest(
(apiURL: string) => this.apiGet(apiURL, this.endpoints.proposerDutes(epoch)),
{ maxRetries, dataOnly: false },
{ dataOnly: false },
).catch((e) => {
this.logger.error('Unexpected status code while fetching proposer duties info');
throw e;
Expand All @@ -319,7 +342,7 @@ export class ConsensusProviderService {
return await request()
.catch(() => retry(() => request()))
.catch(() => {
throw Error(`Failed to get canonical proposer duty info after ${maxRetries} retries`);
throw Error(`Failed to get canonical proposer duty info after ${maxRetriesForGetCanonical} retries`);
});
}

Expand All @@ -328,35 +351,44 @@ export class ConsensusProviderService {
}

protected async retryRequest<T>(callback: (apiURL: string) => any, options?: RequestRetryOptions): Promise<T> {
const [maxRetries, dataOnly, fallbackConditionCallback] = [
options?.maxRetries ?? 5,
options?.dataOnly != undefined ? options.dataOnly : true,
options?.fallbackConditionCallback != undefined ? options.fallbackConditionCallback : () => true,
];
const retry = retrier(this.logger, maxRetries, 100, 10000, true);
options = {
maxRetries: options?.maxRetries ?? this.config.get('CL_API_MAX_RETRIES'),
dataOnly: options?.dataOnly ?? true,
useFallbackOnRejected: options?.useFallbackOnRejected ?? (() => true), // use fallback on error as default
useFallbackOnResolved: options?.useFallbackOnResolved ?? (() => false), // do NOT use fallback on success as default
};
const retry = retrier(this.logger, options.maxRetries, 100, 10000, true);
let res;
let err;
for (let i = 0; i < this.apiUrls.length; i++) {
if (res) break;
res = await callback(this.apiUrls[i])
.catch(rejectDelay(this.config.get('CL_API_RETRY_DELAY_MS')))
.catch(() => retry(() => callback(this.apiUrls[i])))
.then((r: any) => {
if (options.useFallbackOnResolved(r)) {
err = Error('Unresolved data on a successful CL API response');
return undefined;
}
return r;
})
.catch((e: any) => {
if (fallbackConditionCallback(e)) {
if (this.apiUrls.length == 1) {
this.logger.warn('Backup CL API URLs not passed');
throw e;
}
this.logger.error('Error while doing CL API request. Will try to switch to another API URL');
if (options.useFallbackOnRejected(e)) {
err = e;
return undefined;
}
throw e;
});
if (i == this.apiUrls.length - 1 && !res) {
throw Error('Error while doing CL API request on all passed URLs');
err.message = `Error while doing CL API request on all passed URLs. ${err.message}`;
throw err;
}
if (!res) {
this.logger.warn(`${err.message}. Error while doing CL API request. Will try to switch to another API URL`);
}
}

if (dataOnly) return res.data;
if (options.dataOnly) return res.data;
else return res;
}

Expand Down
6 changes: 3 additions & 3 deletions src/common/functions/retrier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ export const retrier = (
try {
return await callback();
} catch (err: any) {
if (logWarning) {
logger.warn(err, `Retrying after (${minBackoffMs}ms). Remaining retries [${maxRetryCount}]`);
}
if (maxRetryCount <= 1 || minBackoffMs >= maxBackoffMs) {
throw err;
}
if (logWarning) {
logger.warn(err, `Retrying after (${minBackoffMs}ms). Remaining retries [${maxRetryCount}]`);
}
await sleep(minBackoffMs);
return await retrier(logger)(callback, maxRetryCount - 1, minBackoffMs * 2, maxBackoffMs, logWarning);
}
Expand Down
7 changes: 7 additions & 0 deletions src/inspector/inspector.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,16 @@ export class InspectorService implements OnModuleInit {
protected async waitForNextFinalizedSlot(nextSlot: bigint): Promise<{ slotToWrite: bigint; stateRoot: string; slotNumber: bigint }> {
const latestFinalizedBeaconBlock = await this.clClient.getBeaconBlockHeader('finalized');
if (latestFinalizedBeaconBlock.slotNumber >= nextSlot && nextSlot > this.dataProcessor.latestSlotInDb) {
// if new finalized slot has happened, from which we can get information about needed
// for example: latestSlotInDb = 32, nextSlot = 64, latestFinalizedBeaconBlock = 65
this.logger.log(
`Latest finalized slot [${latestFinalizedBeaconBlock.slotNumber}] found. Next slot [${nextSlot}]. Latest DB slot [${this.dataProcessor.latestSlotInDb}]`,
);

// try to get block 64 header
const [nextFinalizedBeaconBlock, isMissed] = await this.clClient.getBeaconBlockHeaderOrPreviousIfMissed(nextSlot);

// if it's not missed - just return it
if (!isMissed) {
this.logger.log(
`Fetched next slot [${nextFinalizedBeaconBlock.slotNumber}] with state root [${nextFinalizedBeaconBlock.stateRoot}]`,
Expand All @@ -90,6 +94,7 @@ export class InspectorService implements OnModuleInit {
};
}

// if it's missed that we return the closest finalized block stateRoot and slotNumber in epoch (for example - block 63)
this.logger.log(
`Fetched next slot [${nextFinalizedBeaconBlock.slotNumber}] with state root [${
nextFinalizedBeaconBlock.stateRoot
Expand All @@ -103,6 +108,8 @@ export class InspectorService implements OnModuleInit {
};
}

// if new finalized slot hasn't happened, from which we should get information about needed
// for example: latestSlotInDb = 32, nextSlot = 64, latestFinalizedBeaconBlock = 33
// just wait `CHAIN_SLOT_TIME_SECONDS` until finality happens
const sleepTime = this.config.get('CHAIN_SLOT_TIME_SECONDS');
this.logger.log(
Expand Down
6 changes: 4 additions & 2 deletions src/storage/database/database.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,23 @@ import { MikroOrmModule } from '@mikro-orm/nestjs';
import { Global, Module } from '@nestjs/common';
import { DatabaseService } from './database.service';
import { FlushMode } from '@mikro-orm/core';
import { ConfigService } from '../../common/config';

@Global()
@Module({
imports: [
MikroOrmModule.forRootAsync({
async useFactory() {
async useFactory(configService: ConfigService) {
return {
dbName: ':memory:',
dbName: configService.get('VALIDATOR_REGISTRY_LIDO_SOURCE_SQLITE_CACHE_PATH'),
type: 'sqlite',
allowGlobalContext: true,
autoLoadEntities: true,
cache: { enabled: false },
flushMode: FlushMode.ALWAYS,
};
},
inject: [ConfigService],
}),
],
providers: [DatabaseService],
Expand Down

0 comments on commit 45c77ed

Please sign in to comment.