Skip to content

Commit

Permalink
feat: allow UnhandledPromiseRejection errors in BulkWriter if no erro…
Browse files Browse the repository at this point in the history
…r handler is specified (#1572)
  • Loading branch information
Brian Chen authored Jul 30, 2021
1 parent 8b38f32 commit e862ac8
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 18 deletions.
49 changes: 32 additions & 17 deletions dev/src/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,16 @@ export class BulkWriter {
*/
private _bufferedOperations: Array<BufferedOperation> = [];

/**
* Whether a custom error handler has been set. BulkWriter only swallows
* errors if an error handler is set. Otherwise, an UnhandledPromiseRejection
* is thrown by Node if an operation promise is rejected without being
* handled.
* @private
* @internal
*/
private _errorHandlerSet = false;

// Visible for testing.
_getBufferedOperationsCount(): number {
return this._bufferedOperations.length;
Expand Down Expand Up @@ -555,11 +565,9 @@ export class BulkWriter {
data: T
): Promise<WriteResult> {
this._verifyNotClosed();
const op = this._enqueue(documentRef, 'create', bulkCommitBatch =>
return this._enqueue(documentRef, 'create', bulkCommitBatch =>
bulkCommitBatch.create(documentRef, data)
);
silencePromise(op);
return op;
}

/**
Expand Down Expand Up @@ -595,11 +603,9 @@ export class BulkWriter {
precondition?: firestore.Precondition
): Promise<WriteResult> {
this._verifyNotClosed();
const op = this._enqueue(documentRef, 'delete', bulkCommitBatch =>
return this._enqueue(documentRef, 'delete', bulkCommitBatch =>
bulkCommitBatch.delete(documentRef, precondition)
);
silencePromise(op);
return op;
}

set<T>(
Expand Down Expand Up @@ -652,11 +658,9 @@ export class BulkWriter {
options?: firestore.SetOptions
): Promise<WriteResult> {
this._verifyNotClosed();
const op = this._enqueue(documentRef, 'set', bulkCommitBatch =>
return this._enqueue(documentRef, 'set', bulkCommitBatch =>
bulkCommitBatch.set(documentRef, data, options)
);
silencePromise(op);
return op;
}

/**
Expand Down Expand Up @@ -708,11 +712,9 @@ export class BulkWriter {
>
): Promise<WriteResult> {
this._verifyNotClosed();
const op = this._enqueue(documentRef, 'update', bulkCommitBatch =>
return this._enqueue(documentRef, 'update', bulkCommitBatch =>
bulkCommitBatch.update(documentRef, dataOrField, ...preconditionOrValues)
);
silencePromise(op);
return op;
}

/**
Expand Down Expand Up @@ -793,6 +795,7 @@ export class BulkWriter {
* });
*/
onWriteError(shouldRetryCallback: (error: BulkWriterError) => boolean): void {
this._errorHandlerSet = true;
this._errorFn = shouldRetryCallback;
}

Expand Down Expand Up @@ -975,11 +978,23 @@ export class BulkWriter {
this._successFn.bind(this)
);

// Swallow the error if the developer has set an error listener. This
// prevents UnhandledPromiseRejections from being thrown if a floating
// BulkWriter operation promise fails when an error handler is specified.
//
// This is done here in order to chain the caught promise onto `lastOp`,
// which ensures that flush() resolves after the operation promise.
const userPromise = bulkWriterOp.promise.catch(err => {
if (!this._errorHandlerSet) {
throw err;
} else {
return bulkWriterOp.promise;
}
});

// Advance the `_lastOp` pointer. This ensures that `_lastOp` only resolves
// when both the previous and the current write resolves.
this._lastOp = this._lastOp.then(() =>
silencePromise(bulkWriterOp.promise)
);
// when both the previous and the current write resolve.
this._lastOp = this._lastOp.then(() => silencePromise(userPromise));

// Schedule the operation if the BulkWriter has fewer than the maximum
// number of allowed pending operations, or add the operation to the
Expand All @@ -999,7 +1014,7 @@ export class BulkWriter {
// Chain the BulkWriter operation promise with the buffer processing logic
// in order to ensure that it runs and that subsequent operations are
// enqueued before the next batch is scheduled in `_sendBatch()`.
return bulkWriterOp.promise
return userPromise
.then(res => {
this._pendingOpsCount--;
this._processBufferedOps();
Expand Down
29 changes: 28 additions & 1 deletion dev/test/bulk-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import {
DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT,
RETRY_MAX_BATCH_SIZE,
} from '../src/bulk-writer';
import {Deferred} from '../src/util';
import {
ApiOverride,
create,
Expand Down Expand Up @@ -391,7 +392,30 @@ describe('BulkWriter', () => {
return bulkWriter.close().then(async () => verifyOpCount(1));
});

it('swallows UnhandledPromiseRejections even if the error is not caught', async () => {
it('throws UnhandledPromiseRejections if no error handler is passed in', async () => {
let errorThrown = false;
const unhandledDeferred = new Deferred<void>();
process.on('unhandledRejection', () => {
errorThrown = true;
unhandledDeferred.resolve();
});

const bulkWriter = await instantiateInstance([
{
request: createRequest([setOp('doc', 'bar')]),
response: failedResponse(),
},
]);

const doc = firestore.doc('collectionId/doc');
bulkWriter.set(doc, {foo: 'bar'});

await bulkWriter.close();
await unhandledDeferred.promise;
expect(errorThrown).to.be.true;
});

it('swallows UnhandledPromiseRejections if an error handler is passed in', async () => {
const bulkWriter = await instantiateInstance([
{
request: createRequest([setOp('doc', 'bar')]),
Expand All @@ -401,6 +425,9 @@ describe('BulkWriter', () => {

const doc = firestore.doc('collectionId/doc');
bulkWriter.set(doc, {foo: 'bar'});
// Set the error handler after calling set() to ensure that the check is
// performed when the promise resolves.
bulkWriter.onWriteError(() => false);
return bulkWriter.close();
});

Expand Down

0 comments on commit e862ac8

Please sign in to comment.