Skip to content

Commit

Permalink
refactor: Utilization idle checker calculates resource name to avoid …
Browse files Browse the repository at this point in the history
…hard code
  • Loading branch information
fregataa committed Aug 12, 2024
1 parent 0b50036 commit 08ff6d2
Showing 1 changed file with 127 additions and 71 deletions.
198 changes: 127 additions & 71 deletions src/ai/backend/manager/idle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,41 @@
import logging
import math
from abc import ABCMeta, abstractmethod
from collections import UserDict, defaultdict
from collections import UserDict
from collections.abc import (
Mapping,
Sequence,
)
from datetime import datetime, timedelta
from decimal import Decimal
from typing import (
TYPE_CHECKING,
Annotated,
Any,
ClassVar,
DefaultDict,
Final,
List,
Mapping,
MutableMapping,
NamedTuple,
Optional,
Sequence,
Set,
Type,
Union,
cast,
)

import aiotools
import sqlalchemy as sa
import trafaret as t
from aiotools import TaskGroupError
from dateutil.relativedelta import relativedelta
from pydantic import (
Field,
)
from sqlalchemy.engine import Row

import ai.backend.common.validators as tx
from ai.backend.common import msgpack, redis_helper
from ai.backend.common import typed_validators as tv
from ai.backend.common.config import BaseSchema, config_key_to_snake_case
from ai.backend.common.defs import REDIS_LIVE_DB, REDIS_STAT_DB
from ai.backend.common.distributed import GlobalTimer
from ai.backend.common.events import (
Expand All @@ -55,6 +61,7 @@
AccessKey,
BinarySize,
RedisConnectionInfo,
ResourceSlot,
SessionTypes,
)
from ai.backend.common.utils import nmget
Expand Down Expand Up @@ -130,14 +137,17 @@ class UtilizationResourceReport(UserDict):
def from_avg_threshold(
cls,
avg_utils: Mapping[str, float],
thresholds: Mapping[str, Union[int, float, Decimal, None]],
thresholds: ResourceThresholds,
exclusions: set[str],
) -> UtilizationResourceReport:
data: dict[str, UtilizationExtraInfo] = {
k: UtilizationExtraInfo(float(avg_utils[k]), float(threshold))
for k, threshold in thresholds.items()
if (threshold is not None) and (k not in exclusions)
}
data: dict[str, UtilizationExtraInfo] = {}
for resource_name, val in thresholds.model_dump().items():
_resource_name = cast(str, resource_name)
if val["average"] is None or _resource_name in exclusions:
continue
data[_resource_name] = UtilizationExtraInfo(
float(avg_utils[_resource_name]), float(val["average"])
)
return cls(data)

def to_dict(self, apply_unit: bool = True) -> dict[str, UtilizationExtraInfo]:
Expand Down Expand Up @@ -266,6 +276,7 @@ async def _do_idle_check(
kernels.c.session_type,
kernels.c.created_at,
kernels.c.occupied_slots,
kernels.c.requested_slots,
kernels.c.cluster_size,
users.c.created_at.label("user_created_at"),
])
Expand Down Expand Up @@ -737,6 +748,77 @@ async def get_checker_result(
return msgpack.unpackb(data) if data is not None else None


_resource_name_postfix = ("_util", "_mem", "_used")


def _get_trimmed_resource_name(name: str) -> str:
for p in _resource_name_postfix:
if name.endswith(p):
return name.rstrip(p)
return name


class ResourceThresholdValue(BaseSchema):
average: Annotated[
int | float | Decimal | None,
Field(
default=None,
description="Threshold value. Default is 'null', which means the idle checker does not take into account the resource.",
),
]
name: Annotated[
str | None,
Field(
default=None,
description=f"Resource name without postfix. Default is 'null'. If this value is 'null', any of {_resource_name_postfix} postfix of the resource name is removed.",
),
]


class ResourceThresholds(BaseSchema):
cpu_util: Annotated[
ResourceThresholdValue, Field(default_factory=lambda: ResourceThresholdValue())
]
mem: Annotated[ResourceThresholdValue, Field(default_factory=lambda: ResourceThresholdValue())]
cuda_util: Annotated[
ResourceThresholdValue, Field(default_factory=lambda: ResourceThresholdValue())
]
cuda_mem: Annotated[
ResourceThresholdValue, Field(default_factory=lambda: ResourceThresholdValue())
]
atom_mem: Annotated[
ResourceThresholdValue, Field(default_factory=lambda: ResourceThresholdValue())
]

def get_unique_resource_names(self) -> set[str]:
result: set[str] = set()
for resource_name, val in self.model_dump().items():
if (name := val["name"]) is not None:
result.add(name)
else:
result.add(_get_trimmed_resource_name(resource_name))
return result


class UtilizationConfig(BaseSchema):
time_window: tv.TimeDuration
initial_grace_period: tv.TimeDuration
thresholds_check_operator: Annotated[
ThresholdOperator,
Field(
default=ThresholdOperator.AND,
description=f"One of {[v.value for v in ThresholdOperator]}. Default is 'or'.",
),
]
resource_thresholds: Annotated[
ResourceThresholds,
Field(
default_factory=ResourceThresholds,
description="Resource thresholds used to check utilization idleness.",
),
]


class UtilizationIdleChecker(BaseIdleChecker):
"""
Checks the idleness of a session by the average utilization of compute devices.
Expand All @@ -748,56 +830,32 @@ class UtilizationIdleChecker(BaseIdleChecker):
report_key: ClassVar[str] = "utilization"
extra_info_key: ClassVar[str] = "utilization_extra"

_config_iv = t.Dict(
{
t.Key("time-window", default="10m"): tx.TimeDuration(),
t.Key("initial-grace-period", default="5m"): tx.TimeDuration(),
t.Key("thresholds-check-operator", default=ThresholdOperator.AND): tx.Enum(
ThresholdOperator
),
t.Key("resource-thresholds", default=None): t.Null
| t.Dict(
{
t.Key("cpu_util", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
t.Key("mem", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
t.Key("cuda_util", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
t.Key("cuda_mem", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
t.Key("atom_mem", default=None): t.Null | t.Dict({t.Key("average"): t.Float}),
},
),
},
).allow_extra("*")

resource_thresholds: MutableMapping[str, Union[int, float, Decimal, None]]
resource_thresholds: ResourceThresholds
resource_names: set[str]
thresholds_check_operator: ThresholdOperator
time_window: timedelta
initial_grace_period: timedelta
_evhandlers: List[EventHandler[None, AbstractEvent]]
slot_prefix_to_utilization_metric_map: Mapping[str, Set[str]] = {
"cpu": {"cpu_util"},
"mem": {"mem"},
"cuda": {"cuda_util", "cuda_mem"},
"atom": {"atom_mem"},
}

async def populate_config(self, raw_config: Mapping[str, Any]) -> None:
config = self._config_iv.check(raw_config)
raw_resource_thresholds = config.get("resource-thresholds")
if raw_resource_thresholds is not None:
self.resource_thresholds = {
k: nmget(v, "average") for k, v in raw_resource_thresholds.items()
}
else:
resources: list[str] = []
for r in self.slot_prefix_to_utilization_metric_map.values():
resources = [*resources, *r]
self.resource_thresholds = {r: None for r in resources}
self.thresholds_check_operator: ThresholdOperator = config.get("thresholds-check-operator")
self.time_window = config.get("time-window")
self.initial_grace_period = config.get("initial-grace-period")
config = UtilizationConfig(**config_key_to_snake_case(raw_config))
self.resource_thresholds = config.resource_thresholds
self.resource_names: set[str] = set(self.resource_thresholds.model_fields.keys())
self.thresholds_check_operator = config.thresholds_check_operator

def _to_timedelta(val: tv.TVariousDelta) -> timedelta:
match val:
case timedelta():
return val
case relativedelta():
raise ValueError("Should not use 'yr' or 'mo' unit.")

self.time_window = _to_timedelta(config.time_window)
self.initial_grace_period = _to_timedelta(config.initial_grace_period)

thresholds_log = " ".join([
f"{k}({threshold})," for k, threshold in self.resource_thresholds.items()
f"{k}({threshold['average']}),"
for k, threshold in self.resource_thresholds.model_dump().items()
])
log.info(
f"UtilizationIdleChecker(%): {thresholds_log} "
Expand Down Expand Up @@ -847,7 +905,8 @@ async def check_idleness(
interval = IdleCheckerHost.check_interval
# time_window: Utilization is calculated within this window.
time_window: timedelta = self.get_time_window(policy)
occupied_slots = kernel["occupied_slots"]
occupied_slots = cast(ResourceSlot, kernel["occupied_slots"])
requested_slots = cast(ResourceSlot, kernel["requested_slots"])
unavailable_resources: Set[str] = set()

util_series_key = f"session.{session_id}.util_series"
Expand Down Expand Up @@ -915,17 +974,17 @@ async def check_idleness(
return True

# Merge same type of (exclusive) resources as a unique resource with the values added.
# Example: {cuda.device: 0, cuda.shares: 0.5} -> {cuda: 0.5}.
unique_res_map: DefaultDict[str, Decimal] = defaultdict(Decimal)
for slot_name, alloc in occupied_slots.items():
unique_key = slot_name.split(".")[0]
unique_res_map[unique_key] += alloc
unique_resource_names: set[str] = set()
for slot_name in requested_slots.keys():
_slot_name = cast(str, slot_name)
unique_key, _, _ = _slot_name.partition(".")
unique_resource_names.add(unique_key)

# Do not take into account unallocated resources. For example, do not garbage collect
# a session without GPU even if cuda_util is configured in resource-thresholds.
for slot_prefix, util_metric in self.slot_prefix_to_utilization_metric_map.items():
if unique_res_map.get(slot_prefix, 0) == 0:
unavailable_resources.update(util_metric)
for _resource_name in self.resource_thresholds.get_unique_resource_names():
if _resource_name not in unique_resource_names:
unavailable_resources.add(_resource_name)

# Get current utilization data from all containers of the session.
if kernel["cluster_size"] > 1:
Expand All @@ -948,7 +1007,7 @@ async def check_idleness(
try:
util_series: dict[str, list[float]] = msgpack.unpackb(raw_util_series, use_list=True)
except TypeError:
util_series = {k: [] for k in self.resource_thresholds.keys()}
util_series = {k: [] for k in self.resource_names}

do_idle_check: bool = True

Expand Down Expand Up @@ -1036,7 +1095,7 @@ async def get_current_utilization(
will return the averaged values over the kernels for each utilization.
"""
try:
utilizations = {k: 0.0 for k in self.resource_thresholds.keys()}
utilizations = {k: 0.0 for k in self.resource_names}
live_stat = {}
divider = len(kernel_ids) if kernel_ids else 1
for kernel_id in kernel_ids:
Expand All @@ -1054,14 +1113,11 @@ async def get_current_utilization(
return None
live_stat = cast(dict[str, Any], msgpack.unpackb(raw_live_stat))
kernel_utils = {
k: float(nmget(live_stat, f"{k}.pct", 0.0))
for k in self.resource_thresholds.keys()
k: float(nmget(live_stat, f"{k}.pct", 0.0)) for k in self.resource_names
}

utilizations = {
k: utilizations[k] + kernel_utils[k] for k in self.resource_thresholds.keys()
}
utilizations = {k: utilizations[k] / divider for k in self.resource_thresholds.keys()}
utilizations = {k: utilizations[k] + kernel_utils[k] for k in self.resource_names}
utilizations = {k: utilizations[k] / divider for k in self.resource_names}

# NOTE: Manual calculation of mem utilization.
# mem.capacity does not report total amount of memory allocated to
Expand Down

0 comments on commit 08ff6d2

Please sign in to comment.