Skip to content

Commit

Permalink
Retry blockchain calls with a back-off delay
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Whitehead <matthew1001@gmail.com>
  • Loading branch information
matthew1001 authored and shorsher committed Mar 29, 2023
1 parent 206aab4 commit bfcc92d
Show file tree
Hide file tree
Showing 5 changed files with 243 additions and 39 deletions.
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ simple token support, and may provide a starting point for developing
production contracts that can be used with this connector.

To be usable by this connector, an ERC1155 contract should do all of the following:

1. Conform to [IERC1155MixedFungible](samples/solidity/contracts/IERC1155MixedFungible.sol).
2. Group tokens into clear fungible and non-fungible pools by partitioning the token ID space via the split bit implementation detailed in the comments in [ERC1155MixedFungible](samples/solidity/contracts/ERC1155MixedFungible.sol).

Expand All @@ -32,8 +33,9 @@ are additional methods used by the token connector to guess at the contract ABI
but is the preferred method for most use cases.

To leverage this capability in a running FireFly environment, you must:

1. [Upload the token contract ABI to FireFly](https://hyperledger.github.io/firefly/tutorials/custom_contracts/ethereum.html)
as a contract interface.
as a contract interface.
2. Include the `interface` parameter when [creating the pool on FireFly](https://hyperledger.github.io/firefly/tutorials/tokens).

This will cause FireFly to parse the interface and provide ABI details
Expand Down Expand Up @@ -94,8 +96,8 @@ All approvals are global and will apply to all tokens across _all_ pools on a pa

The following APIs are not part of the fftokens standard, but are exposed under `/api/v1`:

* `GET /balance` - Get token balance
* `GET /receipt/:id` - Get receipt for a previous request
- `GET /balance` - Get token balance
- `GET /receipt/:id` - Get receipt for a previous request

## Running the service

Expand Down Expand Up @@ -143,3 +145,29 @@ $ npm run lint
# formatting
$ npm run format
```

## Blockchain retry behaviour

Most short-term outages should be handled by the blockchain connector. For example if the blockchain node returns `HTTP 429` due to rate limiting
it is the blockchain connector's responsibility to use appropriate back-off retries to attempt to make the required blockchain call successfully.

There are cases where the token connector may need to perform its own back-off retry for a blockchain action. For example if the blockchain connector
microservice has crashed and is in the process of restarting just as the token connector is trying to query an NFT token URI to enrich a token event, if
the token connector doesn't perform a retry then the event will be returned without the token URI populated.

The token connector has configurable retry behaviour for all blockchain related calls. By default the connector will perform up to 15 retries with a back-off
interval between each one. The default first retry interval is 100ms and doubles up to a maximum of 10s per retry interval. Retries are only performed where
the error returned from the REST call matches a configurable regular expression retry condition. The default retry condition is `.*ECONN.*` which ensures
retries take place for common TCP errors such as `ECONNRESET` and `ECONNREFUSED`.

The configurable retry settings are:

- `RETRY_BACKOFF_FACTOR` (default `2`)
- `RETRY_BACKOFF_LIMIT_MS` (default `10000`)
- `RETRY_BACKOFF_INITIAL_MS` (default `100`)
- `RETRY_CONDITION` (default `.*ECONN.*`)
- `RETRY_MAX_ATTEMPTS` (default `15`)

Setting `RETRY_CONDITION` to `""` disables retries. Setting `RETRY_MAX_ATTEMPTS` to `-1` causes it to retry indefinitely.

Note, the token connector will make a total of `RETRY_MAX_ATTEMPTS` + 1 calls for a given retryable call (1 original attempt and `RETRY_MAX_ATTEMPTS` retries)
15 changes: 12 additions & 3 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -32,7 +32,7 @@ import {
TokenTransferEvent,
} from './tokens/tokens.interfaces';
import { EventStreamService } from './event-stream/event-stream.service';
import { BlockchainConnectorService } from './tokens/blockchain.service';
import { BlockchainConnectorService, RetryConfiguration } from './tokens/blockchain.service';
import { requestIDMiddleware } from './request-context/request-id.middleware';
import { newContext } from './request-context/request-context.decorator';

Expand Down Expand Up @@ -81,6 +81,15 @@ async function bootstrap() {
const contractAddress = config.get<string>('CONTRACT_ADDRESS', '');
const passthroughHeaderString = config.get<string>('PASSTHROUGH_HEADERS', '');

// Configuration for blockchain call retries
const blockchainRetryCfg: RetryConfiguration = {
retryBackOffFactor: config.get<number>('RETRY_BACKOFF_FACTOR', 2),
retryBackOffLimit: config.get<number>('RETRY_BACKOFF_LIMIT_MS', 10000),
retryBackOffInitial: config.get<number>('RETRY_BACKOFF_INITIAL_MS', 100),
retryCondition: config.get<string>('RETRY_CONDITION', '.*ECONN.*'),
retriesMax: config.get<number>('RETRY_MAX_ATTEMPTS', 15),
};

const passthroughHeaders: string[] = [];
for (const h of passthroughHeaderString.split(',')) {
passthroughHeaders.push(h.toLowerCase());
Expand All @@ -90,7 +99,7 @@ async function bootstrap() {
app.get(TokensService).configure(ethConnectUrl, instancePath, topic, contractAddress);
app
.get(BlockchainConnectorService)
.configure(ethConnectUrl, username, password, passthroughHeaders);
.configure(ethConnectUrl, username, password, passthroughHeaders, blockchainRetryCfg);

if (autoInit.toLowerCase() !== 'false') {
await app.get(TokensService).init(newContext());
Expand Down
134 changes: 106 additions & 28 deletions src/tokens/blockchain.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -38,6 +38,14 @@ import {
const sendTransactionHeader = 'SendTransaction';
const queryHeader = 'Query';

export interface RetryConfiguration {
retryBackOffFactor: number;
retryBackOffLimit: number;
retryBackOffInitial: number;
retryCondition: string;
retriesMax: number;
}

@Injectable()
export class BlockchainConnectorService {
private readonly logger = new Logger(BlockchainConnectorService.name);
Expand All @@ -47,13 +55,22 @@ export class BlockchainConnectorService {
password: string;
passthroughHeaders: string[];

retryConfiguration: RetryConfiguration;

constructor(private http: HttpService) {}

configure(baseUrl: string, username: string, password: string, passthroughHeaders: string[]) {
configure(
baseUrl: string,
username: string,
password: string,
passthroughHeaders: string[],
retryConfiguration: RetryConfiguration,
) {
this.baseUrl = baseUrl;
this.username = username;
this.password = password;
this.passthroughHeaders = passthroughHeaders;
this.retryConfiguration = retryConfiguration;
}

private requestOptions(ctx: Context): AxiosRequestConfig {
Expand Down Expand Up @@ -85,22 +102,77 @@ export class BlockchainConnectorService {
});
}

// Check if retry condition matches the err that's been hit
private matchesRetryCondition(err: any): boolean {
return (
this.retryConfiguration.retryCondition != '' &&
`${err}`.match(this.retryConfiguration.retryCondition) !== null
);
}

// Delay by the appropriate amount of time given the iteration the caller is in
private async backoffDelay(iteration: number) {
const delay = Math.min(
this.retryConfiguration.retryBackOffInitial *
Math.pow(this.retryConfiguration.retryBackOffFactor, iteration),
this.retryConfiguration.retryBackOffLimit,
);
await new Promise(resolve => setTimeout(resolve, delay));
}

// Generic helper function that makes a given blockchain function retryable
// by using synchronous back-off delays for cases where the function returns
// an error which matches the configured retry condition
private async retryableCall<T = any>(
blockchainFunction: () => Promise<AxiosResponse<T>>,
): Promise<AxiosResponse<T>> {
let retries = 0;
for (
;
this.retryConfiguration.retriesMax == -1 || retries <= this.retryConfiguration.retriesMax;
this.retryConfiguration.retriesMax == -1 || retries++ // Don't inc 'retries' if 'retriesMax' if set to -1 (infinite retries)
) {
try {
return await blockchainFunction();
} catch (e) {
if (this.matchesRetryCondition(e)) {
this.logger.debug(`Retry condition matched for error ${e}`);
// Wait for a backed-off delay before trying again
await this.backoffDelay(retries);
} else {
// Whatever the error was it's not one we will retry for
throw e;
}
}
}

throw new InternalServerErrorException(
`Call to blockchain connector failed after ${retries} attempts`,
);
}

async getContractInfo(ctx: Context, url: string) {
const response = await this.wrapError(
lastValueFrom(this.http.get<ContractInfoResponse>(url, this.requestOptions(ctx))),
this.retryableCall<ContractInfoResponse>(
async (): Promise<AxiosResponse<ContractInfoResponse>> => {
return lastValueFrom(this.http.get<ContractInfoResponse>(url, this.requestOptions(ctx)));
},
),
);
return response.data;
}

async query(ctx: Context, to: string, method?: IAbiMethod, params?: any[]) {
const response = await this.wrapError(
lastValueFrom(
this.http.post<EthConnectReturn>(
this.baseUrl,
{ headers: { type: queryHeader }, to, method, params },
this.requestOptions(ctx),
),
),
this.retryableCall<EthConnectReturn>(async (): Promise<AxiosResponse<EthConnectReturn>> => {
return lastValueFrom(
this.http.post<EthConnectReturn>(
this.baseUrl,
{ headers: { type: queryHeader }, to, method, params },
this.requestOptions(ctx),
),
);
}),
);
return response.data;
}
Expand All @@ -114,31 +186,37 @@ export class BlockchainConnectorService {
params?: any[],
) {
const response = await this.wrapError(
lastValueFrom(
this.http.post<EthConnectAsyncResponse>(
this.baseUrl,
{
headers: { id, type: sendTransactionHeader },
from,
to,
method,
params,
},
this.requestOptions(ctx),
),
this.retryableCall<EthConnectAsyncResponse>(
async (): Promise<AxiosResponse<EthConnectAsyncResponse>> => {
return lastValueFrom(
this.http.post<EthConnectAsyncResponse>(
this.baseUrl,
{
headers: { id, type: sendTransactionHeader },
from,
to,
method,
params,
},
this.requestOptions(ctx),
),
);
},
),
);
return response.data;
}

async getReceipt(ctx: Context, id: string): Promise<EventStreamReply> {
const response = await this.wrapError(
lastValueFrom(
this.http.get<EventStreamReply>(new URL(`/reply/${id}`, this.baseUrl).href, {
validateStatus: status => status < 300 || status === 404,
...this.requestOptions(ctx),
}),
),
this.retryableCall<EventStreamReply>(async (): Promise<AxiosResponse<EventStreamReply>> => {
return lastValueFrom(
this.http.get<EventStreamReply>(new URL(`/reply/${id}`, this.baseUrl).href, {
validateStatus: status => status < 300 || status === 404,
...this.requestOptions(ctx),
}),
);
}),
);
if (response.status === 404) {
throw new NotFoundException();
Expand Down
Loading

0 comments on commit bfcc92d

Please sign in to comment.