From a963fb347d345139c32e9c16e2cfd7c01b7f4925 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Thu, 30 Nov 2023 12:29:00 +0100 Subject: [PATCH] Export /api/packages/ JSON documents to storage bucket (#7086) --- app/lib/frontend/handlers/custom_api.dart | 27 +- app/lib/package/export_api_to_bucket.dart | 310 ++++++++++++++++++ app/lib/search/backend.dart | 24 ++ app/lib/shared/storage.dart | 10 + .../package/export_api_to_bucket_test.dart | 75 +++++ app/test/search/backend_test.dart | 21 +- app/test/shared/test_services.dart | 20 ++ 7 files changed, 446 insertions(+), 41 deletions(-) create mode 100644 app/lib/package/export_api_to_bucket.dart create mode 100644 app/test/package/export_api_to_bucket_test.dart diff --git a/app/lib/frontend/handlers/custom_api.dart b/app/lib/frontend/handlers/custom_api.dart index 1dcd242499..643e23176d 100644 --- a/app/lib/frontend/handlers/custom_api.dart +++ b/app/lib/frontend/handlers/custom_api.dart @@ -8,10 +8,6 @@ import 'dart:io'; import 'package:_pub_shared/data/package_api.dart'; import 'package:_pub_shared/search/search_form.dart'; import 'package:gcloud/storage.dart'; -import 'package:pub_dev/shared/count_topics.dart'; -import 'package:pub_dev/shared/storage.dart'; -import 'package:pub_dev/task/backend.dart'; -import 'package:pub_dev/task/models.dart'; import 'package:shelf/shelf.dart' as shelf; import '../../frontend/request_context.dart'; @@ -20,15 +16,19 @@ import '../../package/models.dart'; import '../../package/name_tracker.dart'; import '../../package/overrides.dart'; import '../../scorecard/backend.dart'; +import '../../search/backend.dart'; import '../../search/search_client.dart'; import '../../search/search_service.dart'; import '../../shared/configuration.dart'; +import '../../shared/count_topics.dart'; import '../../shared/exceptions.dart'; import '../../shared/handlers.dart'; import '../../shared/redis_cache.dart' show cache; +import '../../shared/storage.dart'; import '../../shared/urls.dart' as urls; import '../../shared/utils.dart' show jsonUtf8Encoder; - +import '../../task/backend.dart'; +import '../../task/models.dart'; import 'headers.dart'; /// Handles requests for /api/documentation/ @@ -104,22 +104,7 @@ Future apiPackageNameCompletionDataHandler( 'Client must send "Accept-Encoding: gzip" header'); } - final bytes = await cache.packageNameCompletionDataJsonGz().get(() async { - final rs = await searchClient.search( - ServiceSearchQuery.parse( - tagsPredicate: TagsPredicate.regularSearch(), - limit: 20000, - ), - // Do not cache response at the search client level, as we'll be caching - // it in a processed form much longer. - skipCache: true, - ); - - return gzip.encode(jsonUtf8Encoder.convert({ - 'packages': rs.packageHits.map((p) => p.package).toList(), - })); - }); - + final bytes = await searchBackend.getPackageNameCompletitionDataJsonGz(); return shelf.Response(200, body: bytes, headers: { ...jsonResponseHeaders, 'Content-Encoding': 'gzip', diff --git a/app/lib/package/export_api_to_bucket.dart b/app/lib/package/export_api_to_bucket.dart new file mode 100644 index 0000000000..7e7ef1681d --- /dev/null +++ b/app/lib/package/export_api_to_bucket.dart @@ -0,0 +1,310 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; + +import 'package:basics/basics.dart'; +import 'package:clock/clock.dart'; +import 'package:crypto/crypto.dart'; +import 'package:gcloud/storage.dart'; +import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; +import 'package:pool/pool.dart'; +import 'package:retry/retry.dart'; + +import '../search/backend.dart'; +import '../shared/datastore.dart'; +import '../shared/storage.dart'; +import '../shared/utils.dart'; +import '../shared/versions.dart'; +import '../task/global_lock.dart'; +import 'backend.dart'; +import 'models.dart'; + +final Logger _logger = Logger('export_api_to_bucket'); + +/// The default concurrency to upload API JSON files to the bucket. +const _defaultBucketUpdateConcurrency = 8; + +/// The default cache timeout for content. +const _pkgApiMaxCacheAge = Duration(minutes: 10); +const _pkgNameCompletitionDataMaxAge = Duration(hours: 8); + +List _apiPkgObjectNames(String package) => [ + '$runtimeVersion/api/packages/$package', + 'current/api/packages/$package', + ]; + +List _apiPkgNameCompletitionDataNames() => [ + '$runtimeVersion/api/package-name-completion-data', + 'current/api/package-name-completion-data', + ]; + +class ApiExporter { + final Bucket _bucket; + final int _concurrency; + final _pkgLastUpdated = {}; + + ApiExporter({ + required Bucket bucket, + int concurrency = _defaultBucketUpdateConcurrency, + }) : _bucket = bucket, + _concurrency = concurrency; + + /// Runs a forever loop and tries to get a global lock. + /// + /// Once it has the claim, it scans the package entities and uploads + /// the package API JSONs to the bucket. + /// Tracks the package updates for the next up-to 24 hours and writes + /// the API JSONs after every few minutes. + /// + /// When other process has the claim, the loop waits a minute before + /// attempting to get the claim. + Future uploadInForeverLoop() async { + final lock = GlobalLock.create( + '$runtimeVersion/package/update-api-bucket', + expiration: Duration(minutes: 20), + ); + while (true) { + try { + await lock.withClaim((claim) async { + await incrementalPkgScanAndUpload(claim); + }); + } catch (e, st) { + _logger.warning('Package API bucket update failed.', e, st); + } + // Wait for 1 minutes for sanity, before trying again. + await Future.delayed(Duration(minutes: 1)); + } + } + + /// Gets and uploads the package name completion data. + Future uploadPkgNameCompletionData() async { + final bytes = await searchBackend.getPackageNameCompletitionDataJsonGz(); + final bytesAndHash = _BytesAndHash(bytes); + for (final objectName in _apiPkgNameCompletitionDataNames()) { + if (await _isSameContent(objectName, bytesAndHash)) { + continue; + } + await uploadWithRetry( + _bucket, + objectName, + bytes.length, + () => Stream.value(bytes), + metadata: ObjectMetadata( + contentType: 'application/json; charset="utf-8"', + contentEncoding: 'gzip', + cacheControl: + 'public, max-age=${_pkgNameCompletitionDataMaxAge.inSeconds}', + ), + ); + } + } + + /// Note: there is no global locking here, the full scan should be called + /// only once every day, and it may be racing against the incremental + /// updates. + @visibleForTesting + Future fullPkgScanAndUpload() async { + final pool = Pool(_concurrency); + final futures = []; + await for (final mp in dbService.query().run()) { + final f = pool.withResource(() => _deletePackageFromBucket(mp.name!)); + futures.add(f); + } + await Future.wait(futures); + futures.clear(); + + await for (final package in dbService.query().run()) { + final f = pool.withResource(() async { + if (package.isVisible) { + await _uploadPackageToBucket(package.name!); + } else { + await _deletePackageFromBucket(package.name!); + } + }); + futures.add(f); + } + await Future.wait(futures); + await pool.close(); + } + + @visibleForTesting + Future incrementalPkgScanAndUpload( + GlobalLockClaim claim, { + Duration sleepDuration = const Duration(minutes: 2), + }) async { + final pool = Pool(_concurrency); + // The claim will be released after a day, another process may + // start to upload the API JSONs from scratch again. + final workUntil = clock.now().add(Duration(days: 1)); + + // start monitoring with a window of 7 days lookback + var lastQueryStarted = clock.now().subtract(Duration(days: 7)); + while (claim.valid) { + final now = clock.now().toUtc(); + if (now.isAfter(workUntil)) { + break; + } + + // clear old entries from last seen cache + _pkgLastUpdated.removeWhere((key, event) => + now.difference(event.updated) > const Duration(hours: 1)); + + lastQueryStarted = now; + final futures = []; + final eventsSince = lastQueryStarted.subtract(Duration(minutes: 5)); + await for (final event in _queryRecentPkgUpdatedEvents(eventsSince)) { + if (!claim.valid) { + break; + } + final f = pool.withResource(() async { + if (!claim.valid) { + return; + } + final last = _pkgLastUpdated[event.package]; + if (last != null && last.updated.isAtOrAfter(event.updated)) { + return; + } + _pkgLastUpdated[event.package] = event; + if (event.isVisible) { + await _uploadPackageToBucket(event.package); + } else { + await _deletePackageFromBucket(event.package); + } + }); + futures.add(f); + } + await Future.wait(futures); + futures.clear(); + await Future.delayed(sleepDuration); + } + await pool.close(); + } + + /// Uploads the package version API response bytes to the bucket, mirroring + /// the endpoint name in the file location. + Future _uploadPackageToBucket(String package) async { + final data = await retry(() => packageBackend.listVersions(package)); + final rawBytes = jsonUtf8Encoder.convert(data.toJson()); + final bytes = gzip.encode(rawBytes); + final bytesAndHash = _BytesAndHash(bytes); + + for (final objectName in _apiPkgObjectNames(package)) { + if (await _isSameContent(objectName, bytesAndHash)) { + continue; + } + + await uploadWithRetry( + _bucket, + objectName, + bytes.length, + () => Stream.value(bytes), + metadata: ObjectMetadata( + contentType: 'application/json; charset="utf-8"', + contentEncoding: 'gzip', + cacheControl: 'public, max-age=${_pkgApiMaxCacheAge.inSeconds}', + ), + ); + } + } + + Future _deletePackageFromBucket(String package) async { + for (final objectName in _apiPkgObjectNames(package)) { + await _bucket.tryDelete(objectName); + } + } + + Stream<_PkgUpdatedEvent> _queryRecentPkgUpdatedEvents(DateTime since) async* { + final q1 = dbService.query() + ..filter('moderated >=', since) + ..order('-moderated'); + yield* q1.run().map((mp) => mp.asPkgUpdatedEvent()); + + final q2 = dbService.query() + ..filter('updated >=', since) + ..order('-updated'); + yield* q2.run().map((p) => p.asPkgUpdatedEvent()); + } + + /// Deletes obsolete runtime-versions from the bucket. + Future deleteObsoleteRuntimeContent() async { + final versions = {}; + + // Objects in the bucket are stored under the following pattern: + // `current/api/` + // `/api/` + // Thus, we list with `/` as delimiter and get a list of runtimeVersions + await for (final d in _bucket.list(prefix: '', delimiter: '/')) { + if (!d.isDirectory) { + _logger.warning( + 'Bucket `${_bucket.bucketName}` should not contain any top-level object: `${d.name}`'); + continue; + } + + // Remove trailing slash from object prefix, to get a runtimeVersion + if (!d.name.endsWith('/')) { + _logger.warning( + 'Unexpected top-level directory name in bucket `${_bucket.bucketName}`: `${d.name}`'); + return; + } + final rtVersion = d.name.substring(0, d.name.length - 1); + if (runtimeVersionPattern.matchAsPrefix(rtVersion) == null) { + continue; + } + + // Check if the runtimeVersion should be GC'ed + if (shouldGCVersion(rtVersion)) { + versions.add(rtVersion); + } + } + + for (final v in versions) { + await deleteBucketFolderRecursively(_bucket, '$v/', concurrency: 4); + } + } + + Future _isSameContent( + String objectName, _BytesAndHash bytesAndHash) async { + final info = await _bucket.tryInfo(objectName); + if (info == null) { + return false; + } + if (info.length != bytesAndHash.length) { + return false; + } + if (bytesAndHash.md5Hash.length != info.md5Hash.length) { + return false; + } + // making sure the timing is fixed + var isSame = true; + for (var i = 0; i < bytesAndHash.md5Hash.length; i++) { + if (bytesAndHash.md5Hash[i] != info.md5Hash[i]) { + isSame = false; + } + } + return isSame; + } +} + +typedef _PkgUpdatedEvent = ({String package, DateTime updated, bool isVisible}); + +extension on ModeratedPackage { + _PkgUpdatedEvent asPkgUpdatedEvent() => + (package: name!, updated: moderated, isVisible: false); +} + +extension on Package { + _PkgUpdatedEvent asPkgUpdatedEvent() => + (package: name!, updated: updated!, isVisible: isVisible); +} + +class _BytesAndHash { + final List bytes; + + _BytesAndHash(this.bytes); + + late final length = bytes.length; + late final md5Hash = md5.convert(bytes).bytes; +} diff --git a/app/lib/search/backend.dart b/app/lib/search/backend.dart index f7c78911ac..5dff13a916 100644 --- a/app/lib/search/backend.dart +++ b/app/lib/search/backend.dart @@ -6,6 +6,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:io'; +import 'package:_pub_shared/search/search_form.dart'; import 'package:_pub_shared/search/tags.dart'; import 'package:clock/clock.dart'; import 'package:collection/collection.dart'; @@ -18,7 +19,10 @@ import 'package:meta/meta.dart'; import 'package:pana/src/dartdoc/pub_dartdoc_data.dart'; import 'package:pool/pool.dart'; +import 'package:pub_dev/search/search_client.dart'; import 'package:pub_dev/shared/popularity_storage.dart'; +import 'package:pub_dev/shared/redis_cache.dart'; +import 'package:pub_dev/shared/utils.dart'; import 'package:retry/retry.dart'; import '../package/backend.dart'; @@ -475,6 +479,26 @@ class SearchBackend { 'delete-old-search-snapshots cleared $counts entries ($runtimeVersion)'); } + /// Creates the gzipped byte content for the /api/package-name-completion-data endpoint. + Future> getPackageNameCompletitionDataJsonGz() async { + final bytes = await cache.packageNameCompletionDataJsonGz().get(() async { + final rs = await searchClient.search( + ServiceSearchQuery.parse( + tagsPredicate: TagsPredicate.regularSearch(), + limit: 20000, + ), + // Do not cache response at the search client level, as we'll be caching + // it in a processed form much longer. + skipCache: true, + ); + + return gzip.encode(jsonUtf8Encoder.convert({ + 'packages': rs.packageHits.map((p) => p.package).toList(), + })); + }); + return bytes!; + } + Future close() async { _snapshotStorage.close(); _http.close(); diff --git a/app/lib/shared/storage.dart b/app/lib/shared/storage.dart index ce591c0f2e..04315c8393 100644 --- a/app/lib/shared/storage.dart +++ b/app/lib/shared/storage.dart @@ -82,6 +82,16 @@ extension BucketExt on Bucket { } } + /// Deletes [name] if it exists, ignores 404 otherwise. + Future tryDelete(String name) async { + try { + return await delete(name); + } on DetailedApiRequestError catch (e) { + if (e.status == 404) return null; + rethrow; + } + } + Future uploadPublic(String objectName, int length, Stream> Function() openStream, String contentType) { final publicRead = AclEntry(AllUsersScope(), AclPermission.READ); diff --git a/app/test/package/export_api_to_bucket_test.dart b/app/test/package/export_api_to_bucket_test.dart new file mode 100644 index 0000000000..2d85ddcad5 --- /dev/null +++ b/app/test/package/export_api_to_bucket_test.dart @@ -0,0 +1,75 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:convert'; +import 'dart:io'; + +import 'package:clock/clock.dart'; +import 'package:gcloud/storage.dart'; +import 'package:pub_dev/package/export_api_to_bucket.dart'; +import 'package:pub_dev/shared/storage.dart'; +import 'package:pub_dev/shared/versions.dart'; +import 'package:test/test.dart'; + +import '../shared/test_services.dart'; + +void main() { + group('export API to bucket', () { + testWithProfile( + 'export and cleanup', + fn: () async { + await storageService.createBucket('bucket'); + final bucket = storageService.bucket('bucket'); + final exporter = ApiExporter( + bucket: bucket, + concurrency: 2, + ); + await exporter.uploadPkgNameCompletionData(); + await exporter.fullPkgScanAndUpload(); + + final claim = + FakeGlobalLockClaim(clock.now().add(Duration(seconds: 3))); + await exporter.incrementalPkgScanAndUpload( + claim, + sleepDuration: Duration(milliseconds: 300), + ); + await exporter.deleteObsoleteRuntimeContent(); + + final files = await bucket + .list(delimiter: 'bogus-delimiter-for-full-file-list') + .map((e) => e.name) + .toList(); + expect(files.toSet(), { + '$runtimeVersion/api/package-name-completion-data', + 'current/api/package-name-completion-data', + '$runtimeVersion/api/packages/flutter_titanium', + '$runtimeVersion/api/packages/neon', + '$runtimeVersion/api/packages/oxygen', + 'current/api/packages/flutter_titanium', + 'current/api/packages/neon', + 'current/api/packages/oxygen', + }); + + Future readAndDecodeJson(String path) async => json + .decode(utf8.decode(gzip.decode(await bucket.readAsBytes(path)))); + + expect( + await readAndDecodeJson('current/api/packages/neon'), + { + 'name': 'neon', + 'latest': isNotEmpty, + 'versions': hasLength(1), + }, + ); + + expect( + await readAndDecodeJson('current/api/package-name-completion-data'), + { + 'packages': hasLength(3), + }, + ); + }, + ); + }); +} diff --git a/app/test/search/backend_test.dart b/app/test/search/backend_test.dart index 13220cc31f..dc2fbfe9bd 100644 --- a/app/test/search/backend_test.dart +++ b/app/test/search/backend_test.dart @@ -5,7 +5,6 @@ import 'package:clock/clock.dart'; import 'package:pub_dev/search/backend.dart'; import 'package:pub_dev/search/sdk_mem_index.dart'; -import 'package:pub_dev/task/global_lock.dart'; import 'package:test/test.dart'; import '../shared/test_services.dart'; @@ -30,7 +29,7 @@ void main() { var documents = await searchBackend.fetchSnapshotDocuments(); expect(documents, isNull); await searchBackend.doCreateAndUpdateSnapshot( - _FakeGlobalLockClaim(clock.now().add(Duration(seconds: 3))), + FakeGlobalLockClaim(clock.now().add(Duration(seconds: 3))), concurrency: 2, sleepDuration: Duration(milliseconds: 300), ); @@ -39,21 +38,3 @@ void main() { }); }); } - -class _FakeGlobalLockClaim implements GlobalLockClaim { - @override - DateTime expires; - - _FakeGlobalLockClaim(this.expires); - - @override - Future refresh() async { - return true; - } - - @override - Future release() async {} - - @override - bool get valid => expires.isAfter(clock.now()); -} diff --git a/app/test/shared/test_services.dart b/app/test/shared/test_services.dart index bb28743b1f..32b83d759d 100644 --- a/app/test/shared/test_services.dart +++ b/app/test/shared/test_services.dart @@ -3,6 +3,7 @@ // BSD-style license that can be found in the LICENSE file. import 'dart:async'; +import 'package:clock/clock.dart'; import 'package:fake_gcloud/mem_datastore.dart'; import 'package:fake_gcloud/mem_storage.dart'; import 'package:gcloud/db.dart'; @@ -26,6 +27,7 @@ import 'package:pub_dev/shared/logging.dart'; import 'package:pub_dev/shared/redis_cache.dart'; import 'package:pub_dev/shared/versions.dart'; import 'package:pub_dev/task/cloudcompute/fakecloudcompute.dart'; +import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/tool/test_profile/import_source.dart'; import 'package:pub_dev/tool/test_profile/importer.dart'; import 'package:pub_dev/tool/test_profile/models.dart'; @@ -274,3 +276,21 @@ void setupTestsWithAdminTokenIssues(Future Function(PubApiClient client) fn) { await expectApiException(rs, status: 403, code: 'InsufficientPermissions'); }); } + +class FakeGlobalLockClaim implements GlobalLockClaim { + @override + DateTime expires; + + FakeGlobalLockClaim(this.expires); + + @override + Future refresh() async { + return true; + } + + @override + Future release() async {} + + @override + bool get valid => expires.isAfter(clock.now()); +}