diff --git a/gax/src/gax.ts b/gax/src/gax.ts index 8bcff7947..64f381443 100644 --- a/gax/src/gax.ts +++ b/gax/src/gax.ts @@ -386,6 +386,15 @@ export function convertRetryOptions( // this is in seconds and needs to be converted to milliseconds and the totalTimeoutMillis parameter if (options.retryRequestOptions.totalTimeout !== undefined) { totalTimeoutMillis = options.retryRequestOptions.totalTimeout * 1000; + } else { + if (options.maxRetries === undefined) { + totalTimeoutMillis = 30000; + warn( + 'retry_request_options_no_max_retries_timeout', + 'Neither maxRetries nor totalTimeout were passed. Defaulting to totalTimeout of 30000ms.', + 'MissingParameterWarning' + ); + } } // for the variables the user wants to override, override in the backoff settings object we made diff --git a/gax/src/streamingCalls/streaming.ts b/gax/src/streamingCalls/streaming.ts index 583425fc9..5dd6fd9e1 100644 --- a/gax/src/streamingCalls/streaming.ts +++ b/gax/src/streamingCalls/streaming.ts @@ -25,11 +25,15 @@ import { RequestType, SimpleCallbackFunction, } from '../apitypes'; -import {RetryOptions, RetryRequestOptions} from '../gax'; +import { + RetryOptions, + RetryRequestOptions, + createDefaultBackoffSettings, +} from '../gax'; import {GoogleError} from '../googleError'; -import {streamingRetryRequest} from '../streamingRetryRequest'; import {Status} from '../status'; - +import {PassThrough} from 'stream'; +import {ResponseType} from '../apitypes'; // eslint-disable-next-line @typescript-eslint/no-var-requires const duplexify: DuplexifyConstructor = require('duplexify'); // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -80,6 +84,17 @@ export enum StreamType { BIDI_STREAMING = 3, } +// In retry-request, you could pass parameters to request using the requestOpts parameter +// when we called retry-request from gax, we always passed null +// passing null here removes an unnecessary parameter from this implementation +const requestOps = null; + +interface streamingRetryRequestOptions { + request: Function; + retry: RetryOptions; + maxRetries?: number; +} + export class StreamProxy extends duplexify implements GRPCCallResult { type: StreamType; private _callback: APICallback; @@ -90,8 +105,6 @@ export class StreamProxy extends duplexify implements GRPCCallResult { gaxServerStreamingRetries?: boolean; apiCall?: SimpleCallbackFunction; argument?: {}; - prevDeadline?: number; - retries?: number = 0; /** * StreamProxy is a proxy to gRPC-streaming method. * @@ -118,7 +131,6 @@ export class StreamProxy extends duplexify implements GRPCCallResult { this.rest = rest; this.gaxServerStreamingRetries = gaxServerStreamingRetries; } - private shouldRetryRequest(error: Error, retry: RetryOptions): boolean { const e = GoogleError.parseGRPCStatusDetails(error); let shouldRetry = this.defaultShouldRetry(e!, retry); @@ -136,166 +148,60 @@ export class StreamProxy extends duplexify implements GRPCCallResult { } } - retry(stream: CancellableStream, retry: RetryOptions) { - let retryArgument = this.argument! as unknown as RequestType; - if (typeof retry.getResumptionRequestFn! === 'function') { - const resumptionRetryArgument = - retry.getResumptionRequestFn(retryArgument); - if (resumptionRetryArgument !== undefined) { - retryArgument = resumptionRetryArgument; - } - } - this.resetStreams(stream); - - const newStream = this.apiCall!( - retryArgument, - this._callback - ) as CancellableStream; - this.stream = newStream; - - this.streamHandoffHelper(newStream, retry); - return newStream; - } - /** * Helper function to handle total timeout + max retry check for server streaming retries * @param {number} deadline - the current retry deadline * @param {number} maxRetries - maximum total number of retries - * @param {number} totalTimeoutMillis - total timeout in milliseconds + * @param {number} totalTimeoutMillis - total timeout in milliseconds used in timeout calculation + * @param {GoogleError} originalError - underlying error received by the stream + * @param {originalTimeout} originalTimeout - the original Timeout set in backoff settings + * @param {retries} retries - the number of retries the call has made so far */ throwIfMaxRetriesOrTotalTimeoutExceeded( deadline: number, maxRetries: number, - totalTimeoutMillis: number + totalTimeoutMillis: number, + originalError: GoogleError, + originalTimeout: number | undefined, + retries: number ): void { const now = new Date(); + const nowTime = now.getTime(); if ( - this.prevDeadline! !== undefined && - deadline && - now.getTime() >= this.prevDeadline + originalTimeout && + (totalTimeoutMillis === 0 || + totalTimeoutMillis < 0 || + (deadline && nowTime >= deadline)) ) { const error = new GoogleError( - `Total timeout of API exceeded ${totalTimeoutMillis} milliseconds before any response was received.` + `Total timeout of API exceeded ${originalTimeout} milliseconds ${ + originalError ? `retrying error ${originalError} ` : '' + } before any response was received.` ); error.code = Status.DEADLINE_EXCEEDED; - this.emit('error', error); - this.destroy(); - // Without throwing error you get unhandled error since we are returning a new stream - // There might be a better way to do this throw error; } - - if (this.retries && this.retries >= maxRetries) { + if (maxRetries === 0) { + const error: GoogleError = originalError; + error.note = 'Max retries is set to zero.'; + throw error; + } + if (retries && retries >= maxRetries) { const error = new GoogleError( - 'Exceeded maximum number of retries before any ' + - 'response was received' + 'Exceeded maximum number of retries ' + + (originalError ? `retrying error ${originalError} ` : '') + + 'before any response was received' ); error.code = Status.DEADLINE_EXCEEDED; - this.emit('error', error); - this.destroy(); throw error; } } /** - * Error handler for server streaming retries - * @param {CancellableStream} stream - the stream being retried - * @param {RetryOptions} retry - Configures the exceptions upon which the - * function should retry, and the parameters to the exponential backoff retry - * algorithm. - * @param {Error} error - error to handle - */ - streamHandoffErrorHandler( - stream: CancellableStream, - retry: RetryOptions, - error: Error - ): void { - let retryStream = this.stream; - const delayMult = retry.backoffSettings.retryDelayMultiplier; - const maxDelay = retry.backoffSettings.maxRetryDelayMillis; - const timeoutMult = retry.backoffSettings.rpcTimeoutMultiplier; - const maxTimeout = retry.backoffSettings.maxRpcTimeoutMillis; - - let delay = retry.backoffSettings.initialRetryDelayMillis; - let timeout = retry.backoffSettings.initialRpcTimeoutMillis; - let now = new Date(); - let deadline = 0; - - if (retry.backoffSettings.totalTimeoutMillis) { - deadline = now.getTime() + retry.backoffSettings.totalTimeoutMillis; - } - const maxRetries = retry.backoffSettings.maxRetries!; - try { - this.throwIfMaxRetriesOrTotalTimeoutExceeded( - deadline, - maxRetries, - retry.backoffSettings.totalTimeoutMillis! - ); - } catch (error) { - return; - } - - this.retries!++; - if (this.shouldRetryRequest(error, retry)) { - const toSleep = Math.random() * delay; - setTimeout(() => { - now = new Date(); - delay = Math.min(delay * delayMult, maxDelay); - const timeoutCal = timeout && timeoutMult ? timeout * timeoutMult : 0; - const rpcTimeout = maxTimeout ? maxTimeout : 0; - this.prevDeadline = deadline; - const newDeadline = deadline ? deadline - now.getTime() : 0; - timeout = Math.min(timeoutCal, rpcTimeout, newDeadline); - }, toSleep); - } else { - const e = GoogleError.parseGRPCStatusDetails(error); - e.note = - 'Exception occurred in retry method that was ' + - 'not classified as transient'; - // for some reason this error must be emitted here - // instead of the destroy, otherwise the error event - // is swallowed - this.emit('error', e); - this.destroy(); - return; - } - retryStream = this.retry(stream, retry); - this.stream = retryStream; - return; - } - - /** - * Used during server streaming retries to handle - * event forwarding, errors, and/or stream closure - * @param {CancellableStream} stream - the stream that we're doing the retry on - * @param {RetryOptions} retry - Configures the exceptions upon which the - * function should retry, and the parameters to the exponential backoff retry - * algorithm. + * Forwards events from an API request stream to the user's stream. + * @param {Stream} stream - The API request stream. */ - streamHandoffHelper(stream: CancellableStream, retry: RetryOptions): void { - let enteredError = false; - this.eventForwardHelper(stream); - - stream.on('error', error => { - enteredError = true; - this.streamHandoffErrorHandler(stream, retry, error); - }); - - stream.on('data', (data: ResponseType) => { - this.retries = 0; - this.emit.bind(this, 'data')(data); - }); - - stream.on('end', () => { - if (!enteredError) { - enteredError = true; - this.emit('end'); - this.cancel(); - } - }); - } - eventForwardHelper(stream: Stream) { const eventsToForward = ['metadata', 'response', 'status']; eventsToForward.forEach(event => { @@ -303,6 +209,11 @@ export class StreamProxy extends duplexify implements GRPCCallResult { }); } + /** + * Helper function that emits a response on the stream after either a 'metadata' + * or a 'status' event - this helps streams to behave more like http consumers expect + * @param {Stream} stream - The API request stream. + */ statusMetadataHelper(stream: Stream) { // gRPC is guaranteed emit the 'status' event but not 'metadata', and 'status' is the last event to emit. // Emit the 'response' event if stream has no 'metadata' event. @@ -351,6 +262,14 @@ export class StreamProxy extends duplexify implements GRPCCallResult { }); } + /** + * Default mechanism for determining whether a streaming call should retry + * If a user passes in a "shouldRetryFn", this will not be used + * @param {GoogleError} errpr - The error we need to determine is retryable or not + * @param {RetryOptions} retry - Configures the exceptions upon which the + * function should retry, and the parameters to the exponential backoff retry + * algorithm. + */ defaultShouldRetry(error: GoogleError, retry: RetryOptions) { if ( (retry.retryCodes.length > 0 && @@ -362,79 +281,6 @@ export class StreamProxy extends duplexify implements GRPCCallResult { return true; } - /** - * Forward events from an API request stream to the user's stream. - * @param {Stream} stream - The API request stream. - * @param {RetryOptions} retry - Configures the exceptions upon which the - * function eshould retry, and the parameters to the exponential backoff retry - * algorithm. - */ - forwardEventsWithRetries( - stream: CancellableStream, - retry: RetryOptions - ): CancellableStream | undefined { - let retryStream = this.stream; - this.eventForwardHelper(stream); - this.statusMetadataHelper(stream); - - stream.on('error', error => { - const timeout = retry.backoffSettings.totalTimeoutMillis; - const maxRetries = retry.backoffSettings.maxRetries!; - if ((maxRetries && maxRetries > 0) || (timeout && timeout > 0)) { - if (this.shouldRetryRequest(error, retry)) { - if (maxRetries && timeout!) { - const newError = new GoogleError( - 'Cannot set both totalTimeoutMillis and maxRetries ' + - 'in backoffSettings.' - ); - newError.code = Status.INVALID_ARGUMENT; - this.emit('error', newError); - this.destroy(); - return; //end chunk - } else { - this.retries!++; - retryStream = this.retry(stream, retry); - this.stream = retryStream; - return retryStream; - } - } else { - const e = GoogleError.parseGRPCStatusDetails(error); - e.note = - 'Exception occurred in retry method that was ' + - 'not classified as transient'; - this.destroy(e); - return; // end chunk - } - } else { - if (maxRetries === 0) { - const e = GoogleError.parseGRPCStatusDetails(error); - e.note = 'Max retries is set to zero.'; - this.destroy(e); - return; // end chunk - } - return GoogleError.parseGRPCStatusDetails(error); - } - }); - return retryStream; - } - - /** - * Resets the target stream as part of the retry process - * @param {CancellableStream} requestStream - the stream to end - */ - resetStreams(requestStream: CancellableStream) { - if (requestStream) { - requestStream.cancel && requestStream.cancel(); - if (requestStream.destroy) { - requestStream.destroy(); - } else if (requestStream.end) { - // TODO: not used in server streaming, but likely needed - // if we want to add BIDI or client side streaming - requestStream.end(); - } - } - } - /** * Specifies the target stream. * @param {ApiCall} apiCall - the API function to be called. @@ -458,25 +304,20 @@ export class StreamProxy extends duplexify implements GRPCCallResult { this.stream = stream; this.setReadable(stream); } else if (this.gaxServerStreamingRetries) { - const retryStream = streamingRetryRequest({ - request: () => { - if (this._isCancelCalled) { - if (this.stream) { - this.stream.cancel(); - } - return; + const request = () => { + if (this._isCancelCalled) { + if (this.stream) { + this.stream.cancel(); } - const stream = apiCall( - argument, - this._callback - ) as CancellableStream; - this.stream = stream; - this.stream = this.forwardEventsWithRetries(stream, retry); - return this.stream; - }, - }); - - this.setReadable(retryStream); + return; + } + const stream = apiCall(argument, this._callback) as CancellableStream; + return stream; + }; + const retryStream = this.newStreamingRetryRequest({request, retry}); + this.stream = retryStream as unknown as CancellableStream; + this.eventForwardHelper(retryStream); + this.setReadable(retryStream!); } else { const retryStream = retryRequest(null, { objectMode: true, @@ -522,4 +363,228 @@ export class StreamProxy extends duplexify implements GRPCCallResult { this.stream.cancel(); } } + + /** + * Creates a new retry request stream - + *inner arrow function "newMakeRequest" handles retrying and resumption + * @param {streamingRetryRequestOptions} opts + * {request} - the request to be made if the stream errors + * {retry} - the retry options associated with the call + * @returns {CancellableStream} - the stream that handles retry logic + */ + private newStreamingRetryRequest( + opts: streamingRetryRequestOptions + ): CancellableStream { + // at this point, it would be unexpected if retry were undefined + // but if it is, provide a logical default so we don't run into trouble + const retry = opts.retry ?? { + retryCodes: [], + backoffSettings: createDefaultBackoffSettings(), + }; + let retries = 0; + const retryStream = new PassThrough({ + objectMode: true, + }) as unknown as CancellableStream; + + const totalTimeout = retry.backoffSettings.totalTimeoutMillis ?? undefined; + const maxRetries = retry.backoffSettings.maxRetries ?? undefined; + let timeout = retry.backoffSettings.initialRpcTimeoutMillis ?? undefined; + + let now = new Date(); + let deadline = 0; + if (totalTimeout) { + deadline = now.getTime() + totalTimeout; + } + const transientErrorHelper = ( + error: Error, + requestStream: CancellableStream + ) => { + const e = GoogleError.parseGRPCStatusDetails(error); + e.note = + 'Exception occurred in retry method that was ' + + 'not classified as transient'; + // clean up the request stream and retryStreams, silently destroy it on the request stream + // but do raise it on destructin of the retryStream so the consumer can see it + requestStream.destroy(); + retryStream.destroy(e); + + return retryStream; + }; + const newMakeRequest = (newopts: streamingRetryRequestOptions) => { + let dataEnd = false; + let statusReceived = false; + + let enteredError = false; + + // make the request + const requestStream = newopts.request!(requestOps); + retryStream.cancel = requestStream.cancel; // make sure the retryStream is also cancellable by the user + + const eventsToForward = ['metadata', 'response', 'status']; + eventsToForward.forEach(event => { + requestStream.on(event, retryStream.emit.bind(retryStream, event)); + }); + this.statusMetadataHelper(requestStream); + + // TODO - b/353262542 address buffer stuff + requestStream.on('data', (data: ResponseType) => { + retries = 0; + this.emit.bind(this, 'data')(data); + }); + + /* in retry-request, which previously handled retries, + * "end" could be emitted on a request stream before other gRPC events. + * To ensure it doesn't reach the consumer stream prematurely, retry-request piped + * two streams together (delayStream and retryStream) + * to ensure that "end" only emitted after a "response" event + * + * We are consciously NOT using pipeline or .pipe as part of similar logic here + * because we want more control over what happens during event handoff and we want to + * avoid the undesired behavior that can happen with error events + * if consumers in client libraries are also using pipes + * + * Since "status" is guaranteed to be the last event emitted by gRPC. + * If we have seen an "end" event, the dataEnd boolean will be true and we can safely + * end the stream. + * + * The "statusReceived" boolean covers the opposite case - that we receive the "status" event before + * a successful stream end event - this signals the .on('end') event handler that it's okay to end the stream + * + * + */ + requestStream.on('status', () => { + statusReceived = true; + if (dataEnd) { + retryStream.end(); + } + return retryStream; + }); + requestStream.on('end', () => { + if (!enteredError) { + dataEnd = true; + + // in this case, we've already received "status" + // which is the last event from gRPC, so it's cool to end the stream + if (statusReceived) { + retryStream.end(); + } + } + return retryStream; + + // there is no else case because if enteredError + // is true, we will handle stream destruction as part of + // either retrying (where we don't want to end the stream) + // or as part of error handling, which will take care of stream destruction + }); + requestStream.on('error', (error: Error) => { + enteredError = true; + + // type check for undefined instead of for truthiness in case maxRetries or timeout is equal to zero + if ( + typeof maxRetries !== undefined || + typeof totalTimeout !== undefined + ) { + if (this.shouldRetryRequest(error, retry)) { + if (maxRetries && totalTimeout) { + const newError = new GoogleError( + 'Cannot set both totalTimeoutMillis and maxRetries ' + + 'in backoffSettings.' + ); + newError.code = Status.INVALID_ARGUMENT; + // clean up the request stream and retryStreams, silently destroy it on the request stream + // but do raise it on destructin of the retryStream so the consumer can see it + requestStream.destroy(); + retryStream.destroy(newError); + + return retryStream; + } else { + // check for exceeding timeout or max retries + + try { + this.throwIfMaxRetriesOrTotalTimeoutExceeded( + deadline, + maxRetries!, + timeout!, + error, + totalTimeout, + retries + ); + } catch (error: unknown) { + const e = GoogleError.parseGRPCStatusDetails( + error as GoogleError + ); + // clean up the request stream and retryStreams, silently destroy it on the request stream + // but do raise it on destruction of the retryStream so the consumer can see it + requestStream.destroy(); + retryStream.destroy(e); + + return retryStream; + } + + const delayMult = retry.backoffSettings.retryDelayMultiplier; + const maxDelay = retry.backoffSettings.maxRetryDelayMillis; + const timeoutMult = retry.backoffSettings.rpcTimeoutMultiplier; + const maxTimeout = retry.backoffSettings.maxRpcTimeoutMillis; + let delay = retry.backoffSettings.initialRetryDelayMillis; + // calculate new deadlines + const toSleep = Math.random() * delay; + const calculateTimeoutAndResumptionFunction = () => { + setTimeout(() => { + // only do timeout calculations if not using maxRetries + if (timeout) { + now = new Date(); + delay = Math.min(delay * delayMult, maxDelay); + const timeoutCal = + timeout && timeoutMult ? timeout * timeoutMult : 0; + const rpcTimeout = maxTimeout ? maxTimeout : 0; + const newDeadline = deadline ? deadline - now.getTime() : 0; + timeout = Math.min(timeoutCal, rpcTimeout, newDeadline); + } + + retries++; + let retryArgument = this.argument! as RequestType; + // if resumption logic is passed, use it to determined the + // new argument for the new request made to the server + // otherwise, the original argument will be passed + if (retry.getResumptionRequestFn !== undefined) { + retryArgument = retry.getResumptionRequestFn(retryArgument); + } + const newRequest = () => { + if (this._isCancelCalled) { + if (this.stream) { + this.stream.cancel(); + } + return; + } + const newStream = this.apiCall!( + retryArgument, + this._callback + ) as CancellableStream; + return newStream; + }; + opts.request = newRequest; + + // make a request with the updated parameters + // based on the resumption strategy + return newMakeRequest(opts); + }, toSleep); + }; + return calculateTimeoutAndResumptionFunction(); + } + } else { + // non retryable error + return transientErrorHelper(error, requestStream); + } + } else { + // neither timeout nor maxRetries are defined, surface the error to the caller + return transientErrorHelper(error, requestStream); + } + }); + // return the stream if we didn't return it as + // part of an error state + return retryStream; + }; + // this is the first make request call with the options the user passed in + return newMakeRequest(opts); + } } diff --git a/gax/src/streamingRetryRequest.ts b/gax/src/streamingRetryRequest.ts deleted file mode 100644 index 83414f7b7..000000000 --- a/gax/src/streamingRetryRequest.ts +++ /dev/null @@ -1,110 +0,0 @@ -// Copyright 2023 Google LLC - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at - -// https://www.apache.org/licenses/LICENSE-2.0 - -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -const {PassThrough} = require('stream'); -import {GoogleError} from './googleError'; -import {ResponseType} from './apitypes'; -import {StreamProxy} from './streamingCalls/streaming'; - -const DEFAULTS = { - /* - Max # of retries - */ - maxRetries: 2, -}; -// In retry-request, you could pass parameters to request using the requestOpts parameter -// when we called retry-request from gax, we always passed null -// passing null here removes an unnecessary parameter from this implementation -const requestOps = null; -const objectMode = true; // we don't support objectMode being false - -interface streamingRetryRequestOptions { - request: Function; - maxRetries?: number; -} -/** - * Localized adaptation derived from retry-request - * @param opts - corresponds to https://github.com/googleapis/retry-request#opts-optional - * @returns - */ -export function streamingRetryRequest(opts: streamingRetryRequestOptions) { - opts = Object.assign({}, DEFAULTS, opts); - if (opts.request === undefined) { - throw new Error('A request function must be provided'); - } - - let numNoResponseAttempts = 0; - let streamResponseHandled = false; - - let requestStream: StreamProxy; - let delayStream: StreamProxy; - - const retryStream = new PassThrough({objectMode: objectMode}); - - makeRequest(); - return retryStream; - - function makeRequest() { - streamResponseHandled = false; - - delayStream = new PassThrough({objectMode: objectMode}); - requestStream = opts.request!(requestOps); - - requestStream - // gRPC via google-cloud-node can emit an `error` as well as a `response` - // Whichever it emits, we run with-- we can't run with both. That's what - // is up with the `streamResponseHandled` tracking. - .on('error', (err: GoogleError) => { - if (streamResponseHandled) { - return; - } - streamResponseHandled = true; - onResponse(err); - }) - .on('response', (resp: ResponseType) => { - if (streamResponseHandled) { - return; - } - - streamResponseHandled = true; - onResponse(null, resp); - }); - requestStream.pipe(delayStream); - } - - function onResponse(err: GoogleError | null, response: ResponseType = null) { - // An error such as DNS resolution. - if (err) { - numNoResponseAttempts++; - - if (numNoResponseAttempts <= opts.maxRetries!) { - makeRequest(); - } else { - retryStream.emit('error', err); - } - - return; - } - - // No more attempts need to be made, just continue on. - retryStream.emit('response', response); - delayStream.pipe(retryStream); - requestStream.on('error', () => { - // retryStream must be destroyed here for the stream handoff part of retries to function properly - // but the error event should not be passed - if it emits as part of .destroy() - // it will bubble up early to the caller - retryStream.destroy(); - }); - } -} diff --git a/gax/test/test-application/package.json b/gax/test/test-application/package.json index 3fc47060c..dbef3840c 100644 --- a/gax/test/test-application/package.json +++ b/gax/test/test-application/package.json @@ -26,9 +26,10 @@ "dependencies": { "@grpc/grpc-js": "~1.6.0", "google-gax": "file:./google-gax.tgz", + "protobufjs": "^7.0.0", + "pumpify": "^2.0.1", "showcase-echo-client": "./showcase-echo-client.tgz", - "showcase-server": "./showcase-server.tgz", - "protobufjs": "^7.0.0" + "showcase-server": "./showcase-server.tgz" }, "engines": { "node": ">=14.0.0" diff --git a/gax/test/test-application/src/index.ts b/gax/test/test-application/src/index.ts index 5a036f74f..19917d74b 100644 --- a/gax/test/test-application/src/index.ts +++ b/gax/test/test-application/src/index.ts @@ -17,7 +17,6 @@ 'use strict'; import {EchoClient, SequenceServiceClient, protos} from 'showcase-echo-client'; import {ShowcaseServer} from 'showcase-server'; - import * as assert from 'assert'; import {promises as fsp} from 'fs'; import * as path from 'path'; @@ -32,7 +31,8 @@ import { RetryOptions, } from 'google-gax'; import {RequestType} from 'google-gax/build/src/apitypes'; - +import {Duplex, PassThrough, pipeline} from 'stream'; +const pumpify = require('pumpify'); async function testShowcase() { const grpcClientOpts = { grpc, @@ -45,11 +45,6 @@ async function testShowcase() { gaxServerStreamingRetries: true, }; - const grpcClientOptsWithRetries = { - grpc, - sslCreds: grpc.credentials.createInsecure(), - }; - const fakeGoogleAuth = { getClient: async () => { return { @@ -83,16 +78,16 @@ async function testShowcase() { const grpcSequenceClientWithServerStreamingRetries = new SequenceServiceClient(grpcClientOptsWithServerStreamingRetries); - const grpcSequenceClientWithRetries = new SequenceServiceClient( - grpcClientOptsWithRetries + const grpcSequenceClientLegacyRetries = new SequenceServiceClient( + grpcClientOpts ); const restClient = new EchoClient(restClientOpts); const restClientCompat = new EchoClient(restClientOptsCompat); // assuming gRPC server is started locally - await testEchoErrorWithRetries(grpcSequenceClientWithRetries); - await testEchoErrorWithTimeout(grpcSequenceClientWithRetries); + await testEchoErrorWithRetries(grpcSequenceClientLegacyRetries); + await testEchoErrorWithTimeout(grpcSequenceClientLegacyRetries); await testEcho(grpcClient); await testEchoError(grpcClient); await testExpand(grpcClient); @@ -117,8 +112,8 @@ async function testShowcase() { await testCollectThrows(restClientCompat); // REGAPIC does not support client streaming await testChatThrows(restClientCompat); // REGAPIC does not support bidi streaming await testWait(restClientCompat); - // Testing with gaxServerStreamingRetries being true + // Testing with gaxServerStreamingRetries being true await testServerStreamingRetryOptions( grpcSequenceClientWithServerStreamingRetries ); @@ -126,7 +121,6 @@ async function testShowcase() { await testServerStreamingRetriesWithShouldRetryFn( grpcSequenceClientWithServerStreamingRetries ); - await testServerStreamingRetrieswithRetryOptions( grpcSequenceClientWithServerStreamingRetries ); @@ -139,10 +133,6 @@ async function testShowcase() { grpcSequenceClientWithServerStreamingRetries ); - await testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResumptionStrategy( - grpcSequenceClientWithServerStreamingRetries - ); - await testServerStreamingThrowsClassifiedTransientErrorNote( grpcSequenceClientWithServerStreamingRetries ); @@ -159,6 +149,10 @@ async function testShowcase() { grpcSequenceClientWithServerStreamingRetries ); + await testShouldTimeoutWithNewRetries( + grpcSequenceClientWithServerStreamingRetries + ); + await testErrorMaxRetries0(grpcSequenceClientWithServerStreamingRetries); await testServerStreamingRetriesImmediatelywithRetryOptions( @@ -176,6 +170,72 @@ async function testShowcase() { await testCollect(grpcClientWithServerStreamingRetries); await testChat(grpcClientWithServerStreamingRetries); await testWait(grpcClientWithServerStreamingRetries); + + /* Series of tests that validate behavior of gax behavior with stream pipelines */ + + /* NO BUFFERING YES GAX NATIVE RETRIES + This section has pipelines of streams but no data buffering + and tests them against gax clients that DO utilize gax native retries + some scenarios may not actually involve retrying */ + await testStreamingErrorAfterDataNoBufferNoRetry( + grpcSequenceClientWithServerStreamingRetries + ); + + // the next few tests explicitly use the pumpify library + // which does not throw an error if a stream in the pipeline is destroyed + // but does sever the connection. This library is being used because at least one of + // our client libraries uses it + + await testImmediateStreamingErrorNoBufferPumpify( + grpcSequenceClientWithServerStreamingRetries + ); + + await testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPumpify( + grpcSequenceClientWithServerStreamingRetries + ); + + await testStreamingPipelineErrorAfterDataNoBufferNoRetryPumpify( + grpcSequenceClientWithServerStreamingRetries + ); + + await testImmediateStreamingErrorNoBufferYesRetryRequestRetryPumpify( + grpcSequenceClientWithServerStreamingRetries + ); + + await testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPumpify( + grpcSequenceClientWithServerStreamingRetries + ); + + await testStreamingPipelineErrorAfterDataNoBufferYesRetryPumpify( + grpcSequenceClientWithServerStreamingRetries + ); + + // this series of tests uses the node native "pipeline" instead of pumpify + // which unlike pumpify, WILL error if any stream in the pipeline is destroye + + await testImmediateStreamingErrorNoBufferPipeline( + grpcSequenceClientWithServerStreamingRetries + ); + + await testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPipeline( + grpcSequenceClientWithServerStreamingRetries + ); + + await testStreamingPipelineErrorAfterDataNoBufferNoRetryPipeline( + grpcSequenceClientWithServerStreamingRetries + ); + + await testImmediateStreamingErrorNoBufferYesRetryRequestRetryPipeline( + grpcSequenceClientWithServerStreamingRetries + ); + + await testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPipeline( + grpcSequenceClientWithServerStreamingRetries + ); + + await testStreamingPipelineErrorAfterDataNoBufferYesRetryPipeline( + grpcSequenceClientWithServerStreamingRetries + ); } function createStreamingSequenceRequestFactory( @@ -419,12 +479,16 @@ async function testExpand(client: EchoClient) { }; const stream = client.expand(request); const result: string[] = []; + stream.on('data', (response: {content: string}) => { result.push(response.content); }); stream.on('end', () => { assert.deepStrictEqual(words, result); }); + stream.on('error', error => { + throw new Error('testExpand error' + error.message); + }); } async function testPagedExpand(client: EchoClient) { @@ -643,27 +707,32 @@ async function testServerStreamingRetryOptions(client: SequenceServiceClient) { const response = await client.createStreamingSequence(request); const sequence = response[0]; - - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; - - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('data', (response: {content: string}) => { - finalData.push(response.content); - }); - attemptStream.on('error', () => { - throw new Error('testServerStreamingRetryOptions problem'); - }); - attemptStream.on('end', () => { + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', error => { + attemptStream.destroy(); + reject('testServerStreamingRetryOptions problem' + error); + }); + attemptStream.on('end', () => { + attemptStream.end(); + resolve(); + }); + }).then(() => { assert.equal( finalData.join(' '), 'This is testing the brand new and shiny StreamingSequence server 3' ); - attemptStream.end(); }); } @@ -698,28 +767,33 @@ async function testServerStreamingRetrieswithRetryOptions( ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('data', (response: {content: string}) => { - finalData.push(response.content); - }); - attemptStream.on('error', () => { - throw new Error('testServerStreamingRetrieswithRetryOptions problem'); - }); - attemptStream.on('end', () => { + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', () => { + reject('testServerStreamingRetrieswithRetryOptions problem'); + }); + attemptStream.on('end', () => { + attemptStream.end(); + resolve(); + }); + }).then(() => { assert.equal( finalData.join(' '), 'This This is This is testing the brand new and shiny StreamingSequence server 3' ); - attemptStream.end(); }); } @@ -758,27 +832,32 @@ async function testServerStreamingRetriesWithShouldRetryFn( ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; - - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('data', (response: {content: string}) => { - finalData.push(response.content); - }); - attemptStream.on('error', () => { - throw new Error('testServerStreamingRetriesWithShouldRetryFn problem'); - }); - attemptStream.on('end', () => { + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', (error: Error) => { + reject('testServerStreamingRetriesWithShouldRetryFn problem' + error); + }); + attemptStream.on('end', () => { + attemptStream.end(); + resolve(); + }); + }).then(() => { assert.equal( finalData.join(' '), 'This This is This is testing the brand new and shiny StreamingSequence server 3' ); - attemptStream.end(); }); } @@ -814,30 +893,33 @@ async function testServerStreamingRetrieswithRetryRequestOptions( ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('data', (response: {content: string}) => { - finalData.push(response.content); - }); - attemptStream.on('error', () => { - throw new Error( - 'testServerStreamingRetrieswithRetryRequestOptions problem' + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings ); - }); - attemptStream.on('end', () => { + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', () => { + reject('testServerStreamingRetrieswithRetryRequestOptions problem'); + }); + attemptStream.on('end', () => { + attemptStream.end(); + resolve(); + }); + }).then(() => { assert.equal( finalData.join(' '), 'This This is This is testing the brand new and shiny StreamingSequence server 3' ); - attemptStream.end(); }); } @@ -889,28 +971,36 @@ async function testResetRetriesToZero(client: SequenceServiceClient) { 'This is testing the brand new and shiny StreamingSequence server 3' ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('data', (response: {content: string}) => { - finalData.push(response.content); - }); - attemptStream.on('error', () => { - throw new Error('testResetRetriesToZero should not receive an error event'); - }); - attemptStream.on('end', () => { + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', () => { + reject('testResetRetriesToZero should not receive an error event'); + }); + attemptStream.on('end', () => { + attemptStream.end(); + resolve(); + }); + attemptStream.on('close', () => { + reject('testResetRetriesToZero closed on an error'); + }); + }).then(() => { assert.deepStrictEqual( finalData.join(' '), 'This This is This is testing This is testing the This is testing the brand' ); - attemptStream.end(); }); } @@ -948,67 +1038,51 @@ async function testShouldFailOnThirdError(client: SequenceServiceClient) { 'This is testing the brand new and shiny StreamingSequence server 3' ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('data', () => { - throw new Error('The stream should not receive any data'); - }); - attemptStream.on('error', (error: GoogleError) => { - assert.strictEqual(error.code, 4); - assert.strictEqual( - error.message, - 'Exceeded maximum number of retries before any response was received' - ); - }); - attemptStream.on('end', () => { - throw new Error( - 'testShouldFailOnThirdError finished before it received an error' + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings ); - }); - attemptStream.on('close', () => { - attemptStream.end(); + attemptStream.on('data', () => { + reject('The stream should not receive any data'); + }); + attemptStream.on('error', (error: GoogleError) => { + assert.strictEqual(error.code, 4); + assert.strictEqual( + error.message, + 'Exceeded maximum number of retries retrying error Error: 6 ALREADY_EXISTS: 6 before any response was received' + ); + }); + attemptStream.on('end', () => { + reject('testShouldFailOnThirdError finished before it received an error'); + }); + attemptStream.on('close', () => { + attemptStream.end(); + resolve(); + }); }); } -// streaming call that retries twice with RetryRequestOptions and resumes from where it left off -async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy( - client: SequenceServiceClient -) { - const finalData: string[] = []; - const shouldRetryFn = (error: GoogleError) => { - return [4, 14].includes(error!.code!); - }; +// When we set a low timeout we should throw and error that says +// the timeout has been exceeded +async function testShouldTimeoutWithNewRetries(client: SequenceServiceClient) { const backoffSettings = createBackoffSettings( - 10000, - 2.5, + 100, + 1.2, 1000, null, 1.5, 3000, - 600000 - ); - const getResumptionRequestFn = (request: RequestType) => { - const newRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest() as unknown as RequestType; - newRequest.name = request.name; - newRequest.lastFailIndex = 5; - return newRequest as unknown as RequestType; - }; - - const retryOptions = new RetryOptions( - [], - backoffSettings, - shouldRetryFn, - getResumptionRequestFn + 2 // silly low timeout ); + const allowedCodes = [4, 5, 6]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); const settings = { retry: retryOptions, @@ -1017,43 +1091,62 @@ async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrate client.initialize(); const request = createStreamingSequenceRequestFactory( - [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], - [0.1, 0.1, 0.1], - [1, 2, 11], + [ + Status.ALREADY_EXISTS, // Error code 6 + Status.NOT_FOUND, // Error code 5 + Status.DEADLINE_EXCEEDED, // Error code 4 + Status.OK, + ], + [0.1, 0.1, 0.1, 0.1], + [0, 0, 0, 1], 'This is testing the brand new and shiny StreamingSequence server 3' ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('data', (response: {content: string}) => { - finalData.push(response.content); - }); - attemptStream.on('error', () => { - throw new Error( - 'testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy problem' - ); - }); - attemptStream.on('end', () => { - assert.deepStrictEqual( - finalData.join(' '), - 'This new and new and shiny StreamingSequence server 3' + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings ); - attemptStream.end(); + attemptStream.on('data', () => { + reject('The stream should not receive any data'); + }); + attemptStream.on('error', (error: GoogleError) => { + assert.strictEqual(error.code, 4); + try { + assert.strictEqual( + error.message, + 'Total timeout of API exceeded 2 milliseconds retrying error Error: 6 ALREADY_EXISTS: 6 before any response was received.' + ); + } catch (AssertionError) { + assert.strictEqual( + error.message, + 'Total timeout of API exceeded 2 milliseconds retrying error Error: 5 NOT_FOUND: 5 before any response was received.' + ); + } + }); + attemptStream.on('end', () => { + reject( + 'testShouldTimeoutWithNewRetries finished before it received an error' + ); + }); + attemptStream.on('close', () => { + attemptStream.end(); + resolve(); + }); }); } -// retries twice but fails with an error not from the streaming sequence -async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResumptionStrategy( +// streaming call that retries twice with RetryRequestOptions and resumes from where it left off +async function testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy( client: SequenceServiceClient ) { + const finalData: string[] = []; const shouldRetryFn = (error: GoogleError) => { return [4, 14].includes(error!.code!); }; @@ -1066,9 +1159,12 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum 3000, 600000 ); - const getResumptionRequestFn = () => { - // return a bad resumption strategy - return {}; + const getResumptionRequestFn = (request: RequestType) => { + const newRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest() as unknown as RequestType; + newRequest.name = request.name; + newRequest.lastFailIndex = 5; + return newRequest as unknown as RequestType; }; const retryOptions = new RetryOptions( @@ -1091,27 +1187,34 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum 'This is testing the brand new and shiny StreamingSequence server 3' ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; - - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - attemptStream.on('error', (e: GoogleError) => { - assert.strictEqual(e.code, 3); - assert.match(e.note!, /not classified as transient/); - }); - attemptStream.on('close', () => { - attemptStream.end(); - }); - attemptStream.on('end', () => { - throw new Error( - 'testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResumptionStrategy ended cleanly and did not error' + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', () => { + reject( + 'testServerStreamingRetrieswithRetryRequestOptionsResumptionStrategy problem' + ); + }); + attemptStream.on('end', () => { + attemptStream.end(); + resolve(); + }); + }).then(() => { + assert.deepStrictEqual( + finalData.join(' '), + 'This new and new and shiny StreamingSequence server 3' ); }); } @@ -1120,6 +1223,7 @@ async function testServerStreamingRetrieswithRetryRequestOptionsErrorsOnBadResum async function testServerStreamingThrowsClassifiedTransientErrorNote( client: SequenceServiceClient ) { + const finalData: string[] = []; const backoffSettings = createBackoffSettings( 100, 1.2, @@ -1146,27 +1250,37 @@ async function testServerStreamingThrowsClassifiedTransientErrorNote( ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('error', (e: GoogleError) => { - assert.strictEqual(e.code, 14); - assert.match(e.note!, /not classified as transient/); - }); - attemptStream.on('close', () => { - attemptStream.end(); - }); - attemptStream.on('end', () => { - throw new Error( - 'testServerStreamingThrowsClassifiedTransientErrorNote ended cleanly without an error' + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings ); + attemptStream.on('data', (data: {content: string}) => { + finalData.push(data.content); + }); + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 14); + assert.match(e.note!, /not classified as transient/); + finalData.join(' '); + }); + attemptStream.on('close', () => { + attemptStream.end(); + resolve(); + }); + attemptStream.on('end', () => { + reject( + 'testServerStreamingThrowsClassifiedTransientErrorNote ended cleanly without an error' + ); + }); + }).then(() => { + assert.equal(finalData, 'This'); }); } @@ -1200,27 +1314,31 @@ async function testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote( ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('error', (e: GoogleError) => { - assert.strictEqual(e.code, 4); - assert.match(e.note!, /not classified as transient/); - }); - attemptStream.on('close', () => { - attemptStream.end(); - }); - attemptStream.on('end', () => { - throw new Error( - 'testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote ended cleanly without throwing an error' + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings ); + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 4); + assert.match(e.note!, /not classified as transient/); + }); + attemptStream.on('close', () => { + attemptStream.end(); + resolve(); + }); + attemptStream.on('end', () => { + reject( + 'testServerStreamingRetriesAndThrowsClassifiedTransientErrorNote ended cleanly without throwing an error' + ); + }); }); } @@ -1254,30 +1372,34 @@ async function testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries( ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('error', (e: GoogleError) => { - assert.strictEqual(e.code, 3); - assert.match( - e.message, - /Cannot set both totalTimeoutMillis and maxRetries/ - ); - }); - attemptStream.on('close', () => { - attemptStream.end(); - }); - attemptStream.on('end', () => { - throw new Error( - 'testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries ended cleanly without an error' + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings ); + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 3); + assert.match( + e.message, + /Cannot set both totalTimeoutMillis and maxRetries/ + ); + }); + attemptStream.on('close', () => { + attemptStream.end(); + resolve(); + }); + attemptStream.on('end', () => { + reject( + 'testServerStreamingThrowsCannotSetTotalTimeoutMillisMaxRetries ended cleanly without an error' + ); + }); }); } @@ -1320,31 +1442,34 @@ async function testErrorMaxRetries0(client: SequenceServiceClient) { 'This is testing the brand new and shiny StreamingSequence server 3' ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; - - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings - ); - attemptStream.on('data', () => { - throw new Error('The stream should not receive any data'); - }); - attemptStream.on('error', (error: GoogleError) => { - assert.strictEqual(error.code, 4); - assert.strictEqual(error.note, 'Max retries is set to zero.'); - }); - attemptStream.on('end', () => { - throw new Error( - 'testErrorMaxRetries0 should not end before it receives an error' + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings ); - }); - attemptStream.on('close', () => { - attemptStream.end(); + attemptStream.on('data', () => { + reject('The stream should not receive any data'); + }); + attemptStream.on('error', (error: GoogleError) => { + assert.strictEqual(error.code, 4); + assert.strictEqual(error.note, 'Max retries is set to zero.'); + }); + attemptStream.on('end', () => { + reject('testErrorMaxRetries0 should not end before it receives an error'); + }); + attemptStream.on('close', () => { + attemptStream.end(); + resolve(); + }); }); } + // a streaming call that retries two times and finishes successfully async function testServerStreamingRetriesImmediatelywithRetryOptions( client: SequenceServiceClient @@ -1378,30 +1503,1288 @@ async function testServerStreamingRetriesImmediatelywithRetryOptions( ); const response = await client.createStreamingSequence(request); - const sequence = response[0]; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; - const attemptRequest = - new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); - attemptRequest.name = sequence.name!; + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + attemptStream.on('data', (response: {content: string}) => { + finalData.push(response.content); + }); + attemptStream.on('error', () => { + reject( + 'testServerStreamingRetriesImmediatelywithRetryOptions stream had an error' + ); + }); + attemptStream.on('end', () => { + attemptStream.end(); + resolve(); + }); + }).then(() => { + assert.equal( + finalData.join(' '), + 'This is This is testing the brand new and shiny StreamingSequence server 3' + ); + }); +} +// sequence that fails on the first error in the sequence +// tests streams connected by pumpify +async function testImmediateStreamingErrorNoBufferPumpify( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [4]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], + [0.1, 0.1, 0.1], + [0, 2, 11], //error before any data is sent + 'This is testing the brand new and shiny StreamingSequence server 3' + ); + + const response = await client.createStreamingSequence(request); + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pumpify.obj([ + attemptStream, + secondStream, + thirdStream, + ]); + attemptStream.on('data', () => { + reject('testImmediateStreamingErrorNoBufferPumpify error attemptStream'); + }); + + togetherStream.on('data', () => { + reject('testImmediateStreamingErrorNoBufferPumpify error togetherStream'); + }); + + // when using pipeline it is expected that togetherStream would log before thirdStream because they're basically invoking the same thing + // imagine togetherStream is three physical pipes put together - what comes out of the third section of pipe is the same + // as what comes out of the whole thing and arrives at the same time + + // when using pumpify, only first stream and final stream will be logged + togetherStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 14); + }); + + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 14); + }); + + attemptStream.on('end', () => { + reject('testImmediateStreamingErrorNoBufferPumpify ended without error'); + }); + + togetherStream.on('end', () => { + reject('testImmediateStreamingErrorNoBufferPumpify ended without error'); + }); + togetherStream.on('close', () => { + // streams should already be cleaned up + // but end them anyway for posterity + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + }); +} + +// sequence that fails on the first error in the sequence +// tests streams connected by pipeline +async function testImmediateStreamingErrorNoBufferPipeline( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [4]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], + [0.1, 0.1, 0.1], + [0, 2, 11], //error before any data is sent + 'This is testing the brand new and shiny StreamingSequence server 3' + ); + const results: string[] = []; + + const response = await client.createStreamingSequence(request); + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pipeline( + [attemptStream, secondStream, thirdStream], + err => { + if (!err) { + reject( + 'testImmediateStreamingErrorNoBufferPipeline suceeded, expected error' + ); + } + } + ); + attemptStream.on('data', () => { + reject( + 'testImmediateStreamingErrorNoBufferPipeline received data, should have errored' + ); + }); + + togetherStream.on('error', (e: GoogleError) => { + results.push('togetherStream'); + assert.strictEqual(e.code, 14); + }); + secondStream.on('error', (e: GoogleError) => { + results.push('secondStream'); + assert.strictEqual(e.code, 14); + }); + thirdStream.on('error', (e: GoogleError) => { + results.push('thirdStream'); // this won't happen + assert.strictEqual(e.code, 14); + }); + attemptStream.on('error', (e: GoogleError) => { + results.push('attemptStream'); + assert.strictEqual(e.code, 14); + }); + togetherStream.on('end', () => { + reject('testImmediateStreamingErrorNoBufferPipeline ended without error'); + }); + togetherStream.on('close', () => { + // these should be cleaned up already but + // it doesn't hurt to call end + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + }).then(() => { + // when using pipeline it is expected that togetherStream would log before thirdStream because they're basically invoking the same thing + // imagine togetherStream is three physical pipes put together - what comes out of the third section of pipe is the same + // as what comes out of the whole thing and arrives at the same time + assert.deepStrictEqual(results, [ + 'attemptStream', + 'secondStream', + 'togetherStream', + 'thirdStream', + ]); + }); +} + +// sequence that throws a retriable error code before receiving data +// then succeeds +async function testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPumpify( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [14]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); // default resumption strategy starts from beginning which is fine in this case + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseArray = Array.from(Array(100).keys()); + let testString = ''; + + for (let i = 0; i < baseArray.length; i++) { + testString = testString.concat(baseArray[i].toString() + ' '); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.OK], + [0.1, 0.1], + [0, 100], // throw a retryable error code immediately + testString + ); + + const response = await client.createStreamingSequence(request); + const results: string[] = []; + const results2: string[] = []; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pumpify.obj([ + attemptStream, + secondStream, + thirdStream, + ]); + + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + + togetherStream.on('error', (e: GoogleError) => { + reject( + 'testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPumpify error ' + + e + ); + }); + + togetherStream.on('close', () => { + reject( + 'testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPumpify closed on an error' + ); + }); + togetherStream.on('end', () => { + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + }).then(() => { + assert.strictEqual(results.length, 100); + assert.strictEqual(results2.length, 100); + }); +} + +// sequence that throws a retriable error code before receiving data +// then succeeds +async function testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPipeline( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [14]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); // default resumption strategy starts from beginning which is fine in this case + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseArray = Array.from(Array(100).keys()); + let testString = ''; + + for (let i = 0; i < baseArray.length; i++) { + testString = testString.concat(baseArray[i].toString() + ' '); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.OK], + [0.1, 0.1], + [0, 100], // throw a retryable error code immediately + testString + ); + + const response = await client.createStreamingSequence(request); + const results: String[] = []; + const results2: String[] = []; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pipeline( + [attemptStream, secondStream, thirdStream], + err => { + if (err) { + reject( + 'testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPipeline failed' + + err + ); + } else { + assert.strictEqual(results.length, 100); + } + } + ); + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + + togetherStream.on('error', (e: GoogleError) => { + reject( + 'testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPipeline error ' + + e.message + ); + }); + + togetherStream.on('end', () => { + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + // we only check for attemptStream.on("close") which would indicate + // closing on an error. togetherStream.on("close") is handled + // by the pipeline constructor earlier in the test + attemptStream.on('close', () => { + reject( + 'testImmediateStreamingErrorThenSucceedsNoBufferYesRetryPipeline closed on error' + ); + }); + }).then(() => { + assert.strictEqual(results.length, 100); + assert.strictEqual(results2.length, 100); + }); +} +// sequence that errors immediately, retries, then succeeds +async function testImmediateStreamingErrorNoBufferYesRetryRequestRetryPipeline( + client: SequenceServiceClient +) { + const retryRequestOptions = { + objectMode: true, + shouldRetryFn: () => { + return true; + }, + }; //always retry + + const settings = { + retryRequestOptions: retryRequestOptions, + }; + + client.initialize(); + const baseArray = Array.from(Array(100).keys()); + let testString = ''; + + for (let i = 0; i < baseArray.length; i++) { + testString = testString.concat(baseArray[i].toString() + ' '); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.OK], + [0.1, 0.1], + [0, 100], //error before any data is sent but retry + testString + ); + + const response = await client.createStreamingSequence(request); + const results: String[] = []; + const results2: String[] = []; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pipeline( + [attemptStream, secondStream, thirdStream], + err => { + if (err) { + reject( + 'testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPipeline error' + + err + ); + } else { + assert.strictEqual(results.length, 100); + } + } + ); + + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + togetherStream.on('error', (e: GoogleError) => { + reject( + 'testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPipeline error' + + e + ); + }); + + attemptStream.on('error', (e: GoogleError) => { + reject( + 'testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPipeline error' + + e + ); + }); + // we only check for attemptStream.on("close") which would indicate + // closing on an error. togetherStream.on("close") is handled + // by the pipeline constructor earlier in the test + attemptStream.on('close', (e: GoogleError) => { + reject( + 'testImmediateStreamingErrorNobufferYesRetryRequestRetryPipeline closed on error and should not have ' + + e + ); + }); + togetherStream.on('end', () => { + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + }).then(() => { + assert.strictEqual(results.length, 100); + assert.strictEqual(results2.length, 100); + }); +} +// sequence that errors immediately, retries, then succeeds +async function testImmediateStreamingErrorNoBufferYesRetryRequestRetryPumpify( + client: SequenceServiceClient +) { + const retryRequestOptions = { + objectMode: true, + shouldRetryFn: () => { + return true; + }, + }; //always retry + + const settings = { + retryRequestOptions: retryRequestOptions, + }; + + client.initialize(); + const baseArray = Array.from(Array(100).keys()); + let testString = ''; + + for (let i = 0; i < baseArray.length; i++) { + testString = testString.concat(baseArray[i].toString() + ' '); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.OK], + [0.1, 0.1], + [0, 100], //error before any data is sent but retry + testString + ); + + const response = await client.createStreamingSequence(request); + const results: string[] = []; + const results2: string[] = []; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + const togetherStream = pumpify.obj([ + attemptStream, + secondStream, + thirdStream, + ]); + + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + togetherStream.on('error', (e: GoogleError) => { + reject( + 'testImmediateStreamingErrorNoBufferYesRetryRequestRetryPumpify ' + e + ); + }); + + attemptStream.on('error', (e: GoogleError) => { + reject( + 'testImmediateStreamingErrorNoBufferYesRetryRequestRetryPumpify ' + e + ); + }); + togetherStream.on('close', () => { + reject( + 'testImmediateStreamingErrorNoBufferYesRetryRequestRetryPumpify closed on error' + ); + }); + togetherStream.on('end', () => { + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + }).then(() => { + assert.strictEqual(results.length, 100); + assert.strictEqual(results2.length, 100); + }); +} + +// sequence that errors after 85 items are sent and retries +async function testStreamingPipelineErrorAfterDataNoBufferYesRetryPumpify( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [14]; + const getResumptionRequestFn = (request: RequestType) => { + const newRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest() as unknown as RequestType; + newRequest.name = request.name; + newRequest.lastFailIndex = results.length; // we will have gotten 85 items, so start at the next index (which is 85) + return newRequest as unknown as RequestType; + }; + const retryOptions = new RetryOptions( + allowedCodes, + backoffSettings, + undefined, + getResumptionRequestFn + ); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseArray = Array.from(Array(100).keys()); + let testString = ''; + + for (let i = 0; i < baseArray.length; i++) { + testString = testString.concat(baseArray[i].toString() + ' '); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.OK], + [0.1, 0.1, 0.1], + [85, 100], //error after the 85th item + testString + ); + + const response = await client.createStreamingSequence(request); + const results: string[] = []; + const results2: string[] = []; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream: Duplex = pumpify.obj([ + attemptStream, + secondStream, + thirdStream, + ]); + + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + attemptStream.on('end', () => { + assert.strictEqual(results.length, 100); + }); + + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 14); + }); + + togetherStream.on('end', () => { + resolve(); + }); + + togetherStream.on('error', (e: GoogleError) => { + reject('testStreamingPipelineErrorAfterDataNoBufferYesRetryPumpify' + e); + }); + + togetherStream.on('close', () => { + reject( + 'testStreamingPipelineErrorAfterDataNoBufferYesRetryPumpify errored before all data was sent' + ); + }); + }).then(() => { + assert.strictEqual(results.length, 100); + assert.strictEqual(results2.length, 100); + }); +} +// sequence that errors after 85 items are sent and retries +async function testStreamingPipelineErrorAfterDataNoBufferYesRetryPipeline( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [14]; + const results: string[] = []; + const results2: string[] = []; + const getResumptionRequestFn = (request: RequestType) => { + const newRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest() as unknown as RequestType; + newRequest.name = request.name; + newRequest.lastFailIndex = results.length; // we will have gotten 85 items, so start at the next index (which is 85) + return newRequest as unknown as RequestType; + }; + const retryOptions = new RetryOptions( + allowedCodes, + backoffSettings, + undefined, + getResumptionRequestFn + ); - const attemptStream = client.attemptStreamingSequence( - attemptRequest, - settings + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseArray = Array.from(Array(100).keys()); + let testString = ''; + + for (let i = 0; i < baseArray.length; i++) { + testString = testString.concat(baseArray[i].toString() + ' '); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.OK], + [0.1, 0.1, 0.1], + [85, 100], //error after the 85th item + testString ); - attemptStream.on('data', (response: {content: string}) => { - finalData.push(response.content); + + const response = await client.createStreamingSequence(request); + + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pipeline( + [attemptStream, secondStream, thirdStream], + err => { + if (err) { + reject( + 'testStreamingPipelineErrorAfterDataNoBufferYesRetryPipeline errored ' + + err + ); + } else { + resolve(); + } + } + ); + + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + attemptStream.on('end', () => { + assert.strictEqual(results.length, 100); + }); + + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 14); + }); + + togetherStream.on('end', () => { + resolve(); + }); + + togetherStream.on('error', (e: GoogleError) => { + reject('testStreamingPipelineErrorAfterDataNoBufferYesRetryPipeline' + e); + }); + // we only check for attemptStream.on("close") which would indicate + // closing on an error. togetherStream.on("close") is handled + // by the pipeline constructor earlier in the test + attemptStream.on('close', () => { + reject( + 'testStreamingPipelineErrorAfterDataNoBufferYesRetryPipeline errored before all data was sent' + ); + }); + }).then(() => { + assert.strictEqual(results.length, 100); + assert.strictEqual(results2.length, 100); }); - attemptStream.on('error', () => { - throw new Error( - 'testServerStreamingRetriesImmediatelywithRetryOptions stream had an error' +} + +// successful sequence with no errors +async function testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPumpify( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [4]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseArray = Array.from(Array(100).keys()); + let testString = ''; + + for (let i = 0; i < baseArray.length; i++) { + testString = testString.concat(baseArray[i].toString() + ' '); + } + + const request = createStreamingSequenceRequestFactory( + [Status.OK], + [0.1], + [100], //succeed at the end + testString + ); + + const response = await client.createStreamingSequence(request); + const results: string[] = []; + const results2: string[] = []; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings ); + + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pumpify.obj([ + attemptStream, + secondStream, + thirdStream, + ]); + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + attemptStream.on('end', () => { + assert.strictEqual(results.length, 100); + }); + + attemptStream.on('error', (e: GoogleError) => { + reject( + 'testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPumpify attemptStream error' + + e + ); + }); + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + togetherStream.on('error', (e: GoogleError) => { + reject( + 'testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPumpify togetherStream error ' + + e + ); + }); + togetherStream.on('end', () => { + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + }).then(() => { + assert.strictEqual(results.length, 100); + assert.strictEqual(results2.length, 100); }); - attemptStream.on('end', () => { - assert.equal( - finalData.join(' '), - 'This is This is testing the brand new and shiny StreamingSequence server 3' +} +// successful sequence with no errors +async function testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPipeline( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [4]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseArray = Array.from(Array(100).keys()); + let testString = ''; + + for (let i = 0; i < baseArray.length; i++) { + testString = testString.concat(baseArray[i].toString() + ' '); + } + + const request = createStreamingSequenceRequestFactory( + [Status.OK], + [0.1], + [100], //succeed at the end + testString + ); + + const response = await client.createStreamingSequence(request); + const results: string[] = []; + const results2: string[] = []; + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pipeline( + [attemptStream, secondStream, thirdStream], + err => { + // if an error happens on any stream in the pipeline this will be called + if (err) { + reject( + 'testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPipeline error' + ); + } else { + // this handles the error free closure of the stream + resolve(); + } + } ); - attemptStream.end(); + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + attemptStream.on('end', () => { + assert.strictEqual(results.length, 100); + }); + + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + + togetherStream.on('end', () => { + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + + // we only check for attemptStream.on("close") which would indicate + // closing on an error. togetherStream.on("close") is handled + // by the pipeline constructor earlier in the test + attemptStream.on('close', () => { + reject( + 'testStreamingPipelineSucceedsAfterDataNoBufferNoRetryPipeline closed on error' + ); + }); + }).then(() => { + assert.strictEqual(results.length, 100); + assert.strictEqual(results2.length, 100); + }); +} + +// sequence that fails after receiving 85 items +async function testStreamingPipelineErrorAfterDataNoBufferNoRetryPumpify( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [4]; // not the code we're going to error on + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseString = 'zero one two three four five six seven eight nine '; + let testString = ''; + + const repeats = 100; + for (let i = 0; i < repeats; i++) { + testString = testString.concat(baseString); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], + [0.1, 0.1, 0.1], + [85, 99, 100], //error at the 85th + testString + ); + + const response = await client.createStreamingSequence(request); + const results: string[] = []; + const results2: string[] = []; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pumpify.obj([ + attemptStream, + secondStream, + thirdStream, + ]); + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(results.length, 85); + assert.strictEqual(e.code, 14); + }); + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + togetherStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 14); + }); + togetherStream.on('end', () => { + reject( + 'testStreamingPipelineErrorAfterDataNoBufferNoRetryPumpify ended cleanly but should have errored' + ); + }); + togetherStream.on('close', () => { + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + }).then(() => { + assert.strictEqual(results2.length, 85); + assert.strictEqual(results.length, 85); + }); +} +// sequence that fails after receiving 85 items +async function testStreamingPipelineErrorAfterDataNoBufferNoRetryPipeline( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [4]; // not the code we're going to error on + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseString = 'zero one two three four five six seven eight nine '; + let testString = ''; + + const repeats = 100; + for (let i = 0; i < repeats; i++) { + testString = testString.concat(baseString); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], + [0.1, 0.1, 0.1], + [85, 99, 100], //error at the 85th + testString + ); + + const response = await client.createStreamingSequence(request); + const results: string[] = []; + const results2: string[] = []; + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + + const secondStream = new PassThrough({objectMode: true}); + const thirdStream = new PassThrough({objectMode: true}); + + const togetherStream = pipeline( + [attemptStream, secondStream, thirdStream], + err => { + // else case is handled by togetherStream.on("close") below + if (!err) { + reject( + 'testImmediateStreamingErrorNoBufferPipeline suceeded, expected error' + ); + } + } + ); + attemptStream.on('data', (data: {content: string}) => { + results.push(data.content); + }); + + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(results.length, 85); + assert.strictEqual(e.code, 14); + }); + togetherStream.on('data', (data: {content: string}) => { + results2.push(data.content); + }); + togetherStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 14); + }); + togetherStream.on('close', () => { + attemptStream.end(); + secondStream.end(); + thirdStream.end(); + togetherStream.end(); + resolve(); + }); + }).then(() => { + assert.strictEqual(results2.length, 85); + assert.strictEqual(results.length, 85); + }); +} +async function testStreamingErrorAfterDataNoBufferNoRetry( + client: SequenceServiceClient +) { + const backoffSettings = createBackoffSettings( + 100, + 1.2, + 1000, + null, + 1.5, + 3000, + 10000 + ); + const allowedCodes = [4]; + const retryOptions = new RetryOptions(allowedCodes, backoffSettings); + + const settings = { + retry: retryOptions, + }; + + client.initialize(); + const baseString = 'zero one two three four five six seven eight nine '; + let testString = ''; + + const repeats = 100; + for (let i = 0; i < repeats; i++) { + testString = testString.concat(baseString); + } + + const request = createStreamingSequenceRequestFactory( + [Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK], + [0.5, 0.1, 0.1], + [85, 155, 99], + testString + ); + + const response = await client.createStreamingSequence(request); + const results = []; + + // wrap in a promise to ensure we wait to stop server + await new Promise((resolve, reject) => { + const sequence = response[0]; + + const attemptRequest = + new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest(); + attemptRequest.name = sequence.name!; + + const attemptStream = client.attemptStreamingSequence( + attemptRequest, + settings + ); + + attemptStream.on('data', data => { + results.push(data); + }); + + attemptStream.on('error', (e: GoogleError) => { + assert.strictEqual(e.code, 14); + resolve(); + }); + attemptStream.on('end', () => { + reject( + 'testStreamingErroAfterDataNoBufferNoRetry finished successfully but should have errored' + ); + }); + attemptStream.on('close', () => { + resolve(); + }); + }).then(() => { + assert.strictEqual(results.length, 85); // we chose to throw an error after the 85th result }); } diff --git a/gax/test/unit/streaming.ts b/gax/test/unit/streaming.ts index 5f6944002..8365421f7 100644 --- a/gax/test/unit/streaming.ts +++ b/gax/test/unit/streaming.ts @@ -21,7 +21,12 @@ import * as sinon from 'sinon'; import {afterEach, describe, it} from 'mocha'; import {PassThrough} from 'stream'; -import {GaxCallStream, GRPCCall, RequestType} from '../../src/apitypes'; +import { + GaxCallStream, + GRPCCall, + RequestType, + CancellableStream, +} from '../../src/apitypes'; import {createApiCall} from '../../src/createApiCall'; import {StreamingApiCaller} from '../../src/streamingCalls/streamingApiCaller'; import * as gax from '../../src/gax'; @@ -234,8 +239,7 @@ describe('streaming', () => { }); it('cancels in the middle', done => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - function schedulePush(s: any, c: number) { + function schedulePush(s: CancellableStream, c: number) { const intervalId = setInterval(() => { s.push(c); c++; @@ -248,10 +252,9 @@ describe('streaming', () => { function func() { const s = new PassThrough({ objectMode: true, - }); + }) as unknown as CancellableStream; schedulePush(s, 0); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (s as any).cancel = () => { + s.cancel = () => { s.end(); s.emit('error', cancelError); }; @@ -294,6 +297,70 @@ describe('streaming', () => { done(); }); }); + it('cancels in the middle when new retries are enabled', done => { + function schedulePush(s: CancellableStream, c: number) { + const intervalId = setInterval(() => { + s.push(c); + c++; + }, 10); + s.on('finish', () => { + clearInterval(intervalId); + }); + } + const cancelError = new Error('cancelled'); + function func() { + const s = new PassThrough({ + objectMode: true, + }) as unknown as CancellableStream; + schedulePush(s, 0); + s.cancel = () => { + s.end(); + s.emit('error', cancelError); + }; + setImmediate(() => { + s.emit('metadata'); + }); + setImmediate(() => { + s.emit('status'); + }); + return s; + } + const apiCall = createApiCallStreaming( + //@ts-ignore + func, + streaming.StreamType.SERVER_STREAMING, + false, + true // gax native retries + ); + const s = apiCall( + {}, + { + retry: gax.createRetryOptions([5], { + initialRetryDelayMillis: 100, + retryDelayMultiplier: 1.2, + maxRetryDelayMillis: 1000, + rpcTimeoutMultiplier: 1.5, + maxRpcTimeoutMillis: 3000, + maxRetries: 0, + }), + } + ); + let counter = 0; + const expectedCount = 5; + s.on('data', data => { + assert.strictEqual(data, counter); + counter++; + if (counter === expectedCount) { + s.cancel(); + } else if (counter > expectedCount) { + done(new Error('should not reach')); + } + }); + s.on('error', err => { + assert.strictEqual(err, cancelError); + done(); + }); + }); it('emit response when stream received metadata event', done => { const responseMetadata = {metadata: true}; @@ -309,7 +376,9 @@ describe('streaming', () => { const s = new PassThrough({ objectMode: true, }); - s.push(null); + setImmediate(() => { + s.push(null); + }); setImmediate(() => { s.emit('metadata', responseMetadata); }); @@ -829,11 +898,91 @@ describe('streaming', () => { done(); }); }); - it('emit transient error on second or later error when new retries are enabled', done => { - // stubbing cancel is needed because PassThrough doesn't have - // a cancel method and cancel is called as part of the retry - const cancelStub = sinon.stub(streaming.StreamProxy.prototype, 'cancel'); + it('emit transient error message if neither maxRetries nor totaltimeout are defined when new retries are enabled', done => { + const errorInfoObj = { + reason: 'SERVICE_DISABLED', + domain: 'googleapis.com', + metadata: { + consumer: 'projects/455411330361', + service: 'translate.googleapis.com', + }, + }; + const errorProtoJson = require('../../protos/status.json'); + const root = protobuf.Root.fromJSON(errorProtoJson); + const errorInfoType = root.lookupType('ErrorInfo'); + const buffer = errorInfoType.encode(errorInfoObj).finish() as Buffer; + const any = { + type_url: 'type.googleapis.com/google.rpc.ErrorInfo', + value: buffer, + }; + const status = {code: 3, message: 'test', details: [any]}; + const Status = root.lookupType('google.rpc.Status'); + const status_buffer = Status.encode(status).finish() as Buffer; + const metadata = new Metadata(); + metadata.set('grpc-status-details-bin', status_buffer); + const error = Object.assign(new GoogleError('test error'), { + code: 3, + details: 'Failed to read', + metadata: metadata, + }); + const spy = sinon.spy((...args: Array<{}>) => { + assert.strictEqual(args.length, 3); + const s = new PassThrough({ + objectMode: true, + }); + s.push(null); + setImmediate(() => { + // emits an error not in our included retry codes + s.emit('error', error); + }); + setImmediate(() => { + s.emit('status', status); + }); + + return s; + }); + const apiCall = createApiCallStreaming( + spy, + streaming.StreamType.SERVER_STREAMING, + false, + true // new retry behavior enabled + ); + + const s = apiCall( + {}, + { + retry: gax.createRetryOptions([5], { + initialRetryDelayMillis: 100, + retryDelayMultiplier: 1.2, + maxRetryDelayMillis: 1000, + rpcTimeoutMultiplier: 1.5, + maxRpcTimeoutMillis: 3000, + // neither maxRetries nor totalTimeoutMillis is defined + }), + } + ); + + s.on('error', err => { + s.pause(); + s.destroy(); + + assert(err instanceof GoogleError); + assert.deepStrictEqual(err.message, 'test error'); + assert.deepStrictEqual( + err.note, + 'Exception occurred in retry method that was not classified as transient' + ); + assert.strictEqual(err.domain, errorInfoObj.domain); + assert.strictEqual(err.reason, errorInfoObj.reason); + assert.strictEqual( + JSON.stringify(err.errorInfoMetadata), + JSON.stringify(errorInfoObj.metadata) + ); + done(); + }); + }); + it('emit transient error on second or later error when new retries are enabled', done => { const errorInfoObj = { reason: 'SERVICE_DISABLED', domain: 'googleapis.com', @@ -877,7 +1026,9 @@ describe('streaming', () => { case 0: e = error; - s.push(null); + setImmediate(() => { + s.push('hello'); + }); setImmediate(() => { s.emit('error', e); // is included in our retry codes }); @@ -890,7 +1041,9 @@ describe('streaming', () => { case 1: e = error2; // is not in our retry codes - s.push(null); + setImmediate(() => { + s.push('world'); + }); setImmediate(() => { s.emit('error', e); }); @@ -902,6 +1055,10 @@ describe('streaming', () => { return s; default: + // should not reach this + setImmediate(() => { + s.emit('status', status); + }); setImmediate(() => { s.emit('end'); }); @@ -943,11 +1100,9 @@ describe('streaming', () => { JSON.stringify(err.errorInfoMetadata), JSON.stringify(errorInfoObj.metadata) ); - assert.strictEqual(cancelStub.callCount, 1); done(); }); }); - it('emit error and retry once', done => { const firstError = Object.assign(new GoogleError('UNAVAILABLE'), { code: 14, @@ -963,10 +1118,10 @@ describe('streaming', () => { objectMode: true, }); setImmediate(() => { - s.push('Hello'); - s.push('World'); switch (counter) { case 0: + s.push('Hello'); + s.push('World'); s.emit('error', firstError); counter++; break; @@ -1013,12 +1168,7 @@ describe('streaming', () => { }); }); - it('emit error and retry twice with shouldRetryFn', done => { - // stubbing cancel is needed because PassThrough doesn't have - // a cancel method and cancel is called as part of the retry - sinon.stub(streaming.StreamProxy.prototype, 'cancel').callsFake(() => { - done(); - }); + it('emit error, retry twice, and succeed with shouldRetryFn', done => { const firstError = Object.assign(new GoogleError('UNAVAILABLE'), { code: 14, details: 'UNAVAILABLE', @@ -1034,6 +1184,8 @@ describe('streaming', () => { switch (counter) { case 0: setImmediate(() => { + s.push('Hello'); + s.push('World'); s.emit('error', firstError); }); setImmediate(() => { @@ -1043,6 +1195,8 @@ describe('streaming', () => { return s; case 1: setImmediate(() => { + s.push('testing'); + s.push('retries'); s.emit('error', firstError); }); setImmediate(() => { @@ -1051,6 +1205,14 @@ describe('streaming', () => { counter++; return s; default: + setImmediate(() => { + s.emit('metadata'); + }); + // grpc streams always emit status + setImmediate(() => { + s.emit('status'); + }); + // emit end because there is no more data setImmediate(() => { s.emit('end'); }); @@ -1085,16 +1247,18 @@ describe('streaming', () => { ), } ); - + const finalData: string[] = []; + s.on('data', data => { + finalData.push(data); + }); s.on('end', () => { s.destroy(); assert.strictEqual(counter, 2); + assert.strictEqual(finalData.join(' '), 'Hello World testing retries'); + done(); }); }); it('retries using resumption request function ', done => { - // stubbing cancel is needed because PassThrough doesn't have - // a cancel method and cancel is called as part of the retry - sinon.stub(streaming.StreamProxy.prototype, 'cancel'); const receivedData: string[] = []; const error = Object.assign(new GoogleError('test error'), { code: 14, @@ -1111,8 +1275,11 @@ describe('streaming', () => { }); switch (arg) { case 0: - s.push('Hello'); - s.push('World'); + setImmediate(() => { + s.push('Hello'); + s.push('World'); + }); + setImmediate(() => { s.emit('metadata'); }); @@ -1134,11 +1301,17 @@ describe('streaming', () => { }); return s; case 2: - s.push('testing'); - s.push('retries'); + setImmediate(() => { + s.push('testing'); + s.push('retries'); + }); + setImmediate(() => { s.emit('metadata'); }); + setImmediate(() => { + s.emit('status'); + }); setImmediate(() => { s.emit('end'); }); @@ -1199,9 +1372,6 @@ describe('streaming', () => { }); }); it('errors when there is a resumption request function an gaxStreamingRetries is not enabled', done => { - // stubbing cancel is needed because PassThrough doesn't have - // a cancel method and cancel is called as part of the retry - sinon.stub(streaming.StreamProxy.prototype, 'cancel'); const error = Object.assign(new GoogleError('test error'), { code: 14, details: 'UNAVAILABLE', @@ -1302,9 +1472,75 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en afterEach(() => { sinon.restore(); }); + it('server streaming call retries until exceeding timeout and surfaces underlying error', done => { + const retrySpy = sinon.spy( + streaming.StreamProxy.prototype, + 'throwIfMaxRetriesOrTotalTimeoutExceeded' + ); + const firstError = Object.assign(new GoogleError('UNAVAILABLE'), { + code: 14, + details: 'UNAVAILABLE', + metadata: new Metadata(), + }); - it('server streaming call retries until exceeding max retries', done => { - const retrySpy = sinon.spy(streaming.StreamProxy.prototype, 'retry'); + const spy = sinon.spy((...args: Array<{}>) => { + assert.strictEqual(args.length, 3); + const s = new PassThrough({ + objectMode: true, + }); + setImmediate(() => { + s.emit('metadata'); + }); + setImmediate(() => { + s.emit('error', firstError); + }); + return s; + }); + + const apiCall = createApiCallStreaming( + spy, + streaming.StreamType.SERVER_STREAMING, + false, + true + ); + + const call = apiCall( + {}, + { + retry: gax.createRetryOptions([14], { + initialRetryDelayMillis: 100, + retryDelayMultiplier: 1.2, + maxRetryDelayMillis: 1000, + rpcTimeoutMultiplier: 1.5, + maxRpcTimeoutMillis: 3000, + totalTimeoutMillis: 200, // timeout that ensures it should retry at least once + }), + } + ); + + call.on('error', err => { + try { + assert(err instanceof GoogleError); + if (err.code !== 14) { + // ignore the error we are expecting + assert.strictEqual(err.code, 4); + assert.notStrictEqual(retrySpy.callCount, 0); // it MUST retry at least once + assert.strictEqual( + err.message, + 'Total timeout of API exceeded 200 milliseconds retrying error Error: UNAVAILABLE before any response was received.' + ); + done(); + } + } catch (error: unknown) { + done(error); + } + }); + }); + it('server streaming call retries until exceeding max retries and surfaces underlying error in note', done => { + const retrySpy = sinon.spy( + streaming.StreamProxy.prototype, + 'throwIfMaxRetriesOrTotalTimeoutExceeded' + ); const firstError = Object.assign(new GoogleError('UNAVAILABLE'), { code: 14, details: 'UNAVAILABLE', @@ -1352,10 +1588,10 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en if (err.code !== 14) { // ignore the error we are expecting assert.strictEqual(err.code, 4); - assert.strictEqual(retrySpy.callCount, 2); + assert.strictEqual(retrySpy.callCount, 3); // we pass the first two times assert.strictEqual( err.message, - 'Exceeded maximum number of retries before any response was received' + 'Exceeded maximum number of retries retrying error Error: UNAVAILABLE before any response was received' ); done(); } @@ -1365,7 +1601,11 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en }); }); it('does not retry when there is no shouldRetryFn and retryCodes is an empty array', done => { - const retrySpy = sinon.spy(streaming.StreamProxy.prototype, 'retry'); + // we don't call the timeout/max retry check on non retryable error codes + const retrySpy = sinon.spy( + streaming.StreamProxy.prototype, + 'throwIfMaxRetriesOrTotalTimeoutExceeded' + ); const firstError = Object.assign(new GoogleError('UNAVAILABLE'), { code: 14, details: 'UNAVAILABLE', @@ -1469,7 +1709,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en assert.strictEqual(err.code, 4); assert.strictEqual( err.message, - 'Total timeout of API exceeded 10 milliseconds before any response was received.' + 'Total timeout of API exceeded 10 milliseconds retrying error Error: UNAVAILABLE before any response was received.' ); done(); } @@ -1477,7 +1717,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en }); it('allows custom CallOptions.retry settings with shouldRetryFn instead of retryCodes and new retry behavior', done => { sinon - .stub(streaming.StreamProxy.prototype, 'forwardEventsWithRetries') + .stub(streaming.StreamProxy.prototype, 'eventForwardHelper') .callsFake((stream): undefined => { assert(stream instanceof internal.Stream); done(); @@ -1497,6 +1737,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en true //gaxStreamingRetries ); + // anonymous function is a shouldRetryFn apiCall( {}, { @@ -1519,7 +1760,7 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en }); it('allows custom CallOptions.retry settings with retryCodes and new retry behavior', done => { sinon - .stub(streaming.StreamProxy.prototype, 'forwardEventsWithRetries') + .stub(streaming.StreamProxy.prototype, 'eventForwardHelper') .callsFake((stream): undefined => { assert(stream instanceof internal.Stream); done(); @@ -1555,11 +1796,19 @@ describe('handles server streaming retries in gax when gaxStreamingRetries is en }); it('allows the user to pass a custom resumption strategy', done => { sinon - .stub(streaming.StreamProxy.prototype, 'forwardEventsWithRetries') - .callsFake((stream, retry): undefined => { - assert(stream instanceof internal.Stream); - assert(retry.getResumptionRequestFn instanceof Function); + // typecasting to any is a workaround for stubbing private functions in sinon + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .stub(streaming.StreamProxy.prototype, 'newStreamingRetryRequest' as any) + .callsFake((opts): CancellableStream => { + // @ts-ignore errors on unknown type because newStreamingRetryRequest is a private function + assert(opts.retry.getResumptionRequestFn instanceof Function); done(); + const returnStream = new PassThrough() as unknown as CancellableStream; + returnStream.cancel = () => { + returnStream.destroy(); + }; + // we have to return something like newStreamingRetryRequest does + return returnStream; }); const spy = sinon.spy((...args: Array<{}>) => { assert.strictEqual(args.length, 3); diff --git a/gax/test/unit/streamingRetryRequest.ts b/gax/test/unit/streamingRetryRequest.ts deleted file mode 100644 index fb760916a..000000000 --- a/gax/test/unit/streamingRetryRequest.ts +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2023 Google LLC - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at - -// https://www.apache.org/licenses/LICENSE-2.0 - -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -/* eslint-disable @typescript-eslint/ban-ts-comment */ - -import * as assert from 'assert'; -import * as sinon from 'sinon'; -import {describe, it} from 'mocha'; -import {PassThrough} from 'stream'; - -import {CancellableStream, GaxCallStream, GRPCCall} from '../../src/apitypes'; -import {createApiCall} from '../../src/createApiCall'; -import * as gax from '../../src/gax'; -import {StreamDescriptor} from '../../src/streamingCalls/streamDescriptor'; -import * as streaming from '../../src/streamingCalls/streaming'; -import internal = require('stream'); -import {StreamArrayParser} from '../../src/streamArrayParser'; -import {streamingRetryRequest} from '../../src/streamingRetryRequest'; - -function createApiCallStreaming( - func: - | Promise - | sinon.SinonSpy, internal.Transform | StreamArrayParser>, - type: streaming.StreamType, - rest?: boolean, - gaxStreamingRetries?: boolean -) { - const settings = new gax.CallSettings(); - return createApiCall( - //@ts-ignore - Promise.resolve(func), - settings, - new StreamDescriptor(type, rest, gaxStreamingRetries) - ) as GaxCallStream; -} - -describe('retry-request', () => { - describe('streams', () => { - let receivedData: number[] = []; - it('works with defaults in a stream', done => { - const spy = sinon.spy((...args: Array<{}>) => { - assert.strictEqual(args.length, 3); - const s = new PassThrough({ - objectMode: true, - }); - s.push({resources: [1, 2]}); - s.push({resources: [3, 4, 5]}); - s.push(null); - setImmediate(() => { - s.emit('metadata'); - }); - return s; - }); - - const apiCall = createApiCallStreaming( - spy, - streaming.StreamType.SERVER_STREAMING, - false, - true - ); - - const retryStream = streamingRetryRequest({ - request: () => { - const stream = apiCall( - {}, - { - retry: gax.createRetryOptions([5], { - initialRetryDelayMillis: 100, - retryDelayMultiplier: 1.2, - maxRetryDelayMillis: 1000, - rpcTimeoutMultiplier: 1.5, - maxRpcTimeoutMillis: 3000, - maxRetries: 0, - }), - } - ) as CancellableStream; - return stream; - }, - }) - .on('end', () => { - assert.deepStrictEqual(receivedData, [1, 2, 3, 4, 5]); - done(); - }) - .on('data', (data: {resources: number[]}) => { - receivedData = receivedData.concat(data.resources); - }); - assert.strictEqual(retryStream._readableState.objectMode, true); - }); - it('throws request error', done => { - try { - const opts = {}; - //@ts-expect-error - streamingRetryRequest(opts); - } catch (err) { - assert(err instanceof Error); - assert.match(err.message, /A request function must be provided/); - done(); - } - }); - }); -});