Skip to content

Commit

Permalink
refactor: Implement per project image scanning and fix local container registry type handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Oct 25, 2024
1 parent aca8c6c commit b4c8741
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 77 deletions.
7 changes: 1 addition & 6 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,13 +824,8 @@ async def start_container(
container_log_file_count = 5
container_log_file_size = BinarySize(container_log_size // container_log_file_count)

if self.image_ref.is_local:
image = self.image_ref.short
else:
image = self.image_ref.canonical

container_config: MutableMapping[str, Any] = {
"Image": image,
"Image": self.image_ref.canonical,
"Tty": True,
"OpenStdin": True,
"Privileged": False,
Expand Down
13 changes: 9 additions & 4 deletions src/ai/backend/common/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class ImageRef:
"""

name: str
project: str
project: str | None
tag: str
registry: str
architecture: str
Expand All @@ -383,7 +383,7 @@ class ImageRef:
def from_image_str(
cls,
image_str: str,
project: str,
project: str | None,
registry: str,
*,
architecture: str = "x86_64",
Expand All @@ -395,7 +395,9 @@ def from_image_str(

parsed = cls.parse_image_str(image_str, registry)

if parsed.project_and_image_name == project:
if not project:
image_name = parsed.project_and_image_name
elif parsed.project_and_image_name == project:
image_name = ""
else:
if not parsed.project_and_image_name.startswith(f"{project}/"):
Expand Down Expand Up @@ -565,8 +567,11 @@ def merge_aliases(genned_aliases_1, genned_aliases_2) -> Mapping[str, "ImageRef"

@property
def canonical(self) -> str:
# e.g., cr.backend.ai/stable/python:3.9-ubuntu
join = functools.partial(join_non_empty, sep="/")
if self.is_local:
return f"{join(self.project, self.name)}:{self.tag}"

# e.g., cr.backend.ai/stable/python:3.9-ubuntu
return f"{join(self.registry, self.project, self.name)}:{self.tag}"

@property
Expand Down
90 changes: 35 additions & 55 deletions src/ai/backend/manager/container_registry/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import sqlalchemy as sa
import trafaret as t
import yarl
from sqlalchemy.orm import load_only

from ai.backend.common.bgtask import ProgressReporter
from ai.backend.common.docker import (
Expand Down Expand Up @@ -147,66 +146,47 @@ async def commit_rescan_result(self) -> None:
image_row.resources = update["resources"]
image_row.is_local = is_local

registries = cast(
list[ContainerRegistryRow],
(
await session.scalars(
sa.select(ContainerRegistryRow).options(
load_only(
ContainerRegistryRow.project,
ContainerRegistryRow.registry_name,
ContainerRegistryRow.url,
)
)
)
).all(),
)

for image_identifier, update in _all_updates.items():
for registry in registries:
try:
parsed_img = ImageRef.from_image_str(
image_identifier.canonical, registry.project, registry.registry_name
)
except ProjectMismatchWithCanonical:
continue
except ValueError as e:
skip_reason = str(e)
progress_msg = f"Skipped image - {image_identifier.canonical}/{image_identifier.architecture} ({skip_reason})"
log.warning(progress_msg)
break

session.add(
ImageRow(
name=parsed_img.canonical,
project=registry.project,
registry=parsed_img.registry,
registry_id=registry.id,
image=join_non_empty(parsed_img.project, parsed_img.name, sep="/"),
tag=parsed_img.tag,
architecture=image_identifier.architecture,
is_local=is_local,
config_digest=update["config_digest"],
size_bytes=update["size_bytes"],
type=ImageType.COMPUTE,
accelerators=update.get("accels"),
labels=update["labels"],
resources=update["resources"],
)
try:
parsed_img = ImageRef.from_image_str(
image_identifier.canonical,
self.registry_info.project,
self.registry_info.registry_name,
is_local=is_local,
)
progress_msg = f"Updated image - {parsed_img.canonical}/{image_identifier.architecture} ({update["config_digest"]})"
log.info(progress_msg)
break

else:
skip_reason = "No container registry found matching the image."
except ProjectMismatchWithCanonical:
continue
except ValueError as e:
skip_reason = str(e)
progress_msg = f"Skipped image - {image_identifier.canonical}/{image_identifier.architecture} ({skip_reason})"
log.warning(progress_msg)
continue

session.add(
ImageRow(
name=parsed_img.canonical,
project=self.registry_info.project,
registry=parsed_img.registry,
registry_id=self.registry_info.id,
image=join_non_empty(parsed_img.project, parsed_img.name, sep="/"),
tag=parsed_img.tag,
architecture=image_identifier.architecture,
is_local=is_local,
config_digest=update["config_digest"],
size_bytes=update["size_bytes"],
type=ImageType.COMPUTE,
accelerators=update.get("accels"),
labels=update["labels"],
resources=update["resources"],
)
)
progress_msg = f"Updated image - {parsed_img.canonical}/{image_identifier.architecture} ({update["config_digest"]})"
log.info(progress_msg)

if (reporter := progress_reporter.get()) is not None:
await reporter.update(1, message=progress_msg)
if (reporter := progress_reporter.get()) is not None:
await reporter.update(1, message=progress_msg)

await session.flush()
await session.flush()

async def scan_single_ref(self, image: str) -> None:
all_updates_token = all_updates.set({})
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/manager/models/container_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class ContainerRegistryRow(Base):
nullable=False,
index=True,
)
project = sa.Column("project", sa.String(length=255), index=True, nullable=False)
project = sa.Column("project", sa.String(length=255), index=True, nullable=True)
username = sa.Column("username", sa.String(length=255), nullable=True)
password = sa.Column("password", sa.String, nullable=True)
ssl_verify = sa.Column(
Expand Down
44 changes: 33 additions & 11 deletions src/ai/backend/manager/models/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
)
from uuid import UUID

import aiotools
import sqlalchemy as sa
import trafaret as t
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncSession
Expand Down Expand Up @@ -109,7 +108,7 @@ async def rescan_images(
result = await session.execute(sa.select(ContainerRegistryRow))
latest_registry_config = cast(
dict[str, ContainerRegistryRow],
{row.registry_name: row for row in result.scalars().all()},
{f"{row.registry_name}/{row.project}": row for row in result.scalars().all()},
)

# TODO: delete images from registries removed from the previous config?
Expand All @@ -118,7 +117,9 @@ async def rescan_images(
registries = latest_registry_config
else:
# find if it's a full image ref of one of configured registries
for registry_name, registry_info in latest_registry_config.items():
for registry_key, registry_info in latest_registry_config.items():
registry_name, _project = registry_key.split("/", maxsplit=1)

if registry_or_image.startswith(registry_name + "/"):
repo_with_tag = registry_or_image.removeprefix(registry_name + "/")
log.debug(
Expand All @@ -134,16 +135,25 @@ async def rescan_images(
# treat it as a normal registry name
registry = registry_or_image
try:
registries = {registry: latest_registry_config[registry]}
registries = {}
for registry_key, registry_info in latest_registry_config.items():
registry_name, _ = registry_key.split("/", maxsplit=1)

if registry == registry_name:
registries[registry_key] = latest_registry_config[registry_key]

log.debug("running a per-registry metadata scan")
except KeyError:
raise RuntimeError("It is an unknown registry.", registry)
async with aiotools.TaskGroup() as tg:
for registry_name, registry_info in registries.items():
log.info('Scanning kernel images from the registry "{0}"', registry_name)
scanner_cls = get_container_registry_cls(registry_info)
scanner = scanner_cls(db, registry_name, registry_info)
tg.create_task(scanner.rescan_single_registry(reporter))

for registry_key, registry_info in registries.items():
registry_name, _project = registry_key.split("/", maxsplit=1)

log.info('Scanning kernel images from the registry "{0}"', registry_name)
scanner_cls = get_container_registry_cls(registry_info)
scanner = scanner_cls(db, registry_name, registry_info)
await scanner.rescan_single_registry(reporter)

# TODO: delete images removed from registry?


Expand All @@ -157,7 +167,7 @@ class ImageRow(Base):
__tablename__ = "images"
id = IDColumn("id")
name = sa.Column("name", sa.String, nullable=False, index=True)
project = sa.Column("project", sa.String, nullable=False)
project = sa.Column("project", sa.String, nullable=True)
image = sa.Column("image", sa.String, nullable=False, index=True)
created_at = sa.Column(
"created_at",
Expand Down Expand Up @@ -237,6 +247,18 @@ def trimmed_digest(self) -> str:

@property
def image_ref(self) -> ImageRef:
if self.is_local:
image_name, tag = ImageRef.parse_image_tag(self.name)

return ImageRef(
image_name,
self.project,
tag,
self.registry,
self.architecture,
self.is_local,
)

# Empty image name
if self.project == self.image:
image_name = ""
Expand Down

0 comments on commit b4c8741

Please sign in to comment.