-
Notifications
You must be signed in to change notification settings - Fork 0
/
coordinator.py
48 lines (41 loc) · 1.62 KB
/
coordinator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import httpx
import random
class Coordinator:
def __init__(self, config, cluster, rangemap):
self.config = config
self.cluster = cluster
self.rangemap = rangemap
def matches(self, query, nodes):
node = self.random_healthy_node(nodes)
if not node: return []
try:
req = httpx.get(f"{node}/search", params={"q": query})
if req.status_code == 200:
# todo decode
return req.json()
except Exception as err:
print(f"Unexpected {err=}, when proxying to {node} with query {query} {type(err)=}")
return []
def random_healthy_node(self, node_hosts):
nodes = []
# todo change cluster.nodes to hash
for node in self.cluster.nodes:
if node["host"] in node_hosts and node["status"]["healthy"]: nodes.append(node["host"])
if len(nodes) == 0: return
return random.choice(nodes)
def add(self, word, nodes):
return self._write_word("POST", nodes, word)
def remove(self, word, nodes):
return self._write_word("DELETE", nodes, word)
def _write_word(self, method, nodes, word):
# TODO: proxy in parallel to <replication_factor> healthy nodes, not just one
node = self.random_healthy_node(nodes)
if not node: return False
return self._proxy_request(method, f"{node}/api/v1/words", { "word": word })
def _proxy_request(self, method, path, word, data=None):
try:
code = httpx.request(method, f"{node}/api/v1/words", data=data).status_code
if code >= 200 and code < 300: return True
except Exception as err:
print(f"Unexpected {err=}, when proxying {method} to {node} with word {word} {type(err)=}")
return False