Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export /api/packages/<pkg> JSON documents to storage bucket #7086

Merged
merged 7 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 6 additions & 21 deletions app/lib/frontend/handlers/custom_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ import 'dart:io';
import 'package:_pub_shared/data/package_api.dart';
import 'package:_pub_shared/search/search_form.dart';
import 'package:gcloud/storage.dart';
import 'package:pub_dev/shared/count_topics.dart';
import 'package:pub_dev/shared/storage.dart';
import 'package:pub_dev/task/backend.dart';
import 'package:pub_dev/task/models.dart';
import 'package:shelf/shelf.dart' as shelf;

import '../../frontend/request_context.dart';
Expand All @@ -20,15 +16,19 @@ import '../../package/models.dart';
import '../../package/name_tracker.dart';
import '../../package/overrides.dart';
import '../../scorecard/backend.dart';
import '../../search/backend.dart';
import '../../search/search_client.dart';
import '../../search/search_service.dart';
import '../../shared/configuration.dart';
import '../../shared/count_topics.dart';
import '../../shared/exceptions.dart';
import '../../shared/handlers.dart';
import '../../shared/redis_cache.dart' show cache;
import '../../shared/storage.dart';
import '../../shared/urls.dart' as urls;
import '../../shared/utils.dart' show jsonUtf8Encoder;

import '../../task/backend.dart';
import '../../task/models.dart';
import 'headers.dart';

/// Handles requests for /api/documentation/<package>
Expand Down Expand Up @@ -104,22 +104,7 @@ Future<shelf.Response> apiPackageNameCompletionDataHandler(
'Client must send "Accept-Encoding: gzip" header');
}

final bytes = await cache.packageNameCompletionDataJsonGz().get(() async {
final rs = await searchClient.search(
ServiceSearchQuery.parse(
tagsPredicate: TagsPredicate.regularSearch(),
limit: 20000,
),
// Do not cache response at the search client level, as we'll be caching
// it in a processed form much longer.
skipCache: true,
);

return gzip.encode(jsonUtf8Encoder.convert({
'packages': rs.packageHits.map((p) => p.package).toList(),
}));
});

final bytes = await searchBackend.getPackageNameCompletitionDataJsonGz();
return shelf.Response(200, body: bytes, headers: {
...jsonResponseHeaders,
'Content-Encoding': 'gzip',
Expand Down
310 changes: 310 additions & 0 deletions app/lib/package/export_api_to_bucket.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';

import 'package:basics/basics.dart';
import 'package:clock/clock.dart';
import 'package:crypto/crypto.dart';
import 'package:gcloud/storage.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:pool/pool.dart';
import 'package:retry/retry.dart';

import '../search/backend.dart';
import '../shared/datastore.dart';
import '../shared/storage.dart';
import '../shared/utils.dart';
import '../shared/versions.dart';
import '../task/global_lock.dart';
import 'backend.dart';
import 'models.dart';

final Logger _logger = Logger('export_api_to_bucket');

/// The default concurrency to upload API JSON files to the bucket.
const _defaultBucketUpdateConcurrency = 8;

/// The default cache timeout for content.
const _pkgApiMaxCacheAge = Duration(minutes: 10);
const _pkgNameCompletitionDataMaxAge = Duration(hours: 8);

List<String> _apiPkgObjectNames(String package) => [
'$runtimeVersion/api/packages/$package',
'current/api/packages/$package',
isoos marked this conversation as resolved.
Show resolved Hide resolved
];

List<String> _apiPkgNameCompletitionDataNames() => [
'$runtimeVersion/api/package-name-completion-data',
'current/api/package-name-completion-data',
];

class ApiExporter {
final Bucket _bucket;
final int _concurrency;
final _pkgLastUpdated = <String, _PkgUpdatedEvent>{};

ApiExporter({
required Bucket bucket,
int concurrency = _defaultBucketUpdateConcurrency,
}) : _bucket = bucket,
_concurrency = concurrency;

/// Runs a forever loop and tries to get a global lock.
///
/// Once it has the claim, it scans the package entities and uploads
/// the package API JSONs to the bucket.
/// Tracks the package updates for the next up-to 24 hours and writes
/// the API JSONs after every few minutes.
///
/// When other process has the claim, the loop waits a minute before
/// attempting to get the claim.
Future<Never> uploadInForeverLoop() async {
final lock = GlobalLock.create(
'$runtimeVersion/package/update-api-bucket',
expiration: Duration(minutes: 20),
);
while (true) {
try {
await lock.withClaim((claim) async {
await incrementalPkgScanAndUpload(claim);
});
} catch (e, st) {
_logger.warning('Package API bucket update failed.', e, st);
}
// Wait for 1 minutes for sanity, before trying again.
await Future.delayed(Duration(minutes: 1));
}
}

/// Gets and uploads the package name completion data.
Future<void> uploadPkgNameCompletionData() async {
final bytes = await searchBackend.getPackageNameCompletitionDataJsonGz();
final bytesAndHash = _BytesAndHash(bytes);
for (final objectName in _apiPkgNameCompletitionDataNames()) {
if (await _isSameContent(objectName, bytesAndHash)) {
continue;
}
await uploadWithRetry(
_bucket,
objectName,
bytes.length,
() => Stream.value(bytes),
metadata: ObjectMetadata(
contentType: 'application/json; charset="utf-8"',
contentEncoding: 'gzip',
cacheControl:
'public, max-age=${_pkgNameCompletitionDataMaxAge.inSeconds}',
),
);
}
}

/// Note: there is no global locking here, the full scan should be called
/// only once every day, and it may be racing against the incremental
/// updates.
@visibleForTesting
Future<void> fullPkgScanAndUpload() async {
final pool = Pool(_concurrency);
final futures = <Future>[];
await for (final mp in dbService.query<ModeratedPackage>().run()) {
final f = pool.withResource(() => _deletePackageFromBucket(mp.name!));
futures.add(f);
}
await Future.wait(futures);
futures.clear();

await for (final package in dbService.query<Package>().run()) {
final f = pool.withResource(() async {
if (package.isVisible) {
await _uploadPackageToBucket(package.name!);
} else {
await _deletePackageFromBucket(package.name!);
}
});
futures.add(f);
}
await Future.wait(futures);
await pool.close();
}

@visibleForTesting
Future<void> incrementalPkgScanAndUpload(
GlobalLockClaim claim, {
Duration sleepDuration = const Duration(minutes: 2),
}) async {
final pool = Pool(_concurrency);
// The claim will be released after a day, another process may
// start to upload the API JSONs from scratch again.
final workUntil = clock.now().add(Duration(days: 1));

// start monitoring with a window of 7 days lookback
var lastQueryStarted = clock.now().subtract(Duration(days: 7));
while (claim.valid) {
final now = clock.now().toUtc();
if (now.isAfter(workUntil)) {
break;
}

// clear old entries from last seen cache
_pkgLastUpdated.removeWhere((key, event) =>
now.difference(event.updated) > const Duration(hours: 1));

lastQueryStarted = now;
final futures = <Future>[];
final eventsSince = lastQueryStarted.subtract(Duration(minutes: 5));
await for (final event in _queryRecentPkgUpdatedEvents(eventsSince)) {
if (!claim.valid) {
break;
}
final f = pool.withResource(() async {
if (!claim.valid) {
return;
}
final last = _pkgLastUpdated[event.package];
if (last != null && last.updated.isAtOrAfter(event.updated)) {
return;
}
_pkgLastUpdated[event.package] = event;
if (event.isVisible) {
await _uploadPackageToBucket(event.package);
} else {
await _deletePackageFromBucket(event.package);
}
});
futures.add(f);
}
await Future.wait(futures);
futures.clear();
await Future.delayed(sleepDuration);
}
await pool.close();
}

/// Uploads the package version API response bytes to the bucket, mirroring
/// the endpoint name in the file location.
Future<void> _uploadPackageToBucket(String package) async {
final data = await retry(() => packageBackend.listVersions(package));
final rawBytes = jsonUtf8Encoder.convert(data.toJson());
final bytes = gzip.encode(rawBytes);
final bytesAndHash = _BytesAndHash(bytes);

isoos marked this conversation as resolved.
Show resolved Hide resolved
for (final objectName in _apiPkgObjectNames(package)) {
if (await _isSameContent(objectName, bytesAndHash)) {
continue;
}

await uploadWithRetry(
_bucket,
objectName,
bytes.length,
() => Stream.value(bytes),
metadata: ObjectMetadata(
contentType: 'application/json; charset="utf-8"',
contentEncoding: 'gzip',
cacheControl: 'public, max-age=${_pkgApiMaxCacheAge.inSeconds}',
),
);
}
}

Future<void> _deletePackageFromBucket(String package) async {
for (final objectName in _apiPkgObjectNames(package)) {
await _bucket.tryDelete(objectName);
}
}

Stream<_PkgUpdatedEvent> _queryRecentPkgUpdatedEvents(DateTime since) async* {
final q1 = dbService.query<ModeratedPackage>()
..filter('moderated >=', since)
..order('-moderated');
yield* q1.run().map((mp) => mp.asPkgUpdatedEvent());

final q2 = dbService.query<Package>()
..filter('updated >=', since)
..order('-updated');
yield* q2.run().map((p) => p.asPkgUpdatedEvent());
}

/// Deletes obsolete runtime-versions from the bucket.
Future<void> deleteObsoleteRuntimeContent() async {
final versions = <String>{};

// Objects in the bucket are stored under the following pattern:
// `current/api/<package>`
// `<runtimeVersion>/api/<package>`
// Thus, we list with `/` as delimiter and get a list of runtimeVersions
await for (final d in _bucket.list(prefix: '', delimiter: '/')) {
if (!d.isDirectory) {
_logger.warning(
'Bucket `${_bucket.bucketName}` should not contain any top-level object: `${d.name}`');
continue;
}

// Remove trailing slash from object prefix, to get a runtimeVersion
if (!d.name.endsWith('/')) {
_logger.warning(
'Unexpected top-level directory name in bucket `${_bucket.bucketName}`: `${d.name}`');
return;
}
final rtVersion = d.name.substring(0, d.name.length - 1);
if (runtimeVersionPattern.matchAsPrefix(rtVersion) == null) {
continue;
}

// Check if the runtimeVersion should be GC'ed
if (shouldGCVersion(rtVersion)) {
versions.add(rtVersion);
}
}

for (final v in versions) {
await deleteBucketFolderRecursively(_bucket, '$v/', concurrency: 4);
}
}

Future<bool> _isSameContent(
String objectName, _BytesAndHash bytesAndHash) async {
final info = await _bucket.tryInfo(objectName);
if (info == null) {
return false;
}
if (info.length != bytesAndHash.length) {
return false;
}
if (bytesAndHash.md5Hash.length != info.md5Hash.length) {
return false;
}
// making sure the timing is fixed
var isSame = true;
for (var i = 0; i < bytesAndHash.md5Hash.length; i++) {
if (bytesAndHash.md5Hash[i] != info.md5Hash[i]) {
isSame = false;
}
}
return isSame;
}
}

typedef _PkgUpdatedEvent = ({String package, DateTime updated, bool isVisible});

extension on ModeratedPackage {
_PkgUpdatedEvent asPkgUpdatedEvent() =>
(package: name!, updated: moderated, isVisible: false);
}

extension on Package {
_PkgUpdatedEvent asPkgUpdatedEvent() =>
(package: name!, updated: updated!, isVisible: isVisible);
}

class _BytesAndHash {
final List<int> bytes;

_BytesAndHash(this.bytes);

late final length = bytes.length;
late final md5Hash = md5.convert(bytes).bytes;
}
Loading