diff --git a/changes/2341.feature.md b/changes/2341.feature.md new file mode 100644 index 0000000000..cd72c0dbdb --- /dev/null +++ b/changes/2341.feature.md @@ -0,0 +1 @@ +Support GitHub Container Registry \ No newline at end of file diff --git a/src/ai/backend/manager/container_registry/__init__.py b/src/ai/backend/manager/container_registry/__init__.py index a3cf414d1f..1a4ea41a9e 100644 --- a/src/ai/backend/manager/container_registry/__init__.py +++ b/src/ai/backend/manager/container_registry/__init__.py @@ -28,6 +28,10 @@ def get_container_registry_cls(registry_info: Mapping[str, Any]) -> Type[BaseCon from .harbor import HarborRegistry_v2 cr_cls = HarborRegistry_v2 + elif registry_type == "github": + from .github import GitHubRegistry + + cr_cls = GitHubRegistry elif registry_type == "local": from .local import LocalRegistry diff --git a/src/ai/backend/manager/container_registry/base.py b/src/ai/backend/manager/container_registry/base.py index 87b139ebc1..7fa8b921eb 100644 --- a/src/ai/backend/manager/container_registry/base.py +++ b/src/ai/backend/manager/container_registry/base.py @@ -6,7 +6,7 @@ from abc import ABCMeta, abstractmethod from contextlib import asynccontextmanager as actxmgr from contextvars import ContextVar -from typing import Any, AsyncIterator, Dict, Final, Mapping, Optional, cast +from typing import Any, AsyncIterator, Dict, Final, Mapping, Optional, Sequence, cast import aiohttp import aiotools @@ -178,7 +178,6 @@ async def _scan_image( sess: aiohttp.ClientSession, image: str, ) -> None: - log.info("_scan_image()") rqst_args = await registry_login( sess, self.registry_url, @@ -217,7 +216,6 @@ async def _scan_tag( image: str, tag: str, ) -> None: - manifests = {} async with concurrency_sema.get(): rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST_LIST async with sess.get( @@ -230,62 +228,163 @@ async def _scan_tag( content_type = resp.headers["Content-Type"] resp.raise_for_status() resp_json = await resp.json() - match 
content_type: - case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST: - manifest_list = resp_json["manifests"] - request_type = self.MEDIA_TYPE_DOCKER_MANIFEST - case self.MEDIA_TYPE_OCI_INDEX: - manifest_list = [ - item - for item in resp_json["manifests"] - if "annotations" not in item # skip attestation manifests - ] - request_type = self.MEDIA_TYPE_OCI_MANIFEST - case _: - log.warn("Unknown content type: {}", content_type) - raise RuntimeError( - "The registry does not support the standard way of " - "listing multiarch images." - ) - rqst_args["headers"]["Accept"] = request_type - for manifest in manifest_list: - platform_arg = ( - f"{manifest['platform']['os']}/{manifest['platform']['architecture']}" - ) - if variant := manifest["platform"].get("variant", None): - platform_arg += f"/{variant}" - architecture = manifest["platform"]["architecture"] - architecture = arch_name_aliases.get(architecture, architecture) - async with sess.get( - self.registry_url / f"v2/{image}/manifests/{manifest['digest']}", **rqst_args - ) as resp: - data = await resp.json() - config_digest = data["config"]["digest"] - size_bytes = sum(layer["size"] for layer in data["layers"]) + data["config"]["size"] - async with sess.get( - self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args - ) as resp: - resp.raise_for_status() - data = json.loads(await resp.read()) - labels = {} - # we should favor `config` instead of `container_config` since `config` can contain additional datas - # set when commiting image via `--change` flag - if _config_labels := data.get("config", {}).get("Labels"): - labels = _config_labels - elif _container_config_labels := data.get("container_config", {}).get("Labels"): - labels = _container_config_labels - - if not labels: - log.warning( - "Labels section not found on image {}:{}/{}", image, tag, architecture - ) - manifests[architecture] = { - "size": size_bytes, - "labels": labels, - "digest": config_digest, - } - await self._read_manifest(image, tag, 
manifests) + async with aiotools.TaskGroup() as tg: + match content_type: + case self.MEDIA_TYPE_DOCKER_MANIFEST: + await self._process_docker_v2_image( + tg, sess, rqst_args, image, tag, resp_json + ) + case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST: + await self._process_docker_v2_multiplatform_image( + tg, sess, rqst_args, image, tag, resp_json + ) + case self.MEDIA_TYPE_OCI_INDEX: + await self._process_oci_index( + tg, sess, rqst_args, image, tag, resp_json + ) + case _: + log.warn("Unknown content type: {}", content_type) + raise RuntimeError( + "The registry does not support the standard way of " + "listing multiarch images." + ) + + async def _read_manifest_list( + self, + sess: aiohttp.ClientSession, + manifest_list: Sequence[Any], + rqst_args: Mapping[str, Any], + image: str, + tag: str, + ) -> None: + """ + Understands images defined under [OCI image manifest](https://github.com/opencontainers/image-spec/blob/main/manifest.md#example-image-manifest) or + [Docker image manifest list](https://github.com/openshift/docker-distribution/blob/master/docs/spec/manifest-v2-2.md#example-manifest-list) + and imports Backend.AI compatible images. 
+ """ + manifests = {} + for manifest in manifest_list: + platform_arg = f"{manifest['platform']['os']}/{manifest['platform']['architecture']}" + if variant := manifest["platform"].get("variant", None): + platform_arg += f"/{variant}" + architecture = manifest["platform"]["architecture"] + architecture = arch_name_aliases.get(architecture, architecture) + + async with sess.get( + self.registry_url / f"v2/{image}/manifests/{manifest['digest']}", + **rqst_args, + ) as resp: + manifest_info = await resp.json() + + manifests[architecture] = await self._preprocess_manifest( + sess, manifest_info, rqst_args, image + ) + + if not manifests[architecture]["labels"]: + log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture) + + await self._read_manifest(image, tag, manifests) + + async def _preprocess_manifest( + self, + sess: aiohttp.ClientSession, + manifest: Mapping[str, Any], + rqst_args: Mapping[str, Any], + image: str, + ) -> dict[str, Any]: + """ + Extracts information from + [Docker image manifest](https://github.com/openshift/docker-distribution/blob/master/docs/spec/manifest-v2-2.md#example-image-manifest) + required by Backend.AI. 
+ """ + config_digest = manifest["config"]["digest"] + size_bytes = sum(layer["size"] for layer in manifest["layers"]) + manifest["config"]["size"] + + async with sess.get( + self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args + ) as resp: + resp.raise_for_status() + data = json.loads(await resp.read()) + labels = {} + + # we should favor `config` instead of `container_config` since `config` can contain additional datas + # set when commiting image via `--change` flag + if _config_labels := data.get("config", {}).get("Labels"): + labels = _config_labels + elif _container_config_labels := data.get("container_config", {}).get("Labels"): + labels = _container_config_labels + + return { + "size": size_bytes, + "labels": labels, + "digest": config_digest, + } + + async def _process_oci_index( + self, + tg: aiotools.TaskGroup, + sess: aiohttp.ClientSession, + rqst_args: Mapping[str, Any], + image: str, + tag: str, + image_info: Mapping[str, Any], + ) -> None: + manifest_list = [ + item + for item in image_info["manifests"] + if "annotations" not in item # skip attestation manifests + ] + rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_OCI_MANIFEST + + await self._read_manifest_list(sess, manifest_list, rqst_args, image, tag) + + async def _process_docker_v2_multiplatform_image( + self, + tg: aiotools.TaskGroup, + sess: aiohttp.ClientSession, + rqst_args: Mapping[str, Any], + image: str, + tag: str, + image_info: Mapping[str, Any], + ) -> None: + manifest_list = image_info["manifests"] + rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST + + await self._read_manifest_list( + sess, + manifest_list, + rqst_args, + image, + tag, + ) + + async def _process_docker_v2_image( + self, + tg: aiotools.TaskGroup, + sess: aiohttp.ClientSession, + rqst_args: Mapping[str, Any], + image: str, + tag: str, + image_info: Mapping[str, Any], + ) -> None: + config_digest = image_info["config"]["digest"] + rqst_args["headers"]["Accept"] = 
self.MEDIA_TYPE_DOCKER_MANIFEST + + async with sess.get( + self.registry_url / f"v2/{image}/blobs/{config_digest}", + **rqst_args, + ) as resp: + resp.raise_for_status() + blob_data = json.loads(await resp.read()) + + manifest_arch = blob_data["architecture"] + architecture = arch_name_aliases.get(manifest_arch, manifest_arch) + + manifests = { + architecture: await self._preprocess_manifest(sess, image_info, rqst_args, image), + } + await self._read_manifest(image, tag, manifests) async def _read_manifest( self, @@ -294,6 +393,9 @@ async def _read_manifest( manifests: dict[str, dict], skip_reason: Optional[str] = None, ) -> None: + """ + Detects if image is compatible with Backend.AI and injects the metadata into the database if it complies. + """ if not manifests: if not skip_reason: skip_reason = "missing/deleted" diff --git a/src/ai/backend/manager/container_registry/github.py b/src/ai/backend/manager/container_registry/github.py new file mode 100644 index 0000000000..0d2bb06b88 --- /dev/null +++ b/src/ai/backend/manager/container_registry/github.py @@ -0,0 +1,51 @@ +import logging +from typing import AsyncIterator + +import aiohttp + +from ai.backend.common.logging import BraceStyleAdapter + +from .base import ( + BaseContainerRegistry, +) + +log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined] + + +class GitHubRegistry(BaseContainerRegistry): + async def fetch_repositories( + self, + sess: aiohttp.ClientSession, + ) -> AsyncIterator[str]: + name, access_token, type_ = ( + self.registry_info["username"], + self.registry_info["password"], + self.registry_info["entity_type"], + ) + + base_url = f"https://api.github.com/{type_}/{name}/packages" + + headers = { + "Authorization": f"Bearer {access_token}", + "Accept": "application/vnd.github.v3+json", + } + page = 1 + + while True: + async with sess.get( + base_url, + headers=headers, + params={"package_type": "container", "per_page": 30, "page": page}, + ) as response: + if 
response.status == 200: + data = await response.json() + for repo in data: + yield f"{self.registry_info["username"]}/{repo["name"]}" + if "next" in response.links: + page += 1 + else: + break + else: + raise RuntimeError( + f"Failed to fetch repositories! {response.status} error occured." + ) diff --git a/src/ai/backend/manager/container_registry/harbor.py b/src/ai/backend/manager/container_registry/harbor.py index 2a4b09c7f5..fcdb7afc0f 100644 --- a/src/ai/backend/manager/container_registry/harbor.py +++ b/src/ai/backend/manager/container_registry/harbor.py @@ -243,15 +243,15 @@ async def _scan_image( match image_info["manifest_media_type"]: case self.MEDIA_TYPE_OCI_INDEX: await self._process_oci_index( - tg, sess, rqst_args, image, image_info + tg, sess, rqst_args, image, tag, image_info ) case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST: await self._process_docker_v2_multiplatform_image( - tg, sess, rqst_args, image, image_info + tg, sess, rqst_args, image, tag, image_info ) case self.MEDIA_TYPE_DOCKER_MANIFEST: await self._process_docker_v2_image( - tg, sess, rqst_args, image, image_info + tg, sess, rqst_args, image, tag, image_info ) case _ as media_type: raise RuntimeError( @@ -292,15 +292,19 @@ async def _scan_tag( resp.raise_for_status() resp_json = await resp.json() async with aiotools.TaskGroup() as tg: + tag = resp_json["tags"][0]["name"] + match resp_json["manifest_media_type"]: case self.MEDIA_TYPE_OCI_INDEX: - await self._process_oci_index(tg, sess, rqst_args, image, resp_json) + await self._process_oci_index(tg, sess, rqst_args, image, tag, resp_json) case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST: await self._process_docker_v2_multiplatform_image( - tg, sess, rqst_args, image, resp_json + tg, sess, rqst_args, image, tag, resp_json ) case self.MEDIA_TYPE_DOCKER_MANIFEST: - await self._process_docker_v2_image(tg, sess, rqst_args, image, resp_json) + await self._process_docker_v2_image( + tg, sess, rqst_args, image, tag, resp_json + ) case _ as media_type: raise 
RuntimeError(f"Unsupported artifact media-type: {media_type}") @@ -310,6 +314,7 @@ async def _process_oci_index( sess: aiohttp.ClientSession, _rqst_args: Mapping[str, Any], image: str, + tag: str, image_info: Mapping[str, Any], ) -> None: rqst_args = dict(_rqst_args) @@ -317,7 +322,6 @@ async def _process_oci_index( rqst_args["headers"] = {} rqst_args["headers"].update({"Accept": "application/vnd.oci.image.manifest.v1+json"}) digests: list[tuple[str, str]] = [] - tag_name = image_info["tags"][0]["name"] for reference in image_info["references"]: if ( reference["platform"]["os"] == "unknown" @@ -335,7 +339,7 @@ async def _process_oci_index( rqst_args, image, digest=digest, - tag=tag_name, + tag=tag, architecture=architecture, ) ) @@ -346,6 +350,7 @@ async def _process_docker_v2_multiplatform_image( sess: aiohttp.ClientSession, _rqst_args: Mapping[str, Any], image: str, + tag: str, image_info: Mapping[str, Any], ) -> None: rqst_args = dict(_rqst_args) @@ -355,7 +360,6 @@ async def _process_docker_v2_multiplatform_image( "Accept": "application/vnd.docker.distribution.manifest.v2+json" }) digests: list[tuple[str, str]] = [] - tag_name = image_info["tags"][0]["name"] for reference in image_info["references"]: if ( reference["platform"]["os"] == "unknown" @@ -373,7 +377,7 @@ async def _process_docker_v2_multiplatform_image( rqst_args, image, digest=digest, - tag=tag_name, + tag=tag, architecture=architecture, ) ) @@ -384,6 +388,7 @@ async def _process_docker_v2_image( sess: aiohttp.ClientSession, _rqst_args: Mapping[str, Any], image: str, + tag: str, image_info: Mapping[str, Any], ) -> None: rqst_args = dict(_rqst_args) @@ -394,14 +399,13 @@ async def _process_docker_v2_image( }) if (reporter := progress_reporter.get()) is not None: reporter.total_progress += 1 - tag_name = image_info["tags"][0]["name"] async with aiotools.TaskGroup() as tg: tg.create_task( self._harbor_scan_tag_single_arch( sess, rqst_args, image, - tag=tag_name, + tag, ) )