Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Jun 28, 2024
1 parent 4862bad commit ea09bab
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 168 deletions.
234 changes: 181 additions & 53 deletions src/ai/backend/manager/container_registry/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ async def _scan_tag(
image: str,
tag: str,
) -> None:
manifests = {}
async with concurrency_sema.get():
rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST_LIST
async with sess.get(
Expand All @@ -230,63 +229,192 @@ async def _scan_tag(
content_type = resp.headers["Content-Type"]
resp.raise_for_status()
resp_json = await resp.json()
match content_type:
# TODO: Support `self.MEDIA_TYPE_DOCKER_MANIFEST`
case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST:
manifest_list = resp_json["manifests"]
request_type = self.MEDIA_TYPE_DOCKER_MANIFEST
case self.MEDIA_TYPE_OCI_INDEX:
manifest_list = [
item
for item in resp_json["manifests"]
if "annotations" not in item # skip attestation manifests
]
request_type = self.MEDIA_TYPE_OCI_MANIFEST
case _:
log.warn("Unknown content type: {}", content_type)
raise RuntimeError(
"The registry does not support the standard way of "
"listing multiarch images."
)
rqst_args["headers"]["Accept"] = request_type
for manifest in manifest_list:
platform_arg = (
f"{manifest['platform']['os']}/{manifest['platform']['architecture']}"
)
if variant := manifest["platform"].get("variant", None):
platform_arg += f"/{variant}"
architecture = manifest["platform"]["architecture"]
architecture = arch_name_aliases.get(architecture, architecture)
async with sess.get(
self.registry_url / f"v2/{image}/manifests/{manifest['digest']}", **rqst_args
) as resp:
data = await resp.json()
config_digest = data["config"]["digest"]
size_bytes = sum(layer["size"] for layer in data["layers"]) + data["config"]["size"]
async with sess.get(
self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args
) as resp:
resp.raise_for_status()
data = json.loads(await resp.read())
labels = {}
# we should favor `config` instead of `container_config` since `config` can contain additional data
# set when committing image via `--change` flag
if _config_labels := data.get("config", {}).get("Labels"):
labels = _config_labels
elif _container_config_labels := data.get("container_config", {}).get("Labels"):
labels = _container_config_labels

if not labels:
log.warning(
"Labels section not found on image {}:{}/{}", image, tag, architecture
)

manifests[architecture] = {
async with aiotools.TaskGroup() as tg:
match content_type:
case self.MEDIA_TYPE_DOCKER_MANIFEST:
await self._process_docker_v2_image(
tg, sess, rqst_args, image, tag, resp_json
)
case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST:
await self._process_docker_v2_multiplatform_image(
tg, sess, rqst_args, image, tag, resp_json
)
case self.MEDIA_TYPE_OCI_INDEX:
await self._process_oci_index(
tg, sess, rqst_args, image, tag, resp_json
)
case _:
log.warn("Unknown content type: {}", content_type)
raise RuntimeError(
"The registry does not support the standard way of "
"listing multiarch images."
)

async def _process_oci_index(
    self,
    tg: aiotools.TaskGroup,
    sess: aiohttp.ClientSession,
    rqst_args: Mapping[str, Any],
    image: str,
    tag: str,
    image_info: Mapping[str, Any],
) -> None:
    """
    Collect per-architecture metadata for an image published as an OCI image index.

    For every entry of ``image_info["manifests"]`` that carries no ``annotations``
    key, the platform-specific manifest and then its config blob are fetched.
    The total size, the labels and the config digest are gathered per architecture
    and the resulting mapping is passed to ``self._read_manifest()``.

    :param tg: Task group handed over by the caller; not used inside this method.
    :param sess: HTTP client session used for the registry requests.
    :param rqst_args: Extra keyword arguments for ``sess.get()``; must contain a
        ``"headers"`` mapping. It is left untouched.
    :param image: Repository name of the image inside the registry.
    :param tag: Tag being scanned.
    :param image_info: Decoded JSON body of the image index.
    :raises aiohttp.ClientResponseError: If the registry answers a manifest or
        blob request with an HTTP error status.
    """
    # Work on a private copy instead of overwriting the caller's "Accept" header:
    # `rqst_args` is declared as a read-only Mapping and is presumably shared with
    # other in-flight tag scans (TODO confirm against the caller), so mutating it
    # between the awaits below could leak a wrong header into another request.
    request_args = {
        **rqst_args,
        "headers": {**rqst_args["headers"], "Accept": self.MEDIA_TYPE_OCI_MANIFEST},
    }
    manifest_list = [
        item
        for item in image_info["manifests"]
        if "annotations" not in item  # skip attestation manifests
    ]

    manifests = {}
    for manifest in manifest_list:
        architecture = manifest["platform"]["architecture"]
        architecture = arch_name_aliases.get(architecture, architecture)

        async with sess.get(
            self.registry_url / f"v2/{image}/manifests/{manifest['digest']}",
            **request_args,
        ) as resp:
            # Fail with the HTTP error itself rather than a KeyError raised while
            # picking fields out of an error payload.
            resp.raise_for_status()
            data = await resp.json()
        config_digest = data["config"]["digest"]
        size_bytes = sum(layer["size"] for layer in data["layers"]) + data["config"]["size"]

        async with sess.get(
            self.registry_url / f"v2/{image}/blobs/{config_digest}",
            **request_args,
        ) as resp:
            resp.raise_for_status()
            data = json.loads(await resp.read())

        # we should favor `config` instead of `container_config` since `config` can
        # contain additional data set when committing image via `--change` flag
        labels = (
            data.get("config", {}).get("Labels")
            or data.get("container_config", {}).get("Labels")
            or {}
        )
        if not labels:
            log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture)

        manifests[architecture] = {
            "size": size_bytes,
            "labels": labels,
            "digest": config_digest,
        }
    await self._read_manifest(image, tag, manifests)

async def _process_docker_v2_multiplatform_image(
    self,
    tg: aiotools.TaskGroup,
    sess: aiohttp.ClientSession,
    rqst_args: Mapping[str, Any],
    image: str,
    tag: str,
    image_info: Mapping[str, Any],
) -> None:
    """
    Collect per-architecture metadata for an image published as a Docker v2 manifest list.

    For every entry of ``image_info["manifests"]`` the platform-specific manifest
    and then its config blob are fetched. The total size, the labels and the
    config digest are gathered per architecture and the resulting mapping is
    passed to ``self._read_manifest()``.

    :param tg: Task group handed over by the caller; not used inside this method.
    :param sess: HTTP client session used for the registry requests.
    :param rqst_args: Extra keyword arguments for ``sess.get()``; must contain a
        ``"headers"`` mapping. It is left untouched.
    :param image: Repository name of the image inside the registry.
    :param tag: Tag being scanned.
    :param image_info: Decoded JSON body of the manifest list.
    :raises aiohttp.ClientResponseError: If the registry answers a manifest or
        blob request with an HTTP error status.
    """
    # Work on a private copy instead of overwriting the caller's "Accept" header:
    # `rqst_args` is declared as a read-only Mapping and is presumably shared with
    # other in-flight tag scans (TODO confirm against the caller), so mutating it
    # between the awaits below could leak a wrong header into another request.
    request_args = {
        **rqst_args,
        "headers": {**rqst_args["headers"], "Accept": self.MEDIA_TYPE_DOCKER_MANIFEST},
    }

    manifests = {}
    for manifest in image_info["manifests"]:
        architecture = manifest["platform"]["architecture"]
        architecture = arch_name_aliases.get(architecture, architecture)

        async with sess.get(
            self.registry_url / f"v2/{image}/manifests/{manifest['digest']}",
            **request_args,
        ) as resp:
            # Fail with the HTTP error itself rather than a KeyError raised while
            # picking fields out of an error payload.
            resp.raise_for_status()
            data = await resp.json()
        config_digest = data["config"]["digest"]
        size_bytes = sum(layer["size"] for layer in data["layers"]) + data["config"]["size"]

        async with sess.get(
            self.registry_url / f"v2/{image}/blobs/{config_digest}",
            **request_args,
        ) as resp:
            resp.raise_for_status()
            data = json.loads(await resp.read())

        # we should favor `config` instead of `container_config` since `config` can
        # contain additional data set when committing image via `--change` flag
        labels = (
            data.get("config", {}).get("Labels")
            or data.get("container_config", {}).get("Labels")
            or {}
        )
        if not labels:
            log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture)

        manifests[architecture] = {
            "size": size_bytes,
            "labels": labels,
            "digest": config_digest,
        }
    await self._read_manifest(image, tag, manifests)

async def _process_docker_v2_image(
    self,
    tg: aiotools.TaskGroup,
    sess: aiohttp.ClientSession,
    rqst_args: Mapping[str, Any],
    image: str,
    tag: str,
    image_info: Mapping[str, Any],
) -> None:
    """
    Collect the metadata of a single-platform image described by a Docker v2 manifest.

    The config blob referenced by ``image_info["config"]["digest"]`` is fetched
    once; its ``architecture`` field and labels are combined with the sizes
    listed in ``image_info`` and the result is passed to ``self._read_manifest()``
    as a one-entry mapping keyed by the (alias-resolved) architecture.

    :param tg: Task group handed over by the caller; not used inside this method.
    :param sess: HTTP client session used for the registry request.
    :param rqst_args: Extra keyword arguments for ``sess.get()``; must contain a
        ``"headers"`` mapping. It is left untouched.
    :param image: Repository name of the image inside the registry.
    :param tag: Tag being scanned.
    :param image_info: Decoded JSON body of the image manifest.
    :raises aiohttp.ClientResponseError: If the registry answers the blob
        request with an HTTP error status.
    """
    config_digest = image_info["config"]["digest"]
    # Work on a private copy instead of overwriting the caller's "Accept" header:
    # `rqst_args` is declared as a read-only Mapping and is presumably shared with
    # other in-flight tag scans (TODO confirm against the caller).
    request_args = {
        **rqst_args,
        "headers": {**rqst_args["headers"], "Accept": self.MEDIA_TYPE_DOCKER_MANIFEST},
    }

    # The config blob carries both the architecture and the labels, so a single
    # download is enough.
    async with sess.get(
        self.registry_url / f"v2/{image}/blobs/{config_digest}",
        **request_args,
    ) as resp:
        resp.raise_for_status()
        config_data = json.loads(await resp.read())

    manifest_arch = config_data["architecture"]
    architecture = arch_name_aliases.get(manifest_arch, manifest_arch)

    size_bytes = (
        sum(layer["size"] for layer in image_info["layers"]) + image_info["config"]["size"]
    )

    # we should favor `config` instead of `container_config` since `config` can
    # contain additional data set when committing image via `--change` flag
    labels = (
        config_data.get("config", {}).get("Labels")
        or config_data.get("container_config", {}).get("Labels")
        or {}
    )
    if not labels:
        log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture)

    await self._read_manifest(
        image,
        tag,
        {
            architecture: {
                "size": size_bytes,
                "labels": labels,
                "digest": config_digest,
            },
        },
    )

async def _read_manifest(
self,
Expand Down
Loading

0 comments on commit ea09bab

Please sign in to comment.