From 99cadc186a198edbef2a4cd76a3af840b12f33c6 Mon Sep 17 00:00:00 2001 From: Gyubong Lee Date: Thu, 12 Oct 2023 06:30:01 +0000 Subject: [PATCH] Remove useless Raft RedisConnection --- configs/manager/halfstack.toml | 15 ++++++- src/ai/backend/common/defs.py | 1 - src/ai/backend/manager/api/context.py | 1 - src/ai/backend/manager/config.py | 11 +++++- src/ai/backend/manager/server.py | 56 +++++++-------------------- 5 files changed, 37 insertions(+), 47 deletions(-) diff --git a/configs/manager/halfstack.toml b/configs/manager/halfstack.toml index 28929c08a91..5f9610456b3 100644 --- a/configs/manager/halfstack.toml +++ b/configs/manager/halfstack.toml @@ -34,7 +34,6 @@ hide-agents = true agent-selection-resource-priority = ["cuda", "rocm", "tpu", "cpu", "mem"] [raft] -raft-servers = "localhost:60151,localhost:60152,localhost:60153" raft-servers-start-index = 0 heartbeat-tick = 3 election-tick = 10 @@ -42,6 +41,20 @@ log-dir = "./" slog-level = "debug" log-level = "debug" +[[raft.peers]] +host = "127.0.0.1" +port = 60151 +node-id = 1 + +[[raft.peers]] +host = "127.0.0.1" +port = 60152 +node-id = 2 + +[[raft.peers]] +host = "127.0.0.1" +port = 60153 +node-id = 3 [docker-registry] ssl-verify = false diff --git a/src/ai/backend/common/defs.py b/src/ai/backend/common/defs.py index 023298883a1..9dadda4c90b 100644 --- a/src/ai/backend/common/defs.py +++ b/src/ai/backend/common/defs.py @@ -7,7 +7,6 @@ REDIS_IMAGE_DB: Final = 3 REDIS_STREAM_DB: Final = 4 REDIS_STREAM_LOCK: Final = 5 -REDIS_RAFT_DB: Final = 6 DEFAULT_FILE_IO_TIMEOUT: Final = 10 diff --git a/src/ai/backend/manager/api/context.py b/src/ai/backend/manager/api/context.py index 0dcd910c654..edbd09dea6c 100644 --- a/src/ai/backend/manager/api/context.py +++ b/src/ai/backend/manager/api/context.py @@ -61,7 +61,6 @@ class RootContext(BaseContext): redis_image: RedisConnectionInfo redis_stream: RedisConnectionInfo redis_lock: RedisConnectionInfo - redis_raft: RedisConnectionInfo shared_config: SharedConfig local_config: LocalConfig cors_options: CORSOptions diff --git a/src/ai/backend/manager/config.py b/src/ai/backend/manager/config.py index b53345e8c72..c9187770cba 100644 --- a/src/ai/backend/manager/config.py +++ b/src/ai/backend/manager/config.py @@ -297,7 +297,16 @@ t.Key("raft", default=None): ( t.Dict( { - t.Key("raft-servers"): t.Null | tx.DelimiterSeperatedList(tx.HostPortPair), + t.Key("peers"): t.List( + t.Dict( + { + t.Key("node-id"): t.Int, + t.Key("host"): t.String, + t.Key("port"): t.Int, + } + ) + ), + # t.Key("raft-servers"): t.Null | tx.DelimiterSeperatedList(tx.HostPortPair), t.Key("heartbeat-tick", default=None): t.Int | t.Null, t.Key("election-tick", default=None): t.Int | t.Null, t.Key("min-election-tick", default=None): t.Int | t.Null, diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index 08db344b19f..1b47dead5dc 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -31,13 +31,13 @@ import aiotools import click from aiohttp import web +from aiotools import process_index from raftify.config import RaftifyConfig from raftify.deserializer import init_rraft_py_deserializer from raftify.peers import Peer, Peers from raftify.raft_client import RaftClient from raftify.raft_cluster import RaftCluster from raftify.utils import SocketAddr -from redis.asyncio import Redis from setproctitle import setproctitle from ai.backend.common import redis_helper @@ -47,13 +47,11 @@ from ai.backend.common.defs import ( REDIS_IMAGE_DB, REDIS_LIVE_DB, - REDIS_RAFT_DB, REDIS_STAT_DB, REDIS_STREAM_DB, REDIS_STREAM_LOCK, ) from ai.backend.common.events import EventDispatcher, EventProducer, KernelLifecycleEventReason -from ai.backend.common.lock import RedisLock from ai.backend.common.logging import BraceStyleAdapter, Logger from ai.backend.common.plugin.hook import ALL_COMPLETED, PASSED, HookPluginContext from ai.backend.common.plugin.monitor import INCREMENT @@ -361,17 +359,12 @@ async def redis_ctx(root_ctx: RootContext) -> AsyncIterator[None]: root_ctx.shared_config.data["redis"], db=REDIS_STREAM_LOCK, ) - root_ctx.redis_raft = redis_helper.get_redis_object( - root_ctx.shared_config.data["redis"], - db=REDIS_RAFT_DB, - ) for redis_info in ( root_ctx.redis_live, root_ctx.redis_stat, root_ctx.redis_image, root_ctx.redis_stream, root_ctx.redis_lock, - root_ctx.redis_raft, ): await redis_helper.ping_redis_connection(redis_info.client) yield @@ -380,7 +373,6 @@ async def redis_ctx(root_ctx: RootContext) -> AsyncIterator[None]: await root_ctx.redis_stat.close() await root_ctx.redis_live.close() await root_ctx.redis_lock.close() - await root_ctx.redis_raft.close() @actxmgr @@ -673,7 +665,16 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]: raft_configs = local_config.get("raft") if raft_configs is not None: - raft_server_addrs = raft_configs.pop("raft-servers") + peers = Peers( + { + int(entry["node-id"]): Peer(addr=SocketAddr(entry["host"], entry["port"])) + for entry in raft_configs.pop("peers") + } + ) + + raft_servers_start_index = int(raft_configs["raft-servers-start-index"]) + node_id = raft_servers_start_index + process_index.get() + 1 + current_node = [peer for id, peer in peers.data.items() if id == node_id][0] raft_configs_dict = { key.replace("-", "_"): grant_type(raft_configs[key]) for key in raft_configs @@ -681,43 +682,12 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]: logger = RaftifyLogger(raft_configs_dict.get("log_level")) - assert raft_server_addrs is not None - raft_server_socket_addrs = [SocketAddr(addr.host, addr.port) for addr in raft_server_addrs] - - raft_servers_start_index = int(raft_configs["raft-servers-start-index"]) - redis_client = root_ctx.redis_raft.client - assert isinstance(redis_client, Redis) - - while True: - lock = RedisLock("next-node-id-lock", root_ctx.redis_raft) - async with lock: - raw_next_id = await redis_client.get("next-node-id") - node_id = int(raw_next_id) if raw_next_id else 1 - - if node_id >= raft_servers_start_index: - await redis_client.set("next-node-id", node_id + 1) - break - else: - logger.info("Waiting for raft servers including leader node to start...") - await asyncio.sleep(3) - - assert node_id <= len( - raft_server_addrs - ), "node_id should be less than or equal to the number of raft servers" - raft_cfg = RaftifyConfig( log_dir=raft_configs_dict["log_dir"], raft_config=RaftifyConfig.new_raft_config(raft_configs_dict), ) - peers = Peers( - { - (node_idx + 1): Peer(addr=addr) - for node_idx, addr in enumerate(raft_server_socket_addrs) - } - ) - - raft_addr = raft_server_addrs[node_id - 1] + raft_addr = current_node.addr store = HashStore(logger) @@ -744,7 +714,7 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]: # Leader manager process is runnging on the other host. if raft_servers_start_index != 0: # Assume leader process is running. - cluster.get_peers().connect(1, raft_server_socket_addrs[0]) + cluster.get_peers().connect(1, peers[1].addr) leader_client = RaftClient(peers[1].addr) # TODO: Improve below logic