Skip to content

Commit

Permalink
Draft of what a full sync could look like
Browse files Browse the repository at this point in the history
  • Loading branch information
jonasfj committed Nov 6, 2024
1 parent 1bf2454 commit fb3a1f7
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 0 deletions.
116 changes: 116 additions & 0 deletions app/lib/package/api_export/export_api_to_bucket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import 'package:gcloud/storage.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:pool/pool.dart';
import 'package:pub_dev/service/security_advisories/backend.dart';
import 'package:pub_dev/shared/parallel_foreach.dart';
import 'package:retry/retry.dart';

import '../../search/backend.dart';
Expand Down Expand Up @@ -78,6 +80,120 @@ class ApiExporter {
.write(await searchBackend.getPackageNameCompletionData());
}

Future<void> fullSync() async {
final invisiblePackageNames = await dbService
.query<ModeratedPackage>()
.run()
.map((mp) => mp.name!)
.toSet();

final allPackageNames = <String>{};
final packageQuery = dbService.query<Package>();
await packageQuery.run().parallelForEach(_concurrency, (pkg) async {
final name = pkg.name!;
if (pkg.isNotVisible) {
invisiblePackageNames.add(name);
return;
}
allPackageNames.add(name);

// TODO: Consider retries around all this logic
await syncPackage(name);
});

final visibilityConflicts =
allPackageNames.intersection(invisiblePackageNames);
if (visibilityConflicts.isNotEmpty) {
// TODO: Shout into logs
}

await _api.garbageCollect(allPackageNames);
}

/// Sync package and into [ExportedApi], this will GC, etc.
///
/// This is intended when:
/// * Running a full background synchronization.
/// * When a change in [Package.updated] is detected (maybe???)
/// * A package is moderated, or other admin action is applied.
Future<void> syncPackage(String package) async {
final versionListing = await packageBackend.listVersions(package);
// TODO: Consider skipping the cache when fetching security advisories
final advisories = await securityAdvisoryBackend.listAdvisoriesResponse(
package,
);

await Future.wait(versionListing.versions.map((v) async {
final version = v.version;

// TODO: Will v.version work here, is the canonicalized version number?
final absoluteObjectName =
packageBackend.tarballStorage.getCanonicalBucketAbsoluteObjectName(
package,
version,
);
final info =
await packageBackend.tarballStorage.getCanonicalBucketArchiveInfo(
package,
version,
);
if (info == null) {
throw AssertionError(
'Expected an archive for "$package" and "$version" at '
'"$absoluteObjectName"',
);
}

await _api.package(package).tarball(version).copyFrom(
absoluteObjectName,
info,
);
}));

await _api.package(package).advisories.write(advisories);
await _api.package(package).versions.write(versionListing);

// TODO: Is this the canonoical version? (probably)
final allVersions = versionListing.versions.map((v) => v.version).toSet();
await _api.package(package).garbageCollect(allVersions);
}

/// Upload a single version of a new package.
///
/// This is intended to be used when a new version of a package has been
/// published.
Future<void> uploadSingleVersion(
String package,
String version,
) async {
final versionListing = await packageBackend.listVersions(package);

// TODO: Will v.version work here, is the canonicalized version number?
final absoluteObjectName =
packageBackend.tarballStorage.getCanonicalBucketAbsoluteObjectName(
package,
version,
);
final info =
await packageBackend.tarballStorage.getCanonicalBucketArchiveInfo(
package,
version,
);
if (info == null) {
throw AssertionError(
'Expected an archive for "$package" and "$version" at '
'"$absoluteObjectName"',
);
}

await _api.package(package).tarball(version).copyFrom(
absoluteObjectName,
info,
);

await _api.package(package).versions.write(versionListing);
}

/// Note: there is no global locking here, the full scan should be called
/// only once every day, and it may be racing against the incremental
/// updates.
Expand Down
8 changes: 8 additions & 0 deletions app/lib/package/tarball_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ class TarballStorage {
return await _canonicalBucket.tryInfo(objectName);
}

/// Gets `gs:/<bucket>/<objectName>` for [package] and [version] in the
/// canonical bucket.
///
/// Returns the absolute objectName on the form created by
/// [Bucket.absoluteObjectName].
String getCanonicalBucketAbsoluteObjectName(String package, String version) =>
_canonicalBucket.absoluteObjectName(tarballObjectName(package, version));

/// Gets the object info of the archive file from the public bucket.
Future<ObjectInfo?> getPublicBucketArchiveInfo(
String package, String version) async {
Expand Down

0 comments on commit fb3a1f7

Please sign in to comment.