From d33e37298937fbc5284bfd11447032de97ee79b2 Mon Sep 17 00:00:00 2001 From: Gyubong Lee Date: Thu, 12 Oct 2023 07:50:46 +0000 Subject: [PATCH] Improve leader not found error handling --- src/ai/backend/manager/config.py | 2 +- src/ai/backend/manager/server.py | 27 +++++++++++++++++++-------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/ai/backend/manager/config.py b/src/ai/backend/manager/config.py index c9187770cba..c6989ea8ca9 100644 --- a/src/ai/backend/manager/config.py +++ b/src/ai/backend/manager/config.py @@ -306,7 +306,7 @@ } ) ), - # t.Key("raft-servers"): t.Null | tx.DelimiterSeperatedList(tx.HostPortPair), + t.Key("cluster-leader-id", default=1): t.Int, t.Key("heartbeat-tick", default=None): t.Int | t.Null, t.Key("election-tick", default=None): t.Int | t.Null, t.Key("min-election-tick", default=None): t.Int | t.Null, diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index 1b47dead5dc..06eadeecf25 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -30,6 +30,7 @@ import aiomonitor import aiotools import click +import grpc from aiohttp import web from aiotools import process_index from raftify.config import RaftifyConfig @@ -667,8 +668,10 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]: if raft_configs is not None: peers = Peers( { - int(entry["node-id"]): Peer(addr=SocketAddr(entry["host"], entry["port"])) - for entry in raft_configs.pop("peers") + int(peer_config["node-id"]): Peer( + addr=SocketAddr(peer_config["host"], peer_config["port"]) + ) + for peer_config in raft_configs.pop("peers") } ) @@ -711,14 +714,22 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]: logger.info("Running in follower mode") cluster.run_raft(node_id) - # Leader manager process is runnging on the other host. + leader_id = raft_configs["cluster-leader-id"] + + leader_client = RaftClient(peers[leader_id].addr) + if raft_servers_start_index != 0: - # Assume leader process is running. - cluster.get_peers().connect(1, peers[1].addr) + cluster.get_peers().connect(leader_id, peers[leader_id].addr) + + while True: + try: + await leader_client.member_bootstrap_ready(node_id, timeout=5.0) + break + except grpc.aio.AioRpcError: + logger.info("Waiting for leader node to be ready...") + await asyncio.sleep(3) + continue - leader_client = RaftClient(peers[1].addr) - # TODO: Improve below logic - await leader_client.member_bootstrap_ready(node_id, timeout=5.0) asyncio.create_task(cluster.wait_for_termination()) assert root_ctx.raft_ctx.cluster.raft_node is not None, "RaftNode not initialized properly!"