Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support GitHub Container Registry #2341

Closed
1 change: 1 addition & 0 deletions changes/2341.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support GitHub Container Registry
4 changes: 4 additions & 0 deletions src/ai/backend/manager/container_registry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def get_container_registry_cls(registry_info: Mapping[str, Any]) -> Type[BaseCon
from .harbor import HarborRegistry_v2

cr_cls = HarborRegistry_v2
elif registry_type == "github":
from .github import GitHubRegistry

cr_cls = GitHubRegistry
elif registry_type == "local":
from .local import LocalRegistry

Expand Down
218 changes: 160 additions & 58 deletions src/ai/backend/manager/container_registry/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABCMeta, abstractmethod
from contextlib import asynccontextmanager as actxmgr
from contextvars import ContextVar
from typing import Any, AsyncIterator, Dict, Final, Mapping, Optional, cast
from typing import Any, AsyncIterator, Dict, Final, Mapping, Optional, Sequence, cast

import aiohttp
import aiotools
Expand Down Expand Up @@ -178,7 +178,6 @@ async def _scan_image(
sess: aiohttp.ClientSession,
image: str,
) -> None:
log.info("_scan_image()")
rqst_args = await registry_login(
sess,
self.registry_url,
Expand Down Expand Up @@ -217,7 +216,6 @@ async def _scan_tag(
image: str,
tag: str,
) -> None:
manifests = {}
async with concurrency_sema.get():
rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST_LIST
async with sess.get(
Expand All @@ -230,62 +228,163 @@ async def _scan_tag(
content_type = resp.headers["Content-Type"]
resp.raise_for_status()
resp_json = await resp.json()
match content_type:
case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST:
manifest_list = resp_json["manifests"]
request_type = self.MEDIA_TYPE_DOCKER_MANIFEST
case self.MEDIA_TYPE_OCI_INDEX:
manifest_list = [
item
for item in resp_json["manifests"]
if "annotations" not in item # skip attestation manifests
]
request_type = self.MEDIA_TYPE_OCI_MANIFEST
case _:
log.warn("Unknown content type: {}", content_type)
raise RuntimeError(
"The registry does not support the standard way of "
"listing multiarch images."
)
rqst_args["headers"]["Accept"] = request_type
for manifest in manifest_list:
platform_arg = (
f"{manifest['platform']['os']}/{manifest['platform']['architecture']}"
)
if variant := manifest["platform"].get("variant", None):
platform_arg += f"/{variant}"
architecture = manifest["platform"]["architecture"]
architecture = arch_name_aliases.get(architecture, architecture)
async with sess.get(
self.registry_url / f"v2/{image}/manifests/{manifest['digest']}", **rqst_args
) as resp:
data = await resp.json()
config_digest = data["config"]["digest"]
size_bytes = sum(layer["size"] for layer in data["layers"]) + data["config"]["size"]
async with sess.get(
self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args
) as resp:
resp.raise_for_status()
data = json.loads(await resp.read())
labels = {}
# we should favor `config` instead of `container_config` since `config` can contain additional datas
# set when commiting image via `--change` flag
if _config_labels := data.get("config", {}).get("Labels"):
labels = _config_labels
elif _container_config_labels := data.get("container_config", {}).get("Labels"):
labels = _container_config_labels

if not labels:
log.warning(
"Labels section not found on image {}:{}/{}", image, tag, architecture
)

manifests[architecture] = {
"size": size_bytes,
"labels": labels,
"digest": config_digest,
}
await self._read_manifest(image, tag, manifests)
async with aiotools.TaskGroup() as tg:
match content_type:
case self.MEDIA_TYPE_DOCKER_MANIFEST:
await self._process_docker_v2_image(
tg, sess, rqst_args, image, tag, resp_json
)
case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST:
await self._process_docker_v2_multiplatform_image(
tg, sess, rqst_args, image, tag, resp_json
)
case self.MEDIA_TYPE_OCI_INDEX:
await self._process_oci_index(
tg, sess, rqst_args, image, tag, resp_json
)
case _:
log.warn("Unknown content type: {}", content_type)
raise RuntimeError(
"The registry does not support the standard way of "
"listing multiarch images."
)

async def _read_manifest_list(
    self,
    sess: aiohttp.ClientSession,
    manifest_list: Sequence[Any],
    rqst_args: Mapping[str, Any],
    image: str,
    tag: str,
) -> None:
    """
    Fetch every per-platform manifest of a multi-platform image and hand the
    collected metadata to ``_read_manifest()``.

    Understands the entries listed by an
    [OCI image index](https://github.com/opencontainers/image-spec/blob/main/image-index.md) or a
    [Docker image manifest list](https://github.com/openshift/docker-distribution/blob/master/docs/spec/manifest-v2-2.md#example-manifest-list)
    and imports Backend.AI compatible images.

    :param sess: HTTP session used to talk to the registry.
    :param manifest_list: Entries of the index / manifest list. Each entry must
        provide ``platform.architecture`` and ``digest``.
    :param rqst_args: Extra keyword arguments forwarded to every ``sess.get()``
        call; this method does not modify them.
    :param image: Repository name used to build the ``v2/{image}/...`` URLs.
    :param tag: Tag being scanned (used for logging and for ``_read_manifest()``).
    :raises aiohttp.ClientResponseError: When the registry answers a manifest
        request with an HTTP error status.
    """
    manifests = {}
    for manifest in manifest_list:
        # Normalize the architecture name so that aliases map to a single key.
        architecture = manifest["platform"]["architecture"]
        architecture = arch_name_aliases.get(architecture, architecture)

        async with sess.get(
            self.registry_url / f"v2/{image}/manifests/{manifest['digest']}",
            **rqst_args,
        ) as resp:
            # Fail here with the real HTTP error instead of a confusing
            # KeyError later when the error payload lacks "config"/"layers".
            resp.raise_for_status()
            manifest_info = await resp.json()

        # NOTE: entries that resolve to the same (aliased) architecture
        # overwrite each other; the last one in the list wins.
        manifests[architecture] = await self._preprocess_manifest(
            sess, manifest_info, rqst_args, image
        )

        if not manifests[architecture]["labels"]:
            log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture)

    await self._read_manifest(image, tag, manifests)

async def _preprocess_manifest(
    self,
    sess: aiohttp.ClientSession,
    manifest: Mapping[str, Any],
    rqst_args: Mapping[str, Any],
    image: str,
) -> dict[str, Any]:
    """
    Extract the information required by Backend.AI from a single
    [Docker image manifest](https://github.com/openshift/docker-distribution/blob/master/docs/spec/manifest-v2-2.md#example-image-manifest).

    The image config blob referenced by the manifest is downloaded to read
    the image labels.

    :param sess: HTTP session used to talk to the registry.
    :param manifest: Image manifest; must contain ``config.digest``,
        ``config.size`` and a ``layers`` list whose items have ``size``.
    :param rqst_args: Extra keyword arguments forwarded to ``sess.get()``.
    :param image: Repository name used to build the blob URL.
    :returns: A dict with ``size`` (sum of all layer sizes plus the config
        size), ``labels`` (possibly empty) and ``digest`` (the config digest).
    :raises aiohttp.ClientResponseError: When the config blob request fails.
    """
    config_digest = manifest["config"]["digest"]
    size_bytes = sum(layer["size"] for layer in manifest["layers"]) + manifest["config"]["size"]

    async with sess.get(
        self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args
    ) as resp:
        resp.raise_for_status()
        data = json.loads(await resp.read())

    # We favor `config` over `container_config` since `config` can contain
    # additional data set when committing an image via the `--change` flag.
    # `or {}` also covers sections that are present but JSON `null`, which
    # would otherwise raise AttributeError on `.get()`.
    labels = (
        (data.get("config") or {}).get("Labels")
        or (data.get("container_config") or {}).get("Labels")
        or {}
    )

    return {
        "size": size_bytes,
        "labels": labels,
        "digest": config_digest,
    }

async def _process_oci_index(
    self,
    tg: aiotools.TaskGroup,
    sess: aiohttp.ClientSession,
    rqst_args: Mapping[str, Any],
    image: str,
    tag: str,
    image_info: Mapping[str, Any],
) -> None:
    """
    Scan an image whose tag resolves to an OCI image index.

    :param tg: Task group supplied by the caller; not used by this method.
    :param sess: HTTP session used to talk to the registry.
    :param rqst_args: Base keyword arguments for ``sess.get()``; must contain
        a ``headers`` mapping. It is left unmodified.
    :param image: Repository name.
    :param tag: Tag being scanned.
    :param image_info: Decoded index document; must contain ``manifests``.
    """
    manifest_list = [
        item
        for item in image_info["manifests"]
        if "annotations" not in item  # skip attestation manifests
    ]
    # Work on a private copy instead of writing into the caller's dict:
    # `rqst_args` may be shared with other in-flight scans (TODO confirm in
    # the caller), and overwriting its "Accept" header in place would leak
    # into their requests.
    scoped_args = {
        **rqst_args,
        "headers": {**rqst_args["headers"], "Accept": self.MEDIA_TYPE_OCI_MANIFEST},
    }

    await self._read_manifest_list(sess, manifest_list, scoped_args, image, tag)

async def _process_docker_v2_multiplatform_image(
    self,
    tg: aiotools.TaskGroup,
    sess: aiohttp.ClientSession,
    rqst_args: Mapping[str, Any],
    image: str,
    tag: str,
    image_info: Mapping[str, Any],
) -> None:
    """
    Scan an image whose tag resolves to a Docker v2 manifest list.

    :param tg: Task group supplied by the caller; not used by this method.
    :param sess: HTTP session used to talk to the registry.
    :param rqst_args: Base keyword arguments for ``sess.get()``; must contain
        a ``headers`` mapping. It is left unmodified.
    :param image: Repository name.
    :param tag: Tag being scanned.
    :param image_info: Decoded manifest list; must contain ``manifests``.
    """
    manifest_list = image_info["manifests"]
    # Work on a private copy instead of writing into the caller's dict:
    # `rqst_args` may be shared with other in-flight scans (TODO confirm in
    # the caller), and overwriting its "Accept" header in place would leak
    # into their requests.
    scoped_args = {
        **rqst_args,
        "headers": {**rqst_args["headers"], "Accept": self.MEDIA_TYPE_DOCKER_MANIFEST},
    }

    await self._read_manifest_list(
        sess,
        manifest_list,
        scoped_args,
        image,
        tag,
    )

async def _process_docker_v2_image(
    self,
    tg: aiotools.TaskGroup,
    sess: aiohttp.ClientSession,
    rqst_args: Mapping[str, Any],
    image: str,
    tag: str,
    image_info: Mapping[str, Any],
) -> None:
    """
    Scan an image whose tag resolves directly to a single Docker v2 image
    manifest (no manifest list).

    The manifest itself carries no platform entry, so the architecture is
    read from the image config blob.

    :param tg: Task group supplied by the caller; not used by this method.
    :param sess: HTTP session used to talk to the registry.
    :param rqst_args: Base keyword arguments for ``sess.get()``; must contain
        a ``headers`` mapping. It is left unmodified.
    :param image: Repository name.
    :param tag: Tag being scanned.
    :param image_info: Decoded image manifest; must contain ``config.digest``.
    :raises aiohttp.ClientResponseError: When the config blob request fails.
    """
    config_digest = image_info["config"]["digest"]
    # Work on a private copy instead of writing into the caller's dict:
    # `rqst_args` may be shared with other in-flight scans (TODO confirm in
    # the caller), and overwriting its "Accept" header in place would leak
    # into their requests.
    scoped_args = {
        **rqst_args,
        "headers": {**rqst_args["headers"], "Accept": self.MEDIA_TYPE_DOCKER_MANIFEST},
    }

    async with sess.get(
        self.registry_url / f"v2/{image}/blobs/{config_digest}",
        **scoped_args,
    ) as resp:
        resp.raise_for_status()
        blob_data = json.loads(await resp.read())

    manifest_arch = blob_data["architecture"]
    architecture = arch_name_aliases.get(manifest_arch, manifest_arch)

    # NOTE: `_preprocess_manifest()` downloads the same config blob again to
    # extract the labels.
    manifests = {
        architecture: await self._preprocess_manifest(sess, image_info, scoped_args, image),
    }
    await self._read_manifest(image, tag, manifests)

async def _read_manifest(
self,
Expand All @@ -294,6 +393,9 @@ async def _read_manifest(
manifests: dict[str, dict],
skip_reason: Optional[str] = None,
) -> None:
"""
Detects whether the image is compatible with Backend.AI and, if it is, stores its metadata in the database.
"""
if not manifests:
if not skip_reason:
skip_reason = "missing/deleted"
Expand Down
51 changes: 51 additions & 0 deletions src/ai/backend/manager/container_registry/github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import logging
from typing import AsyncIterator

import aiohttp

from ai.backend.common.logging import BraceStyleAdapter

from .base import (
BaseContainerRegistry,
)

# Module-level logger, named after this module's import spec.
# NOTE(review): BraceStyleAdapter presumably enables `{}`-style placeholders in
# log messages — confirm against ai.backend.common.logging.
log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined]


class GitHubRegistry(BaseContainerRegistry):
    """
    Container registry implementation that discovers repositories by listing
    container packages through the GitHub REST API.
    """

    async def fetch_repositories(
        self,
        sess: aiohttp.ClientSession,
    ) -> AsyncIterator[str]:
        """
        Yield the repository names available under the configured account.

        Reads the following keys from ``self.registry_info``:

        * ``username``: account name; also used as the prefix of every
          yielded repository name.
        * ``password``: sent as the bearer token of the API requests.
        * ``entity_type``: path segment placed before the account name in
          the API URL (presumably ``users`` or ``orgs`` — TODO confirm).

        :param sess: HTTP session used to call the GitHub API.
        :returns: An async iterator of ``"<username>/<package name>"`` strings.
        :raises RuntimeError: When the API answers with a status other than 200.
        """
        name, access_token, type_ = (
            self.registry_info["username"],
            self.registry_info["password"],
            self.registry_info["entity_type"],
        )

        base_url = f"https://api.github.com/{type_}/{name}/packages"

        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/vnd.github.v3+json",
        }
        page = 1

        while True:
            async with sess.get(
                base_url,
                headers=headers,
                params={"package_type": "container", "per_page": 30, "page": page},
            ) as response:
                if response.status != 200:
                    raise RuntimeError(
                        f"Failed to fetch repositories! {response.status} error occured."
                    )

                data = await response.json()
                for repo in data:
                    # Reuse the already-unpacked account name; this also avoids
                    # nesting the same quote type inside the f-string, which
                    # only parses on Python 3.12 and later.
                    yield f"{name}/{repo['name']}"

                # Keep paging while the response links include a "next" entry.
                if "next" not in response.links:
                    break
                page += 1
Loading