Skip to content

Commit

Permalink
Improve leader not found error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Oct 16, 2023
1 parent 99cadc1 commit d33e372
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/ai/backend/manager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@
}
)
),
# t.Key("raft-servers"): t.Null | tx.DelimiterSeperatedList(tx.HostPortPair),
t.Key("cluster-leader-id", default=1): t.Int,
t.Key("heartbeat-tick", default=None): t.Int | t.Null,
t.Key("election-tick", default=None): t.Int | t.Null,
t.Key("min-election-tick", default=None): t.Int | t.Null,
Expand Down
27 changes: 19 additions & 8 deletions src/ai/backend/manager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import aiomonitor
import aiotools
import click
import grpc
from aiohttp import web
from aiotools import process_index
from raftify.config import RaftifyConfig
Expand Down Expand Up @@ -667,8 +668,10 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
if raft_configs is not None:
peers = Peers(
{
int(entry["node-id"]): Peer(addr=SocketAddr(entry["host"], entry["port"]))
for entry in raft_configs.pop("peers")
int(peer_config["node-id"]): Peer(
addr=SocketAddr(peer_config["host"], peer_config["port"])
)
for peer_config in raft_configs.pop("peers")
}
)

Expand Down Expand Up @@ -711,14 +714,22 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
logger.info("Running in follower mode")
cluster.run_raft(node_id)

# Leader manager process is running on the other host.
leader_id = raft_configs["cluster-leader-id"]

leader_client = RaftClient(peers[leader_id].addr)

if raft_servers_start_index != 0:
# Assume leader process is running.
cluster.get_peers().connect(1, peers[1].addr)
cluster.get_peers().connect(leader_id, peers[leader_id].addr)

while True:
try:
await leader_client.member_bootstrap_ready(node_id, timeout=5.0)
break
except grpc.aio.AioRpcError:
logger.info("Waiting for leader node to be ready...")
await asyncio.sleep(3)
continue

leader_client = RaftClient(peers[1].addr)
# TODO: Improve below logic
await leader_client.member_bootstrap_ready(node_id, timeout=5.0)
asyncio.create_task(cluster.wait_for_termination())

assert root_ctx.raft_ctx.cluster.raft_node is not None, "RaftNode not initialized properly!"
Expand Down

0 comments on commit d33e372

Please sign in to comment.