Skip to content

Commit

Permalink
Draft of what a full sync could look like
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasfj committed Oct 31, 2024
1 parent 596ff08 commit e069030
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 2 deletions.
80 changes: 80 additions & 0 deletions app/lib/package/api_export/export_api_to_bucket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import 'package:gcloud/storage.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:pool/pool.dart';
import 'package:pub_dev/service/security_advisories/backend.dart';
import 'package:pub_dev/service/security_advisories/sync_security_advisories.dart';
import 'package:pub_dev/shared/parallel_foreach.dart';
import 'package:retry/retry.dart';

import '../../search/backend.dart';
Expand Down Expand Up @@ -78,6 +81,83 @@ class ApiExporter {
.write(await searchBackend.getPackageNameCompletionData());
}

/// Synchronizes every visible package into the exported API.
///
/// Packages are streamed from the datastore and each visible one is passed to
/// [syncPackage] through `parallelForEach`, bounded by [_concurrency]. Names
/// of moderated packages and of packages reporting `isNotVisible` are
/// collected separately.
///
/// A name that ends up in both the visible and the invisible set indicates
/// inconsistent data, and is reported at SHOUT level in the logs. The sync is
/// not aborted in that case.
///
/// Finally the set of visible package names is handed to
/// `_api.garbageCollect`.
Future<void> fullSync() async {
  final invisiblePackageNames = await dbService
      .query<ModeratedPackage>()
      .run()
      .map((mp) => mp.name!)
      .toSet();

  final allPackageNames = <String>{};
  final packageQuery = dbService.query<Package>();
  await packageQuery.run().parallelForEach(_concurrency, (pkg) async {
    final name = pkg.name!;
    if (pkg.isNotVisible) {
      invisiblePackageNames.add(name);
      return;
    }
    allPackageNames.add(name);

    // TODO: Consider retries around all this logic
    await syncPackage(name);
  });

  final visibilityConflicts =
      allPackageNames.intersection(invisiblePackageNames);
  if (visibilityConflicts.isNotEmpty) {
    // Sort the names so that the log message is stable between runs.
    final conflictingNames = visibilityConflicts.toList()..sort();
    Logger('api_export.full_sync').shout(
      'Found ${conflictingNames.length} package(s) that are both visible '
      'and invisible: ${conflictingNames.join(', ')}',
    );
  }

  await _api.garbageCollect(allPackageNames);
}

/// Synchronizes a single [package] into [ExportedApi].
///
/// Copies the tarball of every listed version, then writes the advisories and
/// the version listing, and finally runs the per-package garbage collection
/// with the set of listed version numbers.
///
/// This is intended for:
///  * Running a full background synchronization.
///  * When a change in [Package.updated] is detected (maybe???)
///  * A package being moderated, or another admin action being applied.
Future<void> syncPackage(String package) async {
  final listing = await packageBackend.listVersions(package);
  // TODO: Consider skipping the cache when fetching security advisories
  final advisoryResponse =
      await securityAdvisoryBackend.listAdvisoriesResponse(package);

  final exportedPackage = _api.package(package);

  // Copies the archive of a single version into the exported API.
  // TODO: Will v.version work here, is the canonicalized version number?
  Future<void> copyTarball(String version) async {
    final source =
        packageBackend.packageStorage.getBucketAndPrefix(package, version);
    await exportedPackage.tarball(version).copyFrom(source.$1, source.$2);
  }

  // All tarball copies are started at once and awaited together.
  await Future.wait([
    for (final v in listing.versions) copyTarball(v.version),
  ]);

  await exportedPackage.advisories.write(advisoryResponse);
  await exportedPackage.versions.write(listing);

  // TODO: Is this the canonical version? (probably)
  final listedVersions = {for (final v in listing.versions) v.version};
  await exportedPackage.garbageCollect(listedVersions);
}

/// Uploads the tarball for one [version] of [package] and rewrites the
/// exported version listing.
///
/// This is intended to be used when a new version of a package has been
/// published.
Future<void> uploadSingleVersion(
  String package,
  String version,
) async {
  // The listing is fetched before the tarball is copied, and written after.
  final listing = await packageBackend.listVersions(package);
  final exportedPackage = _api.package(package);

  // TODO: Will version work here, is it the canonicalized version number?
  final source =
      packageBackend.packageStorage.getBucketAndPrefix(package, version);
  await exportedPackage.tarball(version).copyFrom(source.$1, source.$2);

  await exportedPackage.versions.write(listing);
}

/// Note: there is no global locking here, the full scan should be called
/// only once every day, and it may be racing against the incremental
/// updates.
Expand Down
4 changes: 2 additions & 2 deletions app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ final class ExportedPackage {
/// Interface for writing `/api/archives/<package>-<version>.tar.gz`.
ExportedBlob tarball(String version) => ExportedBlob._(
_owner,
'/api/archives/$_package-$version.tar.gz',
'/api/archives/$_package-${Uri.encodeComponent(version)}.tar.gz',
'$_package-$version.tar.gz',
'application/octet',
Duration(hours: 2),
Expand All @@ -288,7 +288,7 @@ final class ExportedPackage {
return;
}
final version = item.name.without(prefix: pfx, suffix: '.tar.gz');
if (allVersionNumbers.contains(version)) {
if (allVersionNumbers.contains(Uri.decodeComponent(version))) {
return;
}
if (await _owner._bucket.tryInfo(item.name) case final info?) {
Expand Down
9 changes: 9 additions & 0 deletions app/lib/package/tarball_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ class TarballStorage {
return await _publicBucket.tryInfo(objectName);
}

/// Returns the canonical bucket paired with the object name of the archive
/// for [package] and [version].
///
/// The second field is the result of [tarballObjectName] for the given
/// [package] and [version]; [version] is passed through as-is.
(Bucket, String) getBucketAndPrefix(String package, String version) =>
    (_canonicalBucket, tarballObjectName(package, version));

/// Returns the publicly available download URL from the storage bucket.
Future<Uri> getPublicDownloadUrl(String package, String version) async {
final object = tarballObjectName(package, Uri.encodeComponent(version));
Expand Down

0 comments on commit e069030

Please sign in to comment.