Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement agent status check CLI command #2332

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions changes/2332.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement agent status check CLI command
100 changes: 100 additions & 0 deletions src/ai/backend/agent/cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,107 @@
import asyncio
import pathlib
import sys
from pathlib import Path
from pprint import pformat
from typing import Any, cast

import click
from tabulate import tabulate

from ai.backend.agent.config import load_local_config
from ai.backend.cli.types import CliContextInfo
from ai.backend.common import config
from ai.backend.common.types import LogSeverity


@click.group()
def main():
"""The root entrypoint for unified CLI of agent"""
pass


async def inspect_server_status(cli_ctx: CliContextInfo, agent_pid: int) -> None:
command = f"ps -p '{agent_pid}' -f"
process = await asyncio.create_subprocess_shell(command, stdout=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
if stderr:
raise RuntimeError(f"Failed to execute the command: {command}")

lines = stdout.decode().splitlines()
process_list = []

for line in lines[1:]:
columns = line.split()
# Combine all text following UID, PID, PPID, C, STIME, TTY, TIME into CMD
process_info = columns[:7] + [" ".join(columns[7:])]
process_list.append(process_info)

print(tabulate(process_list, headers=lines[0].split(), tablefmt="pretty"))
pass


@main.command()
@click.pass_obj
@click.option(
"-f",
"--config-path",
"--config",
type=click.Path(
file_okay=True,
dir_okay=False,
exists=True,
path_type=pathlib.Path,
),
default=None,
help="The config file path. (default: ./agent.toml and /etc/backend.ai/agent.toml)",
)
@click.option(
"--debug",
is_flag=True,
help="Set the logging level to DEBUG",
)
@click.option(
"-s",
"--systemctl",
is_flag=True,
help="Include the systemctl status command result in the output",
)
@click.option(
"--log-level",
type=click.Choice([*LogSeverity], case_sensitive=False),
default=LogSeverity.INFO,
help="Set the logging verbosity level",
)
def status(
cli_ctx: CliContextInfo,
config_path: Path,
log_level: LogSeverity,
debug: bool = False,
systemctl: bool = False,
) -> None:
"""
Collect and print each agent process's status.
"""
try:
local_config = cast(Any, load_local_config(config_path, log_level, debug))
except config.ConfigurationError as e:
print(
"ConfigurationError: Could not read or validate the agent local config.",
file=sys.stderr,
)
print(pformat(e.invalid_data), file=sys.stderr)
raise click.Abort()

pid_filepath = local_config["agent"]["pid-file"]

if not pid_filepath.is_file():
print(
'ConfigurationError: "pid-file" not found in the configuration file.',
file=sys.stderr,
)
raise click.Abort()

with open(pid_filepath, "r") as file:
agent_pid = int(file.read())

asyncio.run(inspect_server_status(cli_ctx, agent_pid))
50 changes: 50 additions & 0 deletions src/ai/backend/agent/config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import os
from pathlib import Path
from pprint import pprint

import trafaret as t

from ai.backend.common import config
from ai.backend.common import validators as tx
from ai.backend.common.types import LogSeverity

from .affinity_map import AffinityPolicy
from .stats import StatModes
Expand Down Expand Up @@ -138,3 +141,50 @@
t.Key("kernel-uid", optional=True): t.ToInt,
t.Key("kernel-gid", optional=True): t.ToInt,
}).allow_extra("*")


def load_local_config(
config_path: Path, log_level: LogSeverity, debug: bool = False
) -> dict[str, t.Any]:
# Determine where to read configuration.
raw_cfg, cfg_src_path = config.read_from_file(config_path, "agent")

# Override the read config with environment variables (for legacy).
config.override_with_env(raw_cfg, ("etcd", "namespace"), "BACKEND_NAMESPACE")
config.override_with_env(raw_cfg, ("etcd", "addr"), "BACKEND_ETCD_ADDR")
config.override_with_env(raw_cfg, ("etcd", "user"), "BACKEND_ETCD_USER")
config.override_with_env(raw_cfg, ("etcd", "password"), "BACKEND_ETCD_PASSWORD")
config.override_with_env(
raw_cfg, ("agent", "rpc-listen-addr", "host"), "BACKEND_AGENT_HOST_OVERRIDE"
)
config.override_with_env(raw_cfg, ("agent", "rpc-listen-addr", "port"), "BACKEND_AGENT_PORT")
config.override_with_env(raw_cfg, ("agent", "pid-file"), "BACKEND_PID_FILE")
config.override_with_env(raw_cfg, ("container", "port-range"), "BACKEND_CONTAINER_PORT_RANGE")
config.override_with_env(raw_cfg, ("container", "bind-host"), "BACKEND_BIND_HOST_OVERRIDE")
config.override_with_env(raw_cfg, ("container", "sandbox-type"), "BACKEND_SANDBOX_TYPE")
config.override_with_env(raw_cfg, ("container", "scratch-root"), "BACKEND_SCRATCH_ROOT")

if debug:
log_level = LogSeverity.DEBUG
config.override_key(raw_cfg, ("debug", "enabled"), log_level == LogSeverity.DEBUG)
config.override_key(raw_cfg, ("logging", "level"), log_level)
config.override_key(raw_cfg, ("logging", "pkg-ns", "ai.backend"), log_level)

# Validate and fill configurations
# (allow_extra will make configs to be forward-copmatible)
cfg = config.check(raw_cfg, agent_local_config_iv)

if cfg["agent"]["backend"] == AgentBackend.KUBERNETES:
if cfg["container"]["scratch-type"] == "k8s-nfs" and (
cfg["container"]["scratch-nfs-address"] is None
or cfg["container"]["scratch-nfs-options"] is None
):
raise ValueError("scratch-nfs-address and scratch-nfs-options are required for k8s-nfs")
if cfg["agent"]["backend"] == AgentBackend.DOCKER:
config.check(raw_cfg, docker_extra_config_iv)
if "debug" in cfg and cfg["debug"]["enabled"]:
print("== Agent configuration ==")
pprint(cfg)
cfg["_src"] = cfg_src_path

return cfg
93 changes: 22 additions & 71 deletions src/ai/backend/agent/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from ipaddress import _BaseAddress as BaseIPAddress
from ipaddress import ip_network
from pathlib import Path
from pprint import pformat, pprint
from pprint import pformat
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -75,13 +75,12 @@
from . import __version__ as VERSION
from .config import (
agent_etcd_config_iv,
agent_local_config_iv,
container_etcd_config_iv,
docker_extra_config_iv,
load_local_config,
)
from .exception import ResourceError
from .monitor import AgentErrorPluginContext, AgentStatsPluginContext
from .types import AgentBackend, LifecycleEvent, VolumeInfo
from .types import LifecycleEvent, VolumeInfo
from .utils import get_arch_name, get_subnet_ip

if TYPE_CHECKING:
Expand Down Expand Up @@ -949,7 +948,7 @@ async def server_main(
"--config",
type=click.Path(exists=True, dir_okay=False, path_type=Path),
default=None,
help="The config file path. (default: ./agent.conf and /etc/backend.ai/agent.conf)",
help="The config file path. (default: ./agent.toml and /etc/backend.ai/agent.toml)",
)
@click.option(
"--debug",
Expand All @@ -970,68 +969,20 @@ def main(
debug: bool = False,
) -> int:
"""Start the agent service as a foreground process."""
# Determine where to read configuration.
try:
raw_cfg, cfg_src_path = config.read_from_file(config_path, "agent")
local_config = cast(Any, load_local_config(config_path, log_level, debug))
except config.ConfigurationError as e:
print(
"ConfigurationError: Could not read or validate the storage-proxy local config:",
file=sys.stderr,
)
print(pformat(e.invalid_data), file=sys.stderr)
raise click.Abort()

# Override the read config with environment variables (for legacy).
config.override_with_env(raw_cfg, ("etcd", "namespace"), "BACKEND_NAMESPACE")
config.override_with_env(raw_cfg, ("etcd", "addr"), "BACKEND_ETCD_ADDR")
config.override_with_env(raw_cfg, ("etcd", "user"), "BACKEND_ETCD_USER")
config.override_with_env(raw_cfg, ("etcd", "password"), "BACKEND_ETCD_PASSWORD")
config.override_with_env(
raw_cfg, ("agent", "rpc-listen-addr", "host"), "BACKEND_AGENT_HOST_OVERRIDE"
)
config.override_with_env(raw_cfg, ("agent", "rpc-listen-addr", "port"), "BACKEND_AGENT_PORT")
config.override_with_env(raw_cfg, ("agent", "pid-file"), "BACKEND_PID_FILE")
config.override_with_env(raw_cfg, ("container", "port-range"), "BACKEND_CONTAINER_PORT_RANGE")
config.override_with_env(raw_cfg, ("container", "bind-host"), "BACKEND_BIND_HOST_OVERRIDE")
config.override_with_env(raw_cfg, ("container", "sandbox-type"), "BACKEND_SANDBOX_TYPE")
config.override_with_env(raw_cfg, ("container", "scratch-root"), "BACKEND_SCRATCH_ROOT")

if debug:
log_level = LogSeverity.DEBUG
config.override_key(raw_cfg, ("debug", "enabled"), log_level == LogSeverity.DEBUG)
config.override_key(raw_cfg, ("logging", "level"), log_level)
config.override_key(raw_cfg, ("logging", "pkg-ns", "ai.backend"), log_level)

# Validate and fill configurations
# (allow_extra will make configs to be forward-copmatible)
try:
cfg = config.check(raw_cfg, agent_local_config_iv)
if cfg["agent"]["backend"] == AgentBackend.KUBERNETES:
if cfg["container"]["scratch-type"] == "k8s-nfs" and (
cfg["container"]["scratch-nfs-address"] is None
or cfg["container"]["scratch-nfs-options"] is None
):
raise ValueError(
"scratch-nfs-address and scratch-nfs-options are required for k8s-nfs"
)
if cfg["agent"]["backend"] == AgentBackend.DOCKER:
config.check(raw_cfg, docker_extra_config_iv)
if "debug" in cfg and cfg["debug"]["enabled"]:
print("== Agent configuration ==")
pprint(cfg)
cfg["_src"] = cfg_src_path
except config.ConfigurationError as e:
print("ConfigurationError: Validation of agent local config has failed:", file=sys.stderr)
print("ConfigurationError: Validation of agent local config has failed.", file=sys.stderr)
print(pformat(e.invalid_data), file=sys.stderr)
raise click.Abort()

# FIXME: Remove this after ARM64 support lands on Jail
current_arch = get_arch_name()
if cfg["container"]["sandbox-type"] == "jail" and current_arch != "x86_64":
if local_config["container"]["sandbox-type"] == "jail" and current_arch != "x86_64":
print(f"ConfigurationError: Jail sandbox is not supported on architecture {current_arch}")
raise click.Abort()

rpc_host = cfg["agent"]["rpc-listen-addr"].host
rpc_host = local_config["agent"]["rpc-listen-addr"].host
if isinstance(rpc_host, BaseIPAddress) and (rpc_host.is_unspecified or rpc_host.is_link_local):
print(
"ConfigurationError: "
Expand All @@ -1040,22 +991,22 @@ def main(
)
raise click.Abort()

if os.getuid() != 0 and cfg["container"]["stats-type"] == "cgroup":
if os.getuid() != 0 and local_config["container"]["stats-type"] == "cgroup":
print(
"Cannot use cgroup statistics collection mode unless the agent runs as root.",
file=sys.stderr,
)
raise click.Abort()

if os.getuid() != 0 and cfg["container"]["scratch-type"] == "hostfile":
if os.getuid() != 0 and local_config["container"]["scratch-type"] == "hostfile":
print(
"Cannot use hostfile scratch type unless the agent runs as root.",
file=sys.stderr,
)
raise click.Abort()

if cli_ctx.invoked_subcommand is None:
if cfg["debug"]["coredump"]["enabled"]:
if local_config["debug"]["coredump"]["enabled"]:
if not sys.platform.startswith("linux"):
print(
"ConfigurationError: Storing container coredumps is only supported in Linux.",
Expand All @@ -1071,20 +1022,20 @@ def main(
file=sys.stderr,
)
raise click.Abort()
cfg["debug"]["coredump"]["core_path"] = Path(core_pattern).parent
local_config["debug"]["coredump"]["core_path"] = Path(core_pattern).parent

cfg["agent"]["pid-file"].write_text(str(os.getpid()))
image_commit_path = cfg["agent"]["image-commit-path"]
local_config["agent"]["pid-file"].write_text(str(os.getpid()))
image_commit_path = local_config["agent"]["image-commit-path"]
image_commit_path.mkdir(parents=True, exist_ok=True)
ipc_base_path = cfg["agent"]["ipc-base-path"]
ipc_base_path = local_config["agent"]["ipc-base-path"]
log_sockpath = ipc_base_path / f"agent-logger-{os.getpid()}.sock"
log_sockpath.parent.mkdir(parents=True, exist_ok=True)
log_endpoint = f"ipc://{log_sockpath}"
cfg["logging"]["endpoint"] = log_endpoint
local_config["logging"]["endpoint"] = log_endpoint
try:
logger = Logger(cfg["logging"], is_master=True, log_endpoint=log_endpoint)
logger = Logger(local_config["logging"], is_master=True, log_endpoint=log_endpoint)
with logger:
ns = cfg["etcd"]["namespace"]
ns = local_config["etcd"]["namespace"]
setproctitle(f"backend.ai: agent {ns}")
log.info("Backend.AI Agent {0}", VERSION)
log.info("runtime: {0}", utils.env_info())
Expand All @@ -1093,22 +1044,22 @@ def main(
if log_level == "DEBUG":
log_config.debug("debug mode enabled.")

if cfg["agent"]["event-loop"] == "uvloop":
if local_config["agent"]["event-loop"] == "uvloop":
import uvloop

uvloop.install()
log.info("Using uvloop as the event loop backend")
aiotools.start_server(
server_main_logwrapper,
num_workers=1,
args=(cfg, log_endpoint),
args=(local_config, log_endpoint),
wait_timeout=5.0,
)
log.info("exit.")
finally:
if cfg["agent"]["pid-file"].is_file():
if local_config["agent"]["pid-file"].is_file():
# check is_file() to prevent deleting /dev/null!
cfg["agent"]["pid-file"].unlink()
local_config["agent"]["pid-file"].unlink()
else:
# Click is going to invoke a subcommand.
pass
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/agent/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ async def watcher_server(loop, pidx, args):
"--config",
type=click.Path(exists=True, dir_okay=False),
default=None,
help="The config file path. (default: ./agent.conf and /etc/backend.ai/agent.conf)",
help="The config file path. (default: ./agent.toml and /etc/backend.ai/agent.toml)",
)
@click.option(
"--debug",
Expand Down
Loading