Skip to content

Commit

Permalink
Merge branch '23.09' into backport/2740-to-23.09
Browse files Browse the repository at this point in the history
  • Loading branch information
kyujin-cho authored Sep 4, 2024
2 parents 3372206 + aa8b69d commit 81f8f6f
Show file tree
Hide file tree
Showing 20 changed files with 145 additions and 24 deletions.
1 change: 1 addition & 0 deletions changes/2028.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix handling of undefined values in the ModifyImage GraphQL mutation.
1 change: 1 addition & 0 deletions changes/2754.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Correct `msgpack` deserialization of `ResourceSlot`.
1 change: 1 addition & 0 deletions changes/2771.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the regex patterns to update configuration files working with multiline texts correctly in the TUI installer
1 change: 1 addition & 0 deletions changes/2786.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle container port mismatch when creating kernel.
1 change: 1 addition & 0 deletions changes/2796.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an explicit configuration `scaling-group-type` to `agent.toml` so that the agent could distinguish whether itself belongs to an SFTP resource group or not
1 change: 1 addition & 0 deletions changes/2797.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Explicitly set the protected service ports depending on the resource group type and the service types
8 changes: 7 additions & 1 deletion configs/agent/sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,16 @@ agent-sock-port = 6007
# This affects the per-node configuration scope.
# id = "i-something-special"

# Set the scaling group of this agent.
# Set the scaling group (aka resource group) of this agent.
# This affects the per-sgroup configuration scope.
scaling-group = "default"

# Set the type of scaling group (aka resource group) of this agent.
# - "compute": The agent hosts computing workloads, facing the internal cluster nodes.
# - "storage": The agent hosts storage-access containers, directly facing public/user-side netweorks.
# [default: "compute"]
# scaling-group-type = "compute"

# Set the volume mount path for the agent node.
# mount-path = "/vfroot"

Expand Down
16 changes: 16 additions & 0 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,22 @@ def update_user_bootstrap_script(self, script: str) -> None:
"""
self.kernel_config["bootstrap_script"] = script

@property
@abstractmethod
def repl_ports(self) -> Sequence[int]:
"""
Return the list of intrinsic REPL ports to exclude from public mapping.
"""
raise NotImplementedError

@property
@abstractmethod
def protected_services(self) -> Sequence[str]:
"""
Return the list of protected (intrinsic) service names to exclude from public mapping.
"""
raise NotImplementedError

@abstractmethod
async def apply_network(self, cluster_info: ClusterInfo) -> None:
"""
Expand Down
7 changes: 6 additions & 1 deletion src/ai/backend/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ai.backend.common import config
from ai.backend.common import validators as tx
from ai.backend.common.types import ResourceGroupType

from .affinity_map import AffinityPolicy
from .stats import StatModes
Expand Down Expand Up @@ -38,6 +39,9 @@
t.Key("region", default=None): t.Null | t.String,
t.Key("instance-type", default=None): t.Null | t.String,
t.Key("scaling-group", default="default"): t.String,
t.Key("scaling-group-type", default=ResourceGroupType.COMPUTE): t.Enum(
*(e.value for e in ResourceGroupType)
),
t.Key("pid-file", default=os.devnull): tx.Path(
type="file", allow_nonexisting=True, allow_devnull=True
),
Expand Down Expand Up @@ -65,7 +69,8 @@
t.Key("bind-host", default=""): t.String(allow_blank=True),
t.Key("advertised-host", default=None): t.Null | t.String(),
t.Key("port-range", default=(30000, 31000)): tx.PortRange,
t.Key("stats-type", default="docker"): t.Null | t.Enum(*[e.value for e in StatModes]),
t.Key("stats-type", default=StatModes.DOCKER): t.Null
| t.Enum(*(e.value for e in StatModes)),
t.Key("sandbox-type", default="docker"): t.Enum("docker", "jail"),
t.Key("jail-args", default=[]): t.List(t.String),
t.Key("scratch-type"): t.Enum("hostdir", "hostfile", "memory", "k8s-nfs"),
Expand Down
50 changes: 43 additions & 7 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from aiodocker.exceptions import DockerError
from aiomonitor.task import preserve_termination_log
from async_timeout import timeout
from typing_extensions import override

from ai.backend.common.cgroup import get_cgroup_mount_point
from ai.backend.common.docker import MAX_KERNELSPEC, MIN_KERNELSPEC, ImageRef
Expand All @@ -61,6 +62,7 @@
KernelId,
MountPermission,
MountTypes,
ResourceGroupType,
ResourceSlot,
Sentinel,
ServicePort,
Expand Down Expand Up @@ -687,6 +689,21 @@ def _populate_ssh_config():
)
return kernel_obj

@property
@override
def repl_ports(self) -> Sequence[int]:
return (2000, 2001)

@property
@override
def protected_services(self) -> Sequence[str]:
rgtype: ResourceGroupType = self.local_config["agent"]["scaling-group-type"]
match rgtype:
case ResourceGroupType.COMPUTE:
return ()
case ResourceGroupType.STORAGE:
return ("ttyd",)

async def start_container(
self,
kernel_obj: AbstractKernel,
Expand All @@ -703,11 +720,13 @@ async def start_container(
# PHASE 4: Run!
container_bind_host = self.local_config["container"]["bind-host"]
advertised_kernel_host = self.local_config["container"].get("advertised-host")
repl_ports = [2000, 2001]
if len(service_ports) + len(repl_ports) > len(self.port_pool):
raise RuntimeError("Container ports are not sufficiently available.")
exposed_ports = repl_ports
host_ports = [self.port_pool.pop() for _ in repl_ports]
if len(service_ports) + len(self.repl_ports) > len(self.port_pool):
raise RuntimeError(
f"Container ports are not sufficiently available. (remaining ports: {self.port_pool})"
)
exposed_ports = [*self.repl_ports]
host_ports = [self.port_pool.pop() for _ in self.repl_ports]
host_ips = []
for sport in service_ports:
exposed_ports.extend(sport["container_ports"])
if (
Expand All @@ -723,6 +742,18 @@ async def start_container(
else:
hport = self.port_pool.pop()
host_ports.append(hport)
protected_service_ports: set[int] = set()
for sport in service_ports:
if sport["name"] in self.protected_services:
protected_service_ports.update(sport["container_ports"])
for eport in exposed_ports:
if eport in self.repl_ports: # always protected
host_ips.append("127.0.0.1")
elif eport in protected_service_ports: # check if protected by resource group type
host_ips.append("127.0.0.1")
else:
host_ips.append(str(container_bind_host))
assert len(host_ips) == len(host_ports) == len(exposed_ports)

container_log_size = self.local_config["agent"]["container-logs"]["max-length"]
container_log_file_count = 5
Expand Down Expand Up @@ -750,8 +781,8 @@ async def start_container(
"HostConfig": {
"Init": True,
"PortBindings": {
f"{eport}/tcp": [{"HostPort": str(hport), "HostIp": str(container_bind_host)}]
for eport, hport in zip(exposed_ports, host_ports)
f"{eport}/tcp": [{"HostPort": str(hport), "HostIp": hip}]
for eport, hport, hip in zip(exposed_ports, host_ports, host_ips)
},
"PublishAllPorts": False, # we manage port mapping manually!
"CapAdd": [
Expand Down Expand Up @@ -942,6 +973,11 @@ async def start_container(
container_id=cid, message="Container port not found"
)
host_port = int(ports[0]["HostPort"])
if host_port != host_ports[idx]:
raise ContainerCreationError(
container_id=cid,
message=f"Port mapping mismatch. {host_port = }, {host_ports[idx] = }",
)
assert host_port == host_ports[idx]
if port == 2000: # intrinsic
repl_in_port = host_port
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/agent/docker/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def create_code_runner(
self.kernel_id,
self.session_id,
event_producer,
kernel_host=self.data["kernel_host"],
kernel_host="127.0.0.1", # repl ports are always bound to 127.0.0.1
repl_in_port=self.data["repl_in_port"],
repl_out_port=self.data["repl_out_port"],
exec_timeout=0,
Expand Down
12 changes: 12 additions & 0 deletions src/ai/backend/agent/dummy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
Tuple,
)

from typing_extensions import override

from ai.backend.common.config import read_from_file
from ai.backend.common.docker import ImageRef
from ai.backend.common.events import EventProducer
Expand Down Expand Up @@ -108,6 +110,16 @@ async def prepare_scratch(self) -> None:
async def get_intrinsic_mounts(self) -> Sequence[Mount]:
return []

@property
@override
def repl_ports(self) -> Sequence[int]:
return (2000, 2001)

@property
@override
def protected_services(self) -> Sequence[str]:
return ()

async def apply_network(self, cluster_info: ClusterInfo) -> None:
return

Expand Down
14 changes: 13 additions & 1 deletion src/ai/backend/agent/kubernetes/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import pkg_resources
from kubernetes_asyncio import client as kube_client
from kubernetes_asyncio import config as kube_config
from typing_extensions import override

from ai.backend.common.asyncio import current_loop
from ai.backend.common.docker import ImageRef
Expand Down Expand Up @@ -303,6 +304,17 @@ async def get_intrinsic_mounts(self) -> Sequence[Mount]:

return mounts

@property
@override
def repl_ports(self) -> Sequence[int]:
return (2000, 2001)

@property
@override
def protected_services(self) -> Sequence[str]:
# NOTE: Currently K8s does not support binding container ports to 127.0.0.1 when using NodePort.
return ()

async def apply_network(self, cluster_info: ClusterInfo) -> None:
pass

Expand Down Expand Up @@ -655,7 +667,7 @@ async def start_container(
await kube_config.load_kube_config()
core_api = kube_client.CoreV1Api()
apps_api = kube_client.AppsV1Api()
exposed_ports = [2000, 2001]
exposed_ports = [*self.repl_ports]
for sport in service_ports:
exposed_ports.extend(sport["container_ports"])

Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/agent/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def check_cgroup_available():
return not is_containerized() and sys.platform.startswith("linux")


class StatModes(enum.Enum):
class StatModes(enum.StrEnum):
CGROUP = "cgroup"
DOCKER = "docker"

Expand Down
7 changes: 6 additions & 1 deletion src/ai/backend/common/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import msgpack as _msgpack
import temporenc

from .types import BinarySize
from .types import BinarySize, ResourceSlot

__all__ = ("packb", "unpackb")

Expand All @@ -27,6 +27,7 @@ class ExtTypes(enum.IntEnum):
POSIX_PATH = 4
PURE_POSIX_PATH = 5
ENUM = 6
RESOURCE_SLOT = 8
BACKENDAI_BINARY_SIZE = 16


Expand All @@ -46,6 +47,8 @@ def _default(obj: object) -> Any:
return _msgpack.ExtType(ExtTypes.POSIX_PATH, os.fsencode(obj))
case PurePosixPath():
return _msgpack.ExtType(ExtTypes.PURE_POSIX_PATH, os.fsencode(obj))
case ResourceSlot():
return _msgpack.ExtType(ExtTypes.RESOURCE_SLOT, pickle.dumps(obj, protocol=5))
case enum.Enum():
return _msgpack.ExtType(ExtTypes.ENUM, pickle.dumps(obj, protocol=5))
raise TypeError(f"Unknown type: {obj!r} ({type(obj)})")
Expand All @@ -65,6 +68,8 @@ def _ext_hook(code: int, data: bytes) -> Any:
return PurePosixPath(os.fsdecode(data))
case ExtTypes.ENUM:
return pickle.loads(data)
case ExtTypes.RESOURCE_SLOT:
return pickle.loads(data)
case ExtTypes.BACKENDAI_BINARY_SIZE:
return pickle.loads(data)
return _msgpack.ExtType(code, data)
Expand Down
8 changes: 7 additions & 1 deletion src/ai/backend/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"SlotName",
"IntrinsicSlotNames",
"ResourceSlot",
"ResourceGroupType",
"ReadableCIDR",
"HardwareMetadata",
"ModelServiceStatus",
Expand Down Expand Up @@ -287,7 +288,12 @@ class SessionResult(str, enum.Enum):
FAILURE = "failure"


class ClusterMode(str, enum.Enum):
class ResourceGroupType(enum.StrEnum):
COMPUTE = enum.auto()
STORAGE = enum.auto()


class ClusterMode(enum.StrEnum):
SINGLE_NODE = "single-node"
MULTI_NODE = "multi-node"

Expand Down
12 changes: 6 additions & 6 deletions src/ai/backend/install/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,15 @@ async def configure_manager(self) -> None:
self.sed_in_place_multi(
toml_path,
[
(re.compile("^num-proc = .*"), "num-proc = 1"),
(re.compile("^num-proc = .*", flags=re.M), "num-proc = 1"),
("port = 8120", f"port = {halfstack.etcd_addr[0].face.port}"),
("port = 8100", f"port = {halfstack.postgres_addr.face.port}"),
(
"port = 8081",
f"port = {self.install_info.service_config.manager_addr.bind.port}",
),
(
re.compile("^(# )?ipc-base-path =.*"),
re.compile("^(# )?ipc-base-path =.*", flags=re.M),
f'ipc-base-path = "{self.install_info.service_config.manager_ipc_base_path}"',
),
],
Expand Down Expand Up @@ -424,15 +424,15 @@ async def configure_agent(self) -> None:
("port = 6001", f"port = {service.agent_rpc_addr.bind.port}"),
("port = 6009", f"port = {service.agent_watcher_addr.bind.port}"),
(
re.compile("^(# )?ipc-base-path = .*"),
re.compile("^(# )?ipc-base-path = .*", flags=re.M),
f'ipc-base-path = "{service.agent_ipc_base_path}"',
),
(
re.compile("^(# )?var-base-path = .*"),
re.compile("^(# )?var-base-path = .*", flags=re.M),
f'var-base-path = "{service.agent_var_base_path}"',
),
(
re.compile("(# )?mount_path = .*"),
re.compile("(# )?mount_path = .*", flags=re.M),
f'"{self.install_info.base_path / service.vfolder_relpath}"',
),
],
Expand All @@ -445,7 +445,7 @@ async def configure_agent(self) -> None:
# "cuda.devices = 0" as the agent capacity, but it will still run.
self.sed_in_place(
toml_path,
re.compile("^(# )?allow-compute-plugins = .*"),
re.compile("^(# )?allow-compute-plugins = .*", flags=re.M),
'allow-compute-plugins = ["ai.backend.accelerator.cuda_open"]',
)
# TODO: let the installer enable the CUDA plugin only when it verifies CUDA availability or
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/manager/models/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ async def mutate(
)
set_if_set(props, data, "labels", clean_func=lambda v: {pair.key: pair.value for pair in v})

if props.resource_limits is not None:
if props.resource_limits is not Undefined:
resources_data = {}
for limit_option in props.resource_limits:
limit_data = {}
Expand Down
4 changes: 2 additions & 2 deletions src/ai/backend/storage/vast/vastdata_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined]


DEFAULT_ACCESS_TOKEN_SPAN: Final = timedelta(hours=1)
DEFAULT_REFRESH_TOKEN_SPAN: Final = timedelta(hours=24)
DEFAULT_ACCESS_TOKEN_SPAN: Final = timedelta(minutes=1)
DEFAULT_REFRESH_TOKEN_SPAN: Final = timedelta(minutes=10)


VASTQuotaID = NewType("VASTQuotaID", str)
Expand Down
Loading

0 comments on commit 81f8f6f

Please sign in to comment.