Skip to content

Commit

Permalink
airbyte-ci/base-image: cache dockerhub image listing
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Oct 28, 2024
1 parent 170f138 commit 197eba5
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 26 deletions.
3 changes: 3 additions & 0 deletions airbyte-ci/connectors/base_images/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ poetry run mypy base_images --check-untyped-defs
```
## CHANGELOG

### 1.1.0
- Add a cache TTL for the base image listing to avoid DockerHub rate limiting.

### 1.0.4
- Upgrade Dagger to `0.13.3`

Expand Down
19 changes: 11 additions & 8 deletions airbyte-ci/connectors/base_images/base_images/utils/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import getpass
import os
import time
import uuid
from typing import List, Tuple

Expand Down Expand Up @@ -35,16 +36,16 @@ class CraneClient:
"gcr.io/go-containerregistry/crane/debug:v0.15.1@sha256:f6ddf8e2c47df889e06e33c3e83b84251ac19c8728a670ff39f2ca9e90c4f905"
)

def __init__(self, dagger_client: dagger.Client, docker_credentials: Tuple[str, str]):
def __init__(self, dagger_client: dagger.Client, docker_credentials: Tuple[str, str], cache_ttl_seconds: int = 0):
self.docker_hub_username_secret = dagger_client.set_secret("DOCKER_HUB_USERNAME", docker_credentials[0])
self.docker_hub_username_password = dagger_client.set_secret("DOCKER_HUB_PASSWORD", docker_credentials[1])

self.bare_container = (
dagger_client.container().from_(self.CRANE_IMAGE_ADDRESS)
# We don't want to cache any subsequent commands that might run in this container
# because we want to have fresh output data every time we run this command.
.with_env_variable("CACHE_BUSTER", str(uuid.uuid4()))
)
if cache_ttl_seconds == 0:
cache_buster = str(uuid.uuid4())
else:
cache_buster = str(int(time.time()) // cache_ttl_seconds)

self.bare_container = dagger_client.container().from_(self.CRANE_IMAGE_ADDRESS).with_env_variable("CACHE_BUSTER", cache_buster)

self.authenticated_container = self.login()

Expand All @@ -56,7 +57,6 @@ def login(self) -> dagger.Container:
)

async def digest(self, repository_and_tag: str) -> str:
console.log(f"Fetching digest for {repository_and_tag}...")
return (await self.authenticated_container.with_exec(["digest", repository_and_tag], use_entrypoint=True).stdout()).strip()

async def ls(self, registry_name: str, repository_name: str) -> List[str]:
Expand Down Expand Up @@ -87,7 +87,10 @@ async def get_all_images(self) -> List[published_image.PublishedImage]:
# We want the digest to uniquely identify the image, so we need to fetch it separately with `crane digest`
available_addresses_without_digest = [f"{repository_address}:{tag}" for tag in all_tags]
available_addresses_with_digest = []
console.log(f"Fetching digests for {len(available_addresses_without_digest)} images...")
# TODO: Fetching digests one at a time is a bottleneck; we should parallelize these calls.
for address in available_addresses_without_digest:
digest = await self.crane_client.digest(address)
available_addresses_with_digest.append(f"{address}@{digest}")
console.log(f"Found digests for {len(available_addresses_with_digest)} images in {repository_address}")
return [published_image.PublishedImage.from_address(address) for address in available_addresses_with_digest]
44 changes: 30 additions & 14 deletions airbyte-ci/connectors/base_images/base_images/version_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,31 +105,39 @@ def get_changelog_entries(ConnectorBaseImageClass: Type[AirbyteConnectorBaseImag

@staticmethod
async def get_all_published_base_images(
dagger_client: dagger.Client, docker_credentials: Tuple[str, str], ConnectorBaseImageClass: Type[AirbyteConnectorBaseImage]
dagger_client: dagger.Client,
docker_credentials: Tuple[str, str],
ConnectorBaseImageClass: Type[AirbyteConnectorBaseImage],
cache_ttl_seconds: int = 0,
) -> List[published_image.PublishedImage]:
"""Returns all the published base images for a given base image version class.
Args:
dagger_client (dagger.Client): The dagger client used to build the registry.
docker_credentials (Tuple[str, str]): The docker credentials used to fetch published images from DockerHub.
ConnectorBaseImageClass (Type[AirbyteConnectorBaseImage]): The base image version class bound to the registry.
cache_ttl_seconds (int, optional): The cache time-to-live, in seconds, for crane output. Defaults to 0, which disables caching (fresh output on every run).
Returns:
List[published_image.PublishedImage]: The published base images for a given base image version class.
"""
crane_client = docker.CraneClient(dagger_client, docker_credentials)
crane_client = docker.CraneClient(dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds)
remote_registry = docker.RemoteRepository(crane_client, consts.REMOTE_REGISTRY, ConnectorBaseImageClass.repository) # type: ignore
return await remote_registry.get_all_images()

@staticmethod
async def load(
ConnectorBaseImageClass: Type[AirbyteConnectorBaseImage], dagger_client: dagger.Client, docker_credentials: Tuple[str, str]
ConnectorBaseImageClass: Type[AirbyteConnectorBaseImage],
dagger_client: dagger.Client,
docker_credentials: Tuple[str, str],
cache_ttl_seconds: int = 0,
) -> VersionRegistry:
"""Instantiates a registry by fetching available versions from the remote registry and loading the changelog from disk.
Args:
ConnectorBaseImageClass (Type[AirbyteConnectorBaseImage]): The base image version class bound to the registry.
dagger_client (dagger.Client): The dagger client used to build the registry.
docker_credentials (Tuple[str, str]): The docker credentials used to fetch published images from DockerHub.
cache_ttl_seconds (int, optional): The cache time-to-live, in seconds, for crane output. Defaults to 0, which disables caching (fresh output on every run).
Returns:
VersionRegistry: The registry.
"""
Expand All @@ -141,7 +149,7 @@ async def load(

# Instantiate a crane client and a remote registry to fetch published images from DockerHub
published_docker_images = await VersionRegistry.get_all_published_base_images(
dagger_client, docker_credentials, ConnectorBaseImageClass
dagger_client, docker_credentials, ConnectorBaseImageClass, cache_ttl_seconds=cache_ttl_seconds
)

# Build a dict of published images by version number for easier lookup
Expand Down Expand Up @@ -246,16 +254,24 @@ def latest_not_pre_released_published_entry(self) -> Optional[VersionRegistryEnt
return None


async def get_python_registry(dagger_client: dagger.Client, docker_credentials: Tuple[str, str]) -> VersionRegistry:
return await VersionRegistry.load(AirbytePythonConnectorBaseImage, dagger_client, docker_credentials)
async def get_python_registry(
dagger_client: dagger.Client, docker_credentials: Tuple[str, str], cache_ttl_seconds: int = 0
) -> VersionRegistry:
return await VersionRegistry.load(
AirbytePythonConnectorBaseImage, dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds
)


async def get_manifest_only_registry(dagger_client: dagger.Client, docker_credentials: Tuple[str, str]) -> VersionRegistry:
return await VersionRegistry.load(AirbyteManifestOnlyConnectorBaseImage, dagger_client, docker_credentials)
async def get_manifest_only_registry(
dagger_client: dagger.Client, docker_credentials: Tuple[str, str], cache_ttl_seconds: int = 0
) -> VersionRegistry:
return await VersionRegistry.load(
AirbyteManifestOnlyConnectorBaseImage, dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds
)


async def get_registry_for_language(
dagger_client: dagger.Client, language: ConnectorLanguage, docker_credentials: Tuple[str, str]
dagger_client: dagger.Client, language: ConnectorLanguage, docker_credentials: Tuple[str, str], cache_ttl_seconds: int = 0
) -> VersionRegistry:
"""Returns the registry for a given language.
It is meant to be used externally to get the registry for a given connector language.
Expand All @@ -264,17 +280,17 @@ async def get_registry_for_language(
dagger_client (dagger.Client): The dagger client used to build the registry.
language (ConnectorLanguage): The connector language.
docker_credentials (Tuple[str, str]): The docker credentials used to fetch published images from DockerHub.
cache_ttl_seconds (int, optional): The cache time-to-live, in seconds, for crane output. Defaults to 0, which disables caching (fresh output on every run).
Raises:
NotImplementedError: Raised if the registry for the given language is not implemented yet.
Returns:
VersionRegistry: The registry for the given language.
"""
if language in [ConnectorLanguage.PYTHON, ConnectorLanguage.LOW_CODE]:
return await get_python_registry(dagger_client, docker_credentials)
return await get_python_registry(dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds)
elif language is ConnectorLanguage.MANIFEST_ONLY:
return await get_manifest_only_registry(dagger_client, docker_credentials)
return await get_manifest_only_registry(dagger_client, docker_credentials, cache_ttl_seconds=cache_ttl_seconds)
else:
raise NotImplementedError(f"Registry for language {language} is not implemented yet.")

Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/base_images/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "airbyte-connectors-base-images"
version = "1.0.4"
version = "1.1.0"
description = "This package is used to generate and publish the base images for Airbyte Connectors."
authors = ["Augustin Lafanechere <augustin@airbyte.io>"]
readme = "README.md"
Expand Down
1 change: 1 addition & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ airbyte-ci connectors --language=low-code migrate-to-manifest-only

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------- |
| 4.41.8 | [#47447](https://github.com/airbytehq/airbyte/pull/47447) | Use `cache_ttl` for base image registry listing in `up-to-date`. |
| 4.41.7 | [#47444](https://github.com/airbytehq/airbyte/pull/47444) | Remove redundant `--ignore-connector` error from up-to-date. `--metadata-query` can be used instead. |
| 4.41.6 | [#47308](https://github.com/airbytehq/airbyte/pull/47308) | Connector testing: skip incremental acceptance test when the connector is not released. |
| 4.41.5 | [#47255](https://github.com/airbytehq/airbyte/pull/47255) | Fix `DisableProgressiveRollout` following Dagger API change. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import TYPE_CHECKING

from jinja2 import Environment, PackageLoader, select_autoescape
from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages
from pipelines.airbyte_ci.connectors.build_image.steps import run_connector_build
from pipelines.airbyte_ci.connectors.context import ConnectorContext
from pipelines.airbyte_ci.connectors.reports import ConnectorReport
from pipelines.airbyte_ci.steps.base_image import UpdateBaseImageMetadata
Expand Down Expand Up @@ -126,7 +126,7 @@ async def run_connector_up_to_date_pipeline(
# to fill the PR body with the correct information about what exactly got updated.
if create_pull_request:
# Building connector images is also universal across connector technologies.
build_result = await BuildConnectorImages(context).run()
build_result = await run_connector_build(context)
step_results.append(build_result)
dependency_updates: List[DependencyUpdate] = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class NoBaseImageAddressInMetadataError(Exception):


class UpdateBaseImageMetadata(StepModifyingFiles):

BASE_IMAGE_LIST_CACHE_TTL_SECONDS = 60 * 60 * 24 # 1 day

context: ConnectorContext

title = "Upgrade the base image to the latest version in metadata.yaml"
Expand All @@ -45,6 +48,7 @@ async def get_latest_base_image_address(self) -> Optional[str]:
self.dagger_client,
self.context.connector.language,
(self.context.docker_hub_username.value, self.context.docker_hub_password.value),
cache_ttl_seconds=self.BASE_IMAGE_LIST_CACHE_TTL_SECONDS,
)
return version_registry_for_language.latest_not_pre_released_published_entry.published_docker_image.address
except NotImplementedError:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "4.41.7"
version = "4.41.8"
description = "Package maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <contact@airbyte.io>"]

Expand Down

0 comments on commit 197eba5

Please sign in to comment.