From 12e82a1ad3ce22160b6799dcb1675a7311b85efb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Fri, 21 Jul 2023 13:40:47 +0200 Subject: [PATCH] Support retrying Datastore operations. (#168) * Support retrying Datastore operations. * Expose only maxAttempts * add more errors --- CHANGELOG.md | 3 +- lib/datastore.dart | 18 ++++ lib/src/retry_datastore_impl.dart | 159 ++++++++++++++++++++++++++++++ pubspec.yaml | 3 +- test/db_all_e2e_test.dart | 6 +- 5 files changed, 185 insertions(+), 4 deletions(-) create mode 100644 lib/src/retry_datastore_impl.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 2efd48dc..bb604150 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ -## 0.8.10-wip +## 0.8.10 - Widen the SDK constraint to support Dart 3.0 +- Support retrying Datastore operations. ## 0.8.9 diff --git a/lib/datastore.dart b/lib/datastore.dart index 3b05eeae..53ca4eb2 100644 --- a/lib/datastore.dart +++ b/lib/datastore.dart @@ -12,10 +12,12 @@ library; import 'dart:async'; import 'package:http/http.dart' as http; +import 'package:retry/retry.dart'; import 'common.dart' show Page; import 'service_scope.dart' as ss; import 'src/datastore_impl.dart' show DatastoreImpl; +import 'src/retry_datastore_impl.dart'; const Symbol _datastoreKey = #gcloud.datastore; @@ -391,6 +393,22 @@ abstract class Datastore { return DatastoreImpl(client, project); } + /// Retry Datastore operations where the issue seems to be transient. + /// + /// The [delegate] is the configured [Datastore] implementation that will be + /// used. + /// + /// The operations will be retried at maximum of [maxAttempts]. + factory Datastore.withRetry( + Datastore delegate, { + int? maxAttempts, + }) { + return RetryDatastoreImpl( + delegate, + RetryOptions(maxAttempts: maxAttempts ?? 3), + ); + } + /// Allocate integer IDs for the partially populated [keys] given as argument. /// /// The returned [Key]s will be fully populated with the allocated IDs. diff --git a/lib/src/retry_datastore_impl.dart b/lib/src/retry_datastore_impl.dart new file mode 100644 index 00000000..e57410cf --- /dev/null +++ b/lib/src/retry_datastore_impl.dart @@ -0,0 +1,159 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:retry/retry.dart'; + +import '../common.dart'; +import '../datastore.dart' as datastore; + +/// Datastore implementation which retries most operations +class RetryDatastoreImpl implements datastore.Datastore { + final datastore.Datastore _delegate; + final RetryOptions _retryOptions; + + RetryDatastoreImpl(this._delegate, this._retryOptions); + + @override + Future> allocateIds(List keys) async { + return await _retryOptions.retry( + () => _delegate.allocateIds(keys), + retryIf: _retryIf, + ); + } + + @override + Future beginTransaction({ + bool crossEntityGroup = false, + }) async { + return await _retryOptions.retry( + () => _delegate.beginTransaction(crossEntityGroup: crossEntityGroup), + retryIf: _retryIf, + ); + } + + @override + Future commit({ + List inserts = const [], + List autoIdInserts = const [], + List deletes = const [], + datastore.Transaction? transaction, + }) async { + Future fn() async { + if (transaction == null) { + return await _delegate.commit( + inserts: inserts, + autoIdInserts: autoIdInserts, + deletes: deletes, + ); + } else { + return await _delegate.commit( + inserts: inserts, + autoIdInserts: autoIdInserts, + deletes: deletes, + transaction: transaction, + ); + } + } + + final shouldNotRetry = autoIdInserts.isNotEmpty && transaction == null; + if (shouldNotRetry) { + return await fn(); + } else { + return await _retryOptions.retry(fn, retryIf: _retryIf); + } + } + + @override + Future> lookup( + List keys, { + datastore.Transaction? transaction, + }) async { + return await _retryOptions.retry( + () async { + if (transaction == null) { + return await _delegate.lookup(keys); + } else { + return await _delegate.lookup(keys, transaction: transaction); + } + }, + retryIf: _retryIf, + ); + } + + @override + Future> query( + datastore.Query query, { + datastore.Partition? partition, + datastore.Transaction? transaction, + }) async { + Future> fn() async { + if (partition != null && transaction != null) { + return await _delegate.query( + query, + partition: partition, + transaction: transaction, + ); + } else if (partition != null) { + return await _delegate.query(query, partition: partition); + } else if (transaction != null) { + return await _delegate.query( + query, + transaction: transaction, + ); + } else { + return await _delegate.query(query); + } + } + + return await _retryOptions.retry( + () async => _RetryPage(await fn(), _retryOptions), + retryIf: _retryIf, + ); + } + + @override + Future rollback(datastore.Transaction transaction) async { + return await _retryOptions.retry( + () => _delegate.rollback(transaction), + retryIf: _retryIf, + ); + } +} + +class _RetryPage implements Page { + final Page _delegate; + final RetryOptions _retryOptions; + + _RetryPage(this._delegate, this._retryOptions); + + @override + bool get isLast => _delegate.isLast; + + @override + List get items => _delegate.items; + + @override + Future> next({int? pageSize}) async { + return await _retryOptions.retry( + () async { + if (pageSize == null) { + return await _delegate.next(); + } else { + return await _delegate.next(pageSize: pageSize); + } + }, + retryIf: _retryIf, + ); + } +} + +bool _retryIf(Exception e) { + if (e is datastore.TransactionAbortedError || + e is datastore.NeedIndexError || + e is datastore.QuotaExceededError || + e is datastore.PermissionDeniedError) { + return false; + } + return true; +} diff --git a/pubspec.yaml b/pubspec.yaml index e5e48a3f..7a58f69e 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,5 +1,5 @@ name: gcloud -version: 0.8.10-wip +version: 0.8.10 description: >- High level idiomatic Dart API for Google Cloud Storage, Pub-Sub and Datastore. repository: https://github.com/dart-lang/gcloud @@ -16,6 +16,7 @@ dependencies: googleapis: '>=3.0.0 <12.0.0' http: '>=0.13.5 <2.0.0' meta: ^1.3.0 + retry: ^3.1.1 dev_dependencies: dart_flutter_team_lints: ^1.0.0 diff --git a/test/db_all_e2e_test.dart b/test/db_all_e2e_test.dart index f66f5156..af37670f 100644 --- a/test/db_all_e2e_test.dart +++ b/test/db_all_e2e_test.dart @@ -10,6 +10,7 @@ library gcloud.test.db_all_test; import 'dart:async'; import 'dart:io'; +import 'package:gcloud/datastore.dart'; import 'package:gcloud/db.dart' as db; import 'package:gcloud/src/datastore_impl.dart' as datastore_impl; import 'package:http/http.dart'; @@ -25,12 +26,13 @@ Future main() async { var now = DateTime.now().millisecondsSinceEpoch; var namespace = '${Platform.operatingSystem}$now'; - late datastore_impl.DatastoreImpl datastore; + late Datastore datastore; late db.DatastoreDB datastoreDB; Client? client; await withAuthClient(scopes, (String project, httpClient) async { - datastore = datastore_impl.DatastoreImpl(httpClient, project); + datastore = + Datastore.withRetry(datastore_impl.DatastoreImpl(httpClient, project)); datastoreDB = db.DatastoreDB(datastore); client = httpClient; });