Skip to content

Commit

Permalink
Merge pull request #128 from kaleido-io/backoff-retry-econn-errors
Browse files Browse the repository at this point in the history
Retry logic for blockchain calls
  • Loading branch information
matthew1001 authored Mar 29, 2023
2 parents 0b814b7 + 0982c00 commit 1cb1d6c
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 27 deletions.
31 changes: 29 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ are additional methods used by the token connector to guess at the contract ABI
but is the preferred method for most use cases.

To leverage this capability in a running FireFly environment, you must:

1. [Upload the token contract ABI to FireFly](https://hyperledger.github.io/firefly/tutorials/custom_contracts/ethereum.html)
as a contract interface.
as a contract interface.
2. Include the `interface` parameter when [creating the pool on FireFly](https://hyperledger.github.io/firefly/tutorials/tokens).

This will cause FireFly to parse the interface and provide ABI details
Expand Down Expand Up @@ -119,7 +120,7 @@ that specific token. If omitted, the approval covers all tokens.

The following APIs are not part of the fftokens standard, but are exposed under `/api/v1`:

* `GET /receipt/:id` - Get receipt for a previous request
- `GET /receipt/:id` - Get receipt for a previous request

## Running the service

Expand Down Expand Up @@ -179,3 +180,29 @@ $ npm run lint
# formatting
$ npm run format
```

## Blockchain retry behaviour

Most short-term outages should be handled by the blockchain connector. For example if the blockchain node returns `HTTP 429` due to rate limiting
it is the blockchain connector's responsibility to use appropriate back-off retries to attempt to make the required blockchain call successfully.

There are cases where the token connector may need to perform its own back-off retry for a blockchain action. For example if the blockchain connector
microservice has crashed and is in the process of restarting just as the token connector is trying to query an NFT token URI to enrich a token event, if
the token connector doesn't perform a retry then the event will be returned without the token URI populated.

The token connector has configurable retry behaviour for all blockchain related calls. By default the connector will perform up to 15 retries with a back-off
interval between each one. The default first retry interval is 100ms and doubles up to a maximum of 10s per retry interval. Retries are only performed where
the error returned from the REST call matches a configurable regular expression retry condition. The default retry condition is `.*ECONN.*` which ensures
retries take place for common TCP errors such as `ECONNRESET` and `ECONNREFUSED`.

The configurable retry settings are:

- `RETRY_BACKOFF_FACTOR` (default `2`)
- `RETRY_BACKOFF_LIMIT_MS` (default `10000`)
- `RETRY_BACKOFF_INITIAL_MS` (default `100`)
- `RETRY_CONDITION` (default `.*ECONN.*`)
- `RETRY_MAX_ATTEMPTS` (default `15`)

Setting `RETRY_CONDITION` to `""` disables retries. Setting `RETRY_MAX_ATTEMPTS` to `-1` causes it to retry indefinitely.

Note, the token connector will make a total of `RETRY_MAX_ATTEMPTS` + 1 calls for a given retryable call (1 original attempt and `RETRY_MAX_ATTEMPTS` retries)
13 changes: 11 additions & 2 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { EventStreamReply } from './event-stream/event-stream.interfaces';
import { EventStreamService } from './event-stream/event-stream.service';
import { requestIDMiddleware } from './request-context/request-id.middleware';
import { RequestLoggingInterceptor } from './request-logging.interceptor';
import { BlockchainConnectorService } from './tokens/blockchain.service';
import { BlockchainConnectorService, RetryConfiguration } from './tokens/blockchain.service';
import {
TokenApprovalEvent,
TokenBurnEvent,
Expand Down Expand Up @@ -84,6 +84,15 @@ async function bootstrap() {
const legacyERC20 = config.get<string>('USE_LEGACY_ERC20_SAMPLE', '').toLowerCase() === 'true';
const legacyERC721 = config.get<string>('USE_LEGACY_ERC721_SAMPLE', '').toLowerCase() === 'true';

// Configuration for blockchain call retries
const blockchainRetryCfg: RetryConfiguration = {
retryBackOffFactor: config.get<number>('RETRY_BACKOFF_FACTOR', 2),
retryBackOffLimit: config.get<number>('RETRY_BACKOFF_LIMIT_MS', 10000),
retryBackOffInitial: config.get<number>('RETRY_BACKOFF_INITIAL_MS', 100),
retryCondition: config.get<string>('RETRY_CONDITION', '.*ECONN.*'),
retriesMax: config.get<number>('RETRY_MAX_ATTEMPTS', 15),
};

const passthroughHeaders: string[] = [];
for (const h of passthroughHeaderString.split(',')) {
passthroughHeaders.push(h.toLowerCase());
Expand All @@ -93,7 +102,7 @@ async function bootstrap() {
app.get(TokensService).configure(ethConnectUrl, topic, factoryAddress);
app
.get(BlockchainConnectorService)
.configure(ethConnectUrl, fftmUrl, username, password, passthroughHeaders);
.configure(ethConnectUrl, fftmUrl, username, password, passthroughHeaders, blockchainRetryCfg);
app.get(AbiMapperService).configure(legacyERC20, legacyERC721);

if (autoInit.toLowerCase() !== 'false') {
Expand Down
110 changes: 91 additions & 19 deletions src/tokens/blockchain.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ import { Context } from '../request-context/request-context.decorator';
import { FFRequestIDHeader } from '../request-context/constants';
import { EthConnectAsyncResponse, EthConnectReturn, IAbiMethod } from './tokens.interfaces';

export interface RetryConfiguration {
retryBackOffFactor: number;
retryBackOffLimit: number;
retryBackOffInitial: number;
retryCondition: string;
retriesMax: number;
}

const sendTransactionHeader = 'SendTransaction';
const queryHeader = 'Query';

Expand All @@ -43,6 +51,8 @@ export class BlockchainConnectorService {
password: string;
passthroughHeaders: string[];

retryConfiguration: RetryConfiguration;

constructor(public http: HttpService) {}

configure(
Expand All @@ -51,12 +61,14 @@ export class BlockchainConnectorService {
username: string,
password: string,
passthroughHeaders: string[],
retryConfiguration: RetryConfiguration,
) {
this.baseUrl = baseUrl;
this.fftmUrl = fftmUrl;
this.username = username;
this.password = password;
this.passthroughHeaders = passthroughHeaders;
this.retryConfiguration = retryConfiguration;
}

private requestOptions(ctx: Context): AxiosRequestConfig {
Expand Down Expand Up @@ -88,15 +100,67 @@ export class BlockchainConnectorService {
});
}

// Check if retry condition matches the err that's been hit
private matchesRetryCondition(err: any): boolean {
return (
this.retryConfiguration.retryCondition != '' &&
`${err}`.match(this.retryConfiguration.retryCondition) !== null
);
}

// Delay by the appropriate amount of time given the iteration the caller is in
private async backoffDelay(iteration: number) {
const delay = Math.min(
this.retryConfiguration.retryBackOffInitial *
Math.pow(this.retryConfiguration.retryBackOffFactor, iteration),
this.retryConfiguration.retryBackOffLimit,
);
await new Promise(resolve => setTimeout(resolve, delay));
}

// Generic helper function that makes a given blockchain function retryable
// by using synchronous back-off delays for cases where the function returns
// an error which matches the configured retry condition
private async retryableCall<T = any>(
blockchainFunction: () => Promise<AxiosResponse<T>>,
): Promise<AxiosResponse<T>> {
let retries = 0;
for (
;
this.retryConfiguration.retriesMax == -1 || retries <= this.retryConfiguration.retriesMax;
this.retryConfiguration.retriesMax == -1 || retries++ // Don't inc 'retries' if 'retriesMax' if set to -1 (infinite retries)
) {
try {
return await blockchainFunction();
} catch (e) {
if (this.matchesRetryCondition(e)) {
this.logger.debug(`Retry condition matched for error ${e}`);
// Wait for a backed-off delay before trying again
await this.backoffDelay(retries);
} else {
// Whatever the error was it's not one we will retry for
throw e;
}
}
}

throw new InternalServerErrorException(
`Call to blockchain connector failed after ${retries} attempts`,
);
}

async query(ctx: Context, to: string, method?: IAbiMethod, params?: any[]) {
const url = this.baseUrl;
const response = await this.wrapError(
lastValueFrom(
this.http.post<EthConnectReturn>(
this.baseUrl,
{ headers: { type: queryHeader }, to, method, params },
this.requestOptions(ctx),
),
),
this.retryableCall<EthConnectReturn>(async (): Promise<AxiosResponse<EthConnectReturn>> => {
return lastValueFrom(
this.http.post(
url,
{ headers: { type: queryHeader }, to, method, params },
this.requestOptions(ctx),
),
);
}),
);
return response.data;
}
Expand All @@ -110,26 +174,34 @@ export class BlockchainConnectorService {
params?: any[],
) {
const url = this.fftmUrl !== undefined && this.fftmUrl !== '' ? this.fftmUrl : this.baseUrl;

const response = await this.wrapError(
lastValueFrom(
this.http.post<EthConnectAsyncResponse>(
url,
{ headers: { id, type: sendTransactionHeader }, from, to, method, params },
this.requestOptions(ctx),
),
this.retryableCall<EthConnectAsyncResponse>(
async (): Promise<AxiosResponse<EthConnectAsyncResponse>> => {
return lastValueFrom(
this.http.post(
url,
{ headers: { id, type: sendTransactionHeader }, from, to, method, params },
this.requestOptions(ctx),
),
);
},
),
);
return response.data;
}

async getReceipt(ctx: Context, id: string): Promise<EventStreamReply> {
const url = this.baseUrl;
const response = await this.wrapError(
lastValueFrom(
this.http.get<EventStreamReply>(new URL(`/reply/${id}`, this.baseUrl).href, {
validateStatus: status => status < 300 || status === 404,
...this.requestOptions(ctx),
}),
),
this.retryableCall<EventStreamReply>(async (): Promise<AxiosResponse<EventStreamReply>> => {
return lastValueFrom(
this.http.get(new URL(`/reply/${id}`, url).href, {
validateStatus: status => status < 300 || status === 404,
...this.requestOptions(ctx),
}),
);
}),
);
if (response.status === 404) {
throw new NotFoundException();
Expand Down
63 changes: 61 additions & 2 deletions src/tokens/tokens.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
import { EventStreamService } from '../event-stream/event-stream.service';
import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway';
import { AbiMapperService } from './abimapper.service';
import { BlockchainConnectorService } from './blockchain.service';
import { BlockchainConnectorService, RetryConfiguration } from './blockchain.service';
import {
AsyncResponse,
EthConnectAsyncResponse,
Expand Down Expand Up @@ -196,6 +196,14 @@ describe('TokensService', () => {
);
};

const mockECONNErrors = (count: number) => {
for (let i = 0; i < count; i++) {
http.post.mockImplementationOnce(() => {
throw new Error('connect ECONNREFUSED 10.1.2.3');
});
}
};

beforeEach(async () => {
http = {
get: jest.fn(),
Expand Down Expand Up @@ -232,10 +240,18 @@ describe('TokensService', () => {
.useValue(eventstream)
.compile();

let blockchainRetryCfg: RetryConfiguration = {
retryBackOffFactor: 2,
retryBackOffLimit: 500,
retryBackOffInitial: 50,
retryCondition: '.*ECONN.*',
retriesMax: 15,
};

service = module.get(TokensService);
service.configure(BASE_URL, TOPIC, '');
blockchain = module.get(BlockchainConnectorService);
blockchain.configure(BASE_URL, '', '', '', []);
blockchain.configure(BASE_URL, '', '', '', [], blockchainRetryCfg);
});

it('should be defined', () => {
Expand Down Expand Up @@ -1042,6 +1058,49 @@ describe('TokensService', () => {
expect(http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, { headers });
});

it('should mint ERC721WithData token with correct abi, custom uri, and inputs after 6 ECONNREFUSED retries', async () => {
const ctx = newContext();
const headers = {
'x-firefly-request-id': ctx.requestId,
};

const request: TokenMint = {
tokenIndex: '721',
signer: IDENTITY,
poolLocator: ERC721_WITH_DATA_V1_POOL_ID,
to: '0x123',
uri: 'ipfs://CID',
};

const response: EthConnectAsyncResponse = {
id: 'responseId',
sent: true,
};

const mockEthConnectRequest: EthConnectMsgRequest = {
headers: {
type: 'SendTransaction',
},
from: IDENTITY,
to: CONTRACT_ADDRESS,
method: ERC721WithDataV1ABI.abi.find(abi => abi.name === MINT_WITH_URI) as IAbiMethod,
params: ['0x123', '721', '0x00', 'ipfs://CID'],
};

http.post.mockReturnValueOnce(
new FakeObservable(<EthConnectReturn>{
output: true,
}),
);
mockECONNErrors(6);
http.post.mockReturnValueOnce(new FakeObservable(response));

await service.mint(ctx, request);

expect(http.post).toHaveBeenCalledWith(BASE_URL, mockEthConnectRequest, { headers });
expect(http.post).toHaveBeenCalledTimes(8); // Expect initial submit OK, 6 ECONN errors, final call OK = 8 POSTs
});

it('should mint ERC721WithData token with correct abi, custom uri, auto-indexing, and inputs', async () => {
const ctx = newContext();
const headers = {
Expand Down
14 changes: 12 additions & 2 deletions test/app.e2e-context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { EventStreamService } from '../src/event-stream/event-stream.service';
import { EventStreamProxyGateway } from '../src/eventstream-proxy/eventstream-proxy.gateway';
import { TokensService } from '../src/tokens/tokens.service';
import { requestIDMiddleware } from '../src/request-context/request-id.middleware';
import { BlockchainConnectorService } from '../src/tokens/blockchain.service';
import { BlockchainConnectorService, RetryConfiguration } from '../src/tokens/blockchain.service';

export const BASE_URL = 'http://eth';
export const INSTANCE_PATH = '/tokens';
Expand Down Expand Up @@ -69,9 +69,19 @@ export class TestContext {
this.app.use(requestIDMiddleware);
await this.app.init();

let blockchainRetryCfg: RetryConfiguration = {
retryBackOffFactor: 2,
retryBackOffLimit: 500,
retryBackOffInitial: 50,
retryCondition: '.*ECONN.*',
retriesMax: 15,
};

this.app.get(EventStreamProxyGateway).configure('url', TOPIC);
this.app.get(TokensService).configure(BASE_URL, TOPIC, '');
this.app.get(BlockchainConnectorService).configure(BASE_URL, '', '', '', []);
this.app
.get(BlockchainConnectorService)
.configure(BASE_URL, '', '', '', [], blockchainRetryCfg);

(this.app.getHttpServer() as Server).listen();
this.server = request(this.app.getHttpServer());
Expand Down

0 comments on commit 1cb1d6c

Please sign in to comment.