Skip to content

Commit

Permalink
Remove useless Raft RedisConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Oct 16, 2023
1 parent 920231a commit 99cadc1
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 47 deletions.
15 changes: 14 additions & 1 deletion configs/manager/halfstack.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,27 @@ hide-agents = true
agent-selection-resource-priority = ["cuda", "rocm", "tpu", "cpu", "mem"]

[raft]
raft-servers = "localhost:60151,localhost:60152,localhost:60153"
raft-servers-start-index = 0
heartbeat-tick = 3
election-tick = 10
log-dir = "./"
slog-level = "debug"
log-level = "debug"

[[raft.peers]]
host = "127.0.0.1"
port = 60151
node-id = 1

[[raft.peers]]
host = "127.0.0.1"
port = 60152
node-id = 2

[[raft.peers]]
host = "127.0.0.1"
port = 60153
node-id = 3

[docker-registry]
ssl-verify = false
Expand Down
1 change: 0 additions & 1 deletion src/ai/backend/common/defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
REDIS_IMAGE_DB: Final = 3
REDIS_STREAM_DB: Final = 4
REDIS_STREAM_LOCK: Final = 5
REDIS_RAFT_DB: Final = 6


DEFAULT_FILE_IO_TIMEOUT: Final = 10
1 change: 0 additions & 1 deletion src/ai/backend/manager/api/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ class RootContext(BaseContext):
redis_image: RedisConnectionInfo
redis_stream: RedisConnectionInfo
redis_lock: RedisConnectionInfo
redis_raft: RedisConnectionInfo
shared_config: SharedConfig
local_config: LocalConfig
cors_options: CORSOptions
Expand Down
11 changes: 10 additions & 1 deletion src/ai/backend/manager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,16 @@
t.Key("raft", default=None): (
t.Dict(
{
t.Key("raft-servers"): t.Null | tx.DelimiterSeperatedList(tx.HostPortPair),
t.Key("peers"): t.List(
t.Dict(
{
t.Key("node-id"): t.Int,
t.Key("host"): t.String,
t.Key("port"): t.Int,
}
)
),
# t.Key("raft-servers"): t.Null | tx.DelimiterSeperatedList(tx.HostPortPair),
t.Key("heartbeat-tick", default=None): t.Int | t.Null,
t.Key("election-tick", default=None): t.Int | t.Null,
t.Key("min-election-tick", default=None): t.Int | t.Null,
Expand Down
56 changes: 13 additions & 43 deletions src/ai/backend/manager/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
import aiotools
import click
from aiohttp import web
from aiotools import process_index
from raftify.config import RaftifyConfig
from raftify.deserializer import init_rraft_py_deserializer
from raftify.peers import Peer, Peers
from raftify.raft_client import RaftClient
from raftify.raft_cluster import RaftCluster
from raftify.utils import SocketAddr
from redis.asyncio import Redis
from setproctitle import setproctitle

from ai.backend.common import redis_helper
Expand All @@ -47,13 +47,11 @@
from ai.backend.common.defs import (
REDIS_IMAGE_DB,
REDIS_LIVE_DB,
REDIS_RAFT_DB,
REDIS_STAT_DB,
REDIS_STREAM_DB,
REDIS_STREAM_LOCK,
)
from ai.backend.common.events import EventDispatcher, EventProducer, KernelLifecycleEventReason
from ai.backend.common.lock import RedisLock
from ai.backend.common.logging import BraceStyleAdapter, Logger
from ai.backend.common.plugin.hook import ALL_COMPLETED, PASSED, HookPluginContext
from ai.backend.common.plugin.monitor import INCREMENT
Expand Down Expand Up @@ -361,17 +359,12 @@ async def redis_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
root_ctx.shared_config.data["redis"],
db=REDIS_STREAM_LOCK,
)
root_ctx.redis_raft = redis_helper.get_redis_object(
root_ctx.shared_config.data["redis"],
db=REDIS_RAFT_DB,
)
for redis_info in (
root_ctx.redis_live,
root_ctx.redis_stat,
root_ctx.redis_image,
root_ctx.redis_stream,
root_ctx.redis_lock,
root_ctx.redis_raft,
):
await redis_helper.ping_redis_connection(redis_info.client)
yield
Expand All @@ -380,7 +373,6 @@ async def redis_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
await root_ctx.redis_stat.close()
await root_ctx.redis_live.close()
await root_ctx.redis_lock.close()
await root_ctx.redis_raft.close()


@actxmgr
Expand Down Expand Up @@ -673,51 +665,29 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
raft_configs = local_config.get("raft")

if raft_configs is not None:
raft_server_addrs = raft_configs.pop("raft-servers")
peers = Peers(
{
int(entry["node-id"]): Peer(addr=SocketAddr(entry["host"], entry["port"]))
for entry in raft_configs.pop("peers")
}
)

raft_servers_start_index = int(raft_configs["raft-servers-start-index"])
node_id = raft_servers_start_index + process_index.get() + 1
current_node = [peer for id, peer in peers.data.items() if id == node_id][0]

raft_configs_dict = {
key.replace("-", "_"): grant_type(raft_configs[key]) for key in raft_configs
}

logger = RaftifyLogger(raft_configs_dict.get("log_level"))

assert raft_server_addrs is not None
raft_server_socket_addrs = [SocketAddr(addr.host, addr.port) for addr in raft_server_addrs]

raft_servers_start_index = int(raft_configs["raft-servers-start-index"])
redis_client = root_ctx.redis_raft.client
assert isinstance(redis_client, Redis)

while True:
lock = RedisLock("next-node-id-lock", root_ctx.redis_raft)
async with lock:
raw_next_id = await redis_client.get("next-node-id")
node_id = int(raw_next_id) if raw_next_id else 1

if node_id >= raft_servers_start_index:
await redis_client.set("next-node-id", node_id + 1)
break
else:
logger.info("Waiting for raft servers including leader node to start...")
await asyncio.sleep(3)

assert node_id <= len(
raft_server_addrs
), "node_id should be less than or equal to the number of raft servers"

raft_cfg = RaftifyConfig(
log_dir=raft_configs_dict["log_dir"],
raft_config=RaftifyConfig.new_raft_config(raft_configs_dict),
)

peers = Peers(
{
(node_idx + 1): Peer(addr=addr)
for node_idx, addr in enumerate(raft_server_socket_addrs)
}
)

raft_addr = raft_server_addrs[node_id - 1]
raft_addr = current_node.addr

store = HashStore(logger)

Expand All @@ -744,7 +714,7 @@ async def raft_ctx(root_ctx: RootContext) -> AsyncIterator[None]:
# Leader manager process is running on the other host.
if raft_servers_start_index != 0:
# Assume leader process is running.
cluster.get_peers().connect(1, raft_server_socket_addrs[0])
cluster.get_peers().connect(1, peers[1].addr)

leader_client = RaftClient(peers[1].addr)
# TODO: Improve below logic
Expand Down

0 comments on commit 99cadc1

Please sign in to comment.