From 9f599cd8d52d8df8aa5f376929a1de677df2fe8e Mon Sep 17 00:00:00 2001
From: octodog
Date: Mon, 21 Oct 2024 22:38:34 +0900
Subject: [PATCH] refactor: Utilization idle checker calculates resource name
 to avoid hard code (#2702) (#2929)

Co-authored-by: Sanghun Lee
---
 src/ai/backend/common/config.py |  11 ++
 src/ai/backend/manager/idle.py  | 258 ++++++++++++++++++++++----------
 2 files changed, 189 insertions(+), 80 deletions(-)

diff --git a/src/ai/backend/common/config.py b/src/ai/backend/common/config.py
index 30a0513ec1..70191dab85 100644
--- a/src/ai/backend/common/config.py
+++ b/src/ai/backend/common/config.py
@@ -5,6 +5,7 @@
 from pathlib import Path
 from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Union, cast
 
+import humps
 import tomli
 import trafaret as t
 from pydantic import (
@@ -254,3 +255,13 @@ def set_if_not_set(table: MutableMapping[str, Any], key_path: Tuple[str, ...], v
         table = table[k]
     if table.get(key_path[-1]) is None:
         table[key_path[-1]] = value
+
+
+def config_key_to_snake_case(o: Any) -> Any:
+    match o:
+        case dict():
+            return {humps.dekebabize(k): config_key_to_snake_case(v) for k, v in o.items()}
+        case list() | tuple() | set():
+            return [config_key_to_snake_case(i) for i in o]
+        case _:
+            return o
diff --git a/src/ai/backend/manager/idle.py b/src/ai/backend/manager/idle.py
index 55f6e931b4..2f27b6785c 100644
--- a/src/ai/backend/manager/idle.py
+++ b/src/ai/backend/manager/idle.py
@@ -8,24 +8,22 @@
 from collections import UserDict, defaultdict
 from collections.abc import (
     Mapping,
-    MutableMapping,
     Sequence,
 )
 from datetime import datetime, timedelta
 from decimal import Decimal
 from typing import (
     TYPE_CHECKING,
+    Annotated,
     Any,
     ClassVar,
-    DefaultDict,
     Final,
     List,
     NamedTuple,
     Optional,
-    Set,
+    Self,
     Type,
     TypedDict,
-    Union,
     cast,
 )
 
@@ -33,11 +31,20 @@
 import sqlalchemy as sa
 import trafaret as t
 from aiotools import TaskGroupError
+from dateutil.relativedelta import relativedelta
+from pydantic import (
+    BaseModel,
+    Field,
+    GetCoreSchemaHandler,
+)
+from pydantic_core import core_schema
 from redis.asyncio import Redis
 from sqlalchemy.engine import Row
 
 import ai.backend.common.validators as tx
 from ai.backend.common import msgpack, redis_helper
+from ai.backend.common import typed_validators as tv
+from ai.backend.common.config import BaseSchema, config_key_to_snake_case
 from ai.backend.common.defs import REDIS_LIVE_DB, REDIS_STAT_DB
 from ai.backend.common.distributed import GlobalTimer
 from ai.backend.common.events import (
@@ -58,6 +65,7 @@
     AccessKey,
     BinarySize,
     RedisConnectionInfo,
+    ResourceSlot,
     SessionTypes,
 )
 from ai.backend.common.utils import nmget
@@ -134,14 +142,16 @@ class UtilizationResourceReport(UserDict):
     def from_avg_threshold(
         cls,
         avg_utils: Mapping[str, float],
-        thresholds: Mapping[str, Union[int, float, Decimal, None]],
+        thresholds: ResourceThresholds,
         exclusions: set[str],
     ) -> UtilizationResourceReport:
-        data: dict[str, UtilizationExtraInfo] = {
-            k: UtilizationExtraInfo(float(avg_utils[k]), float(threshold))
-            for k, threshold in thresholds.items()
-            if (threshold is not None) and (k not in exclusions)
-        }
+        data: dict[str, UtilizationExtraInfo] = {}
+        for resource_name, val in thresholds.unique_resource_name_map.items():
+            _resource_name = cast(str, resource_name)
+            if val.average is None or _resource_name in exclusions:
+                continue
+            avg_util = avg_utils.get(_resource_name, 0)
+            data[_resource_name] = UtilizationExtraInfo(float(avg_util), float(val.average))
         return cls(data)
 
     def to_dict(self, apply_unit: bool = True) -> dict[str, UtilizationExtraInfo]:
@@ -162,7 +172,7 @@ class AppStreamingStatus(enum.Enum):
     HAS_ACTIVE_CONNECTIONS = 1
 
 
-class ThresholdOperator(enum.Enum):
+class ThresholdOperator(enum.StrEnum):
     AND = "and"
     OR = "or"
 
@@ -276,6 +286,7 @@ async def _do_idle_check(
                 kernels.c.session_type,
                 kernels.c.created_at,
                 kernels.c.occupied_slots,
+                kernels.c.requested_slots,
                 kernels.c.cluster_size,
                 users.c.created_at.label("user_created_at"),
            ])
@@ -803,6 +814,98 @@ async def get_checker_result(
         return msgpack.unpackb(data) if data is not None else None
 
+
+_metric_name_postfix = ("_util", "_mem", "_used")
+
+
+def _get_resource_name_from_metric_key(name: str) -> str:
+    for p in _metric_name_postfix:
+        if name.endswith(p):
+            # Use removesuffix() (not rstrip()) so that only the exact postfix is stripped.
+            return name.removesuffix(p)
+    return name
+
+
+class ResourceThresholdValue(BaseModel):
+    average: Annotated[
+        int | float | Decimal | None,
+        Field(
+            default=None,
+            description="Threshold value. Default is 'null', which means the idle checker does not take into account the resource.",
+        ),
+    ]
+    name: Annotated[
+        str | None,
+        Field(
+            default=None,
+            description=(
+                f"Unique resource name that does not have any {_metric_name_postfix} postfix. "
+                f"Default is 'null'. If this value is 'null', any of {_metric_name_postfix} postfix of the resource name is removed."
+            ),
+        ),
+    ]
+
+
+class ResourceThresholds(dict[str, ResourceThresholdValue]):
+    @classmethod
+    def default_factory(cls) -> Self:
+        return cls(
+            cpu_util=ResourceThresholdValue(average=None, name=None),
+            mem=ResourceThresholdValue(average=None, name=None),
+            cuda_util=ResourceThresholdValue(average=None, name=None),
+            cuda_mem=ResourceThresholdValue(average=None, name=None),
+        )
+
+    @property
+    def unique_resource_name_map(self) -> Mapping[str, ResourceThresholdValue]:
+        ret: dict[str, ResourceThresholdValue] = {}
+        for resource_name_or_metric_key, val in self.items():
+            if (name := val.name) is not None:
+                ret[name] = val
+            else:
+                ret[_get_resource_name_from_metric_key(resource_name_or_metric_key)] = val
+        return ret
+
+    @classmethod
+    def threshold_validator(cls, value: dict[str, Any]) -> Self:
+        return cls({k: ResourceThresholdValue(**v) for k, v in value.items()})
+
+    @classmethod
+    def __get_pydantic_core_schema__(
+        cls,
+        _source_type: type[Any],
+        _handler: GetCoreSchemaHandler,
+    ) -> core_schema.CoreSchema:
+        schema = core_schema.chain_schema([
+            core_schema.dict_schema(
+                keys_schema=core_schema.str_schema(), values_schema=core_schema.any_schema()
+            ),
+            core_schema.no_info_plain_validator_function(cls.threshold_validator),
+        ])
+
+        return core_schema.json_or_python_schema(
+            json_schema=schema,
+            python_schema=schema,
+        )
+
+
+class UtilizationConfig(BaseSchema):
+    time_window: tv.TimeDuration
+    initial_grace_period: tv.TimeDuration
+    thresholds_check_operator: Annotated[
+        ThresholdOperator,
+        Field(
+            default=ThresholdOperator.AND,
+            description=f"One of {[v.value for v in ThresholdOperator]}. Default is `and`.",
+        ),
+    ]
+    resource_thresholds: Annotated[
+        ResourceThresholds,
+        Field(
+            default_factory=ResourceThresholds.default_factory,
+            description="Resource thresholds used to check utilization idleness.",
+        ),
+    ]
+
+
 class UtilizationIdleChecker(BaseIdleChecker):
     """
     Checks the idleness of a session by the average utilization of compute devices.
@@ -814,56 +917,31 @@ class UtilizationIdleChecker(BaseIdleChecker):
     report_key: ClassVar[str] = "utilization"
     extra_info_key: ClassVar[str] = "utilization_extra"
 
-    _config_iv = t.Dict(
-        {
-            t.Key("time-window", default="10m"): tx.TimeDuration(),
-            t.Key("initial-grace-period", default="5m"): tx.TimeDuration(),
-            t.Key("thresholds-check-operator", default=ThresholdOperator.AND): tx.Enum(
-                ThresholdOperator
-            ),
-            t.Key("resource-thresholds", default=None): t.Null
-            | t.Dict(
-                {
-                    t.Key("cpu_util", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
-                    t.Key("mem", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
-                    t.Key("cuda_util", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
-                    t.Key("cuda_mem", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
-                    t.Key("atom_mem", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
-                },
-            ),
-        },
-    ).allow_extra("*")
-
-    resource_thresholds: MutableMapping[str, Union[int, float, Decimal, None]]
+    resource_thresholds: ResourceThresholds
+    resource_names_to_check: set[str]
     thresholds_check_operator: ThresholdOperator
     time_window: timedelta
     initial_grace_period: timedelta
     _evhandlers: List[EventHandler[None, AbstractEvent]]
-    slot_prefix_to_utilization_metric_map: Mapping[str, Set[str]] = {
-        "cpu": {"cpu_util"},
-        "mem": {"mem"},
-        "cuda": {"cuda_util", "cuda_mem"},
-        "atom": {"atom_mem"},
-    }
 
     async def populate_config(self, raw_config: Mapping[str, Any]) -> None:
-        config = self._config_iv.check(raw_config)
-        raw_resource_thresholds = config.get("resource-thresholds")
-        if raw_resource_thresholds is not None:
-            self.resource_thresholds = {
-                k: nmget(v, "average") for k, v in raw_resource_thresholds.items()
-            }
-        else:
-            resources: list[str] = []
-            for r in self.slot_prefix_to_utilization_metric_map.values():
-                resources = [*resources, *r]
-            self.resource_thresholds = {r: None for r in resources}
-        self.thresholds_check_operator: ThresholdOperator = config.get("thresholds-check-operator")
-        self.time_window = config.get("time-window")
-        self.initial_grace_period = config.get("initial-grace-period")
+        config = UtilizationConfig(**config_key_to_snake_case(raw_config))
+        self.resource_thresholds = config.resource_thresholds
+        self.resource_names_to_check: set[str] = set(self.resource_thresholds.keys())
+        self.thresholds_check_operator = ThresholdOperator(config.thresholds_check_operator)
+
+        def _to_timedelta(val: tv.TVariousDelta) -> timedelta:
+            match val:
+                case timedelta():
+                    return val
+                case relativedelta():
+                    raise ValueError("Should not use 'yr' or 'mo' unit.")
+
+        self.time_window = _to_timedelta(config.time_window)
+        self.initial_grace_period = _to_timedelta(config.initial_grace_period)
 
         thresholds_log = " ".join([
-            f"{k}({threshold})," for k, threshold in self.resource_thresholds.items()
+            f"{k}({threshold.average})," for k, threshold in self.resource_thresholds.items()
         ])
         log.info(
             f"UtilizationIdleChecker(%): {thresholds_log} "
@@ -916,8 +994,9 @@ async def check_idleness(
         interval = IdleCheckerHost.check_interval
         # time_window: Utilization is calculated within this window.
         time_window: timedelta = self.get_time_window(policy)
-        occupied_slots = kernel["occupied_slots"]
-        unavailable_resources: Set[str] = set()
+        occupied_slots = cast(ResourceSlot, kernel["occupied_slots"])
+        requested_slots = cast(ResourceSlot, kernel["requested_slots"])
+        excluded_resources: set[str] = set()
 
         util_series_key = f"session.{session_id}.util_series"
         util_first_collected_key = self._get_first_collected_key(session_id)
@@ -983,18 +1062,22 @@ async def check_idleness(
         if db_now <= total_initial_grace_period_end:
             return True
 
-        # Merge same type of (exclusive) resources as a unique resource with the values added.
-        # Example: {cuda.device: 0, cuda.shares: 0.5} -> {cuda: 0.5}.
-        unique_res_map: DefaultDict[str, Decimal] = defaultdict(Decimal)
-        for slot_name, alloc in occupied_slots.items():
-            unique_key = slot_name.split(".")[0]
-            unique_res_map[unique_key] += alloc
+        # Register requested resources.
+        requested_resource_names: set[str] = set()
+        for slot_name, val in requested_slots.items():
+            if Decimal(val) == 0:
+                # The resource is not allocated to this session.
+                continue
+            _slot_name = cast(str, slot_name)
+            resource_name, _, _ = _slot_name.partition(".")
+            if resource_name:
+                requested_resource_names.add(resource_name)
 
         # Do not take into account unallocated resources. For example, do not garbage collect
         # a session without GPU even if cuda_util is configured in resource-thresholds.
-        for slot_prefix, util_metric in self.slot_prefix_to_utilization_metric_map.items():
-            if unique_res_map.get(slot_prefix, 0) == 0:
-                unavailable_resources.update(util_metric)
+        for _resource_name in self.resource_thresholds.unique_resource_name_map.keys():
+            if _resource_name not in requested_resource_names:
+                excluded_resources.add(_resource_name)
 
         # Get current utilization data from all containers of the session.
         if kernel["cluster_size"] > 1:
@@ -1010,19 +1093,35 @@ async def check_idleness(
             return True
 
         # Update utilization time-series data.
-        raw_util_series = await redis_helper.execute(
-            self._redis_live, lambda r: r.get(util_series_key)
+        raw_util_series = cast(
+            Optional[bytes],
+            await redis_helper.execute(self._redis_live, lambda r: r.get(util_series_key)),
         )
-        try:
-            util_series: dict[str, list[float]] = msgpack.unpackb(raw_util_series, use_list=True)
-        except TypeError:
-            util_series = {k: [] for k in self.resource_thresholds.keys()}
+
+        def default_util_series() -> dict[str, list[float]]:
+            return {resource: [] for resource in requested_resource_names}
+
+        if raw_util_series is not None:
+            try:
+                raw_data: dict[str, list[float]] = msgpack.unpackb(raw_util_series, use_list=True)
+                util_series: dict[str, list[float]] = {
+                    resource: v
+                    for resource, v in raw_data.items()
+                    if resource in requested_resource_names
+                }
+            except TypeError:
+                util_series = default_util_series()
+        else:
+            util_series = default_util_series()
 
         do_idle_check: bool = True
 
         for k in util_series:
-            util_series[k].append(current_utilizations[k])
+            try:
+                current_util = current_utilizations[k]
+            except KeyError:
+                continue
+            util_series[k].append(current_util)
             if len(util_series[k]) > window_size:
                 util_series[k].pop(0)
             else:
@@ -1058,7 +1157,7 @@ def _avg(util_list: list[float]) -> float:
         avg_utils: Mapping[str, float] = {k: _avg(v) for k, v in util_series.items()}
 
         util_avg_thresholds = UtilizationResourceReport.from_avg_threshold(
-            avg_utils, self.resource_thresholds, unavailable_resources
+            avg_utils, self.resource_thresholds, excluded_resources
         )
         report = {
             "thresholds_check_operator": self.thresholds_check_operator.value,
@@ -1107,7 +1206,7 @@ async def get_current_utilization(
         will return the averaged values over the kernels for each utilization.
         """
         try:
-            utilizations = {k: 0.0 for k in self.resource_thresholds.keys()}
+            utilizations: defaultdict[str, float] = defaultdict(float)
             live_stat = {}
             divider = len(kernel_ids) if kernel_ids else 1
             for kernel_id in kernel_ids:
@@ -1126,13 +1225,12 @@ async def get_current_utilization(
                 live_stat = cast(dict[str, Any], msgpack.unpackb(raw_live_stat))
                 kernel_utils = {
                     k: float(nmget(live_stat, f"{k}.pct", 0.0))
-                    for k in self.resource_thresholds.keys()
+                    for k in self.resource_names_to_check
                 }
-                utilizations = {
-                    k: utilizations[k] + kernel_utils[k] for k in self.resource_thresholds.keys()
-                }
-            utilizations = {k: utilizations[k] / divider for k in self.resource_thresholds.keys()}
+                for resource, val in kernel_utils.items():
+                    utilizations[resource] = utilizations[resource] + val
+            total_utilizations = {k: v / divider for k, v in utilizations.items()}
 
             # NOTE: Manual calculation of mem utilization.
             # mem.capacity does not report total amount of memory allocated to
@@ -1140,8 +1238,8 @@ async def get_current_utilization(
             # executing. So, we just replace it with the value of occupied slot.
             mem_slots = float(occupied_slots.get("mem", 0))
             mem_current = float(nmget(live_stat, "mem.current", 0.0))
-            utilizations["mem"] = mem_current / mem_slots * 100 if mem_slots > 0 else 0
-            return utilizations
+            total_utilizations["mem"] = mem_current / mem_slots * 100 if mem_slots > 0 else 0
+            return total_utilizations
         except Exception as e:
             _msg = f"Unable to collect utilization for idleness check (kernels:{kernel_ids})"
             log.warning(_msg, exc_info=e)
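
Editor's note (not part of the patch): the sketch below illustrates how a kebab-case idle-checker configuration is expected to flow through the helpers introduced above. The threshold values are made up for illustration, and it assumes the patched ai.backend packages are importable and that tv.TimeDuration accepts duration strings such as "10m".

# Hypothetical usage sketch; assumptions noted above.
from ai.backend.common.config import config_key_to_snake_case
from ai.backend.manager.idle import UtilizationConfig

# Raw idle-checker config as it arrives with kebab-case keys (illustrative values).
raw_config = {
    "time-window": "10m",
    "initial-grace-period": "5m",
    "thresholds-check-operator": "and",
    "resource-thresholds": {
        "cpu_util": {"average": 30},
        "mem": {"average": 20},
        "cuda_util": {"average": 10},
        "cuda_mem": {"average": 10},
    },
}

# Kebab-case keys are converted to snake_case before pydantic validation,
# mirroring what UtilizationIdleChecker.populate_config() now does.
config = UtilizationConfig(**config_key_to_snake_case(raw_config))

# Metric keys collapse to unique resource names: "cuda_util" and "cuda_mem"
# both map to "cuda", so unallocated devices can be excluded from the check.
print(sorted(config.resource_thresholds.unique_resource_name_map))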