-
Notifications
You must be signed in to change notification settings - Fork 20
/
matcher_queue.py
executable file
·203 lines (167 loc) · 5.86 KB
/
matcher_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#!/usr/bin/python3
import json
import os.path
import queue
import socketserver
import threading
from time import sleep
import requests.exceptions
from matcher import chat, database, mail, overpass, space_alert
from matcher.job_queue import JobManager
from matcher.view import app
# Load settings from the "config.default" object into the app imported from
# matcher.view.
app.config.from_object("config.default")
# Attach the database layer to the app.
# NOTE(review): echo=False presumably turns off SQL statement logging —
# confirm against matcher.database.init_app.
database.init_app(app, echo=False)
# Single module-level JobManager used by both process_queue() and
# RequestHandler in this file.
job_manager = JobManager()
def wait_for_slot(send_queue):
    """Check the Overpass status and pause until a query may be sent.

    Returns False when the status could not be fetched (an alert mail is
    sent and an error message is put on ``send_queue``). Returns True
    otherwise, after sleeping for the first ``slots`` value if it is
    positive; in that case a ``status`` message carrying the wait time is
    put on ``send_queue`` before sleeping.
    """

    def report_failure(subject, body):
        # Common tail of both failure paths: mail the operator, tell the
        # client, and signal failure to the caller.
        mail.send_mail(subject, body)
        send_queue.put({"type": "error", "msg": "Can't access overpass API"})
        return False

    print("get status")
    try:
        status = overpass.get_status()
    except overpass.OverpassError as e:
        # The first exception argument is used as a response object
        # (its .url and .text are read for the mail body).
        r = e.args[0]
        return report_failure(
            "Overpass API unavailable",
            f"URL: {r.url}\n\nresponse:\n{r.text}",
        )
    except requests.exceptions.Timeout:
        return report_failure(
            "Overpass API timeout",
            "Timeout talking to overpass API",
        )

    print("status:", status)
    slots = status["slots"]
    # An empty slot list or a non-positive first entry means no wait.
    if slots and slots[0] > 0:
        delay = slots[0]
        send_queue.put({"type": "status", "wait": delay})
        sleep(delay)
    return True
def to_client(send_queue, msg_type, msg):
    """Tag ``msg`` with ``msg_type`` and put a snapshot of it on ``send_queue``.

    ``msg`` is still updated in place (``msg["type"] = msg_type``), as
    before, but the object placed on the queue is a shallow copy. Callers
    such as process_queue() send the same dict more than once with
    different types; enqueueing the dict itself would let a later call
    rewrite the ``type`` of a message that is still waiting in the queue.

    Args:
        send_queue: object with a ``put`` method (e.g. ``queue.Queue``).
        msg_type: value stored under the ``"type"`` key.
        msg: dict holding the message payload.
    """
    msg["type"] = msg_type
    send_queue.put(dict(msg))
def process_queue_loop():
    """Call process_queue() forever inside one application context.

    Never returns; main() runs it as the target of a daemon thread.
    """
    app_context = app.app_context()
    with app_context:
        while True:
            process_queue()
def process_queue():
    """Take the next job from the job manager and fetch its Overpass chunks.

    For every chunk that has an ``oql`` query, the result is expected at
    ``overpass/<filename>``. If that file does not exist yet the query is
    run (after waiting for a free slot) and the response body is written
    to it. A ``chunk`` message is sent to the job's queue for each such
    chunk, and a final ``done`` message once all chunks are handled.

    The job is abandoned (plain return, no ``done`` message) whenever
    wait_for_slot() reports failure; wait_for_slot() has already put an
    error message on the queue in that case.
    """
    # The first element of the job tuple is not used here.
    _area, item = job_manager.get_next_job()
    place = item["place"]
    send_queue = item["queue"]

    for num, chunk in enumerate(item["chunks"]):
        oql = chunk.get("oql")
        if not oql:
            # Chunks without a query have nothing to download.
            continue

        filename = "overpass/" + chunk["filename"]
        msg = {
            "num": num,
            "filename": chunk["filename"],
            "place": place,
        }

        if not os.path.exists(filename):
            space_alert.check_free_space(app.config)
            if not wait_for_slot(send_queue):
                return
            to_client(send_queue, "run_query", msg)

            while True:
                print("run query")
                try:
                    r = overpass.run_query(oql)
                    break
                except overpass.RateLimited:
                    print("rate limited")
                    # Stop instead of retrying when the status check
                    # itself fails; otherwise this loop would spin,
                    # sending a mail and an error message on every pass.
                    if not wait_for_slot(send_queue):
                        return

            print("query complete")
            with open(filename, "wb") as out:
                out.write(r.content)
            space_alert.check_free_space(app.config)

        print(msg)
        to_client(send_queue, "chunk", msg)

    print("item complete")
    send_queue.put({"type": "done"})
def get_pins(place):
    """Return one pin dict per entry of ``place.items``.

    Each pin has ``qid``, ``lat``, ``lon`` and ``label``; a ``tags`` list
    is added only when the item's tags are non-empty.
    """

    def as_pin(entry):
        latitude, longitude = entry.coords()
        fields = {
            "qid": entry.qid,
            "lat": latitude,
            "lon": longitude,
            "label": entry.label(),
        }
        if entry.tags:
            fields["tags"] = list(entry.tags)
        return fields

    return [as_pin(entry) for entry in place.items]
class RequestHandler(socketserver.BaseRequestHandler):
    """Handle one client connection to the matcher queue server.

    A single line is read from the connection and dispatched on its
    content: ``ping``, ``match <json>``, ``jobs`` or ``stop <json>``.
    """

    # Job object for the place this connection asked about. Set by
    # handle_message() for "match" requests; the None default keeps
    # match_place() from raising AttributeError if it has not been set.
    job_thread = None

    def send_msg(self, msg):
        """Send ``msg`` to the client as JSON via chat.send_json()."""
        return chat.send_json(self.request, msg)

    def match_place(self, msg):
        """Relay job updates for the requested place to the client.

        If no job exists yet for the place (``self.job_thread`` is falsy)
        a new one is created from the request fields and started after
        this connection has subscribed to it. Updates are then forwarded
        until a ``done`` or ``error`` message has been sent, or until the
        client connection breaks.

        Args:
            msg: decoded request dict with ``osm_type`` and ``osm_id``,
                and optionally ``user``, ``remote_addr``, ``user_agent``
                and ``want_isa``.
        """
        osm_type, osm_id = msg["osm_type"], msg["osm_id"]
        # The current thread's name is used as the subscription key.
        t = threading.current_thread()

        job_need_start = False
        if not self.job_thread:
            job_need_start = True
            kwargs = {
                key: msg.get(key) for key in ("user", "remote_addr", "user_agent")
            }
            kwargs["want_isa"] = set(msg.get("want_isa") or [])
            self.job_thread = job_manager.new_job(osm_type, osm_id, **kwargs)

        status_queue = queue.Queue()
        updates = self.job_thread.subscribe(t.name, status_queue)

        # Start only after subscribing, so the subscription is in place
        # before the job begins running.
        if job_need_start:
            self.job_thread.start()

        while True:
            update = updates.get()
            try:
                self.send_msg(update)
                if update["type"] in ("done", "error"):
                    break
            except BrokenPipeError:
                # Client went away: stop listening to this job.
                self.job_thread.unsubscribe(t.name)
                break

    def handle_message(self, msg):
        """Dispatch one request line.

        Recognised requests:
            ``ping``          reply with a ``pong`` message.
            ``match <json>``  look up the job for the place and stream
                              its updates (see match_place()).
            ``jobs``          reply with the current job list.
            ``stop <json>``   stop the job for the place and confirm.

        Any other text is ignored and nothing is sent.
        """
        print(f"handle: {msg!r}")

        if msg == "ping":
            self.send_msg({"type": "pong"})
            return

        if msg.startswith("match"):
            # Skip the 5-letter command plus one separator character.
            json_msg = json.loads(msg[6:])
            self.job_thread = job_manager.get_job(
                json_msg["osm_type"], json_msg["osm_id"]
            )
            return self.match_place(json_msg)

        if msg == "jobs":
            self.send_msg({"type": "jobs", "items": job_manager.job_list()})
            return

        if msg.startswith("stop"):
            # Skip the 4-letter command plus one separator character.
            json_msg = json.loads(msg[5:])
            job_manager.stop_job(json_msg["osm_type"], json_msg["osm_id"])
            self.send_msg({"type": "stop", "success": True})
            return

    def handle(self):
        """Read one request line and process it inside an app context.

        Any exception from handle_message() is reported to the client as
        an ``error`` message and mailed via mail.send_traceback().
        """
        print("New connection from %s:%s" % self.client_address)
        msg = chat.read_line(self.request)
        with app.app_context():
            try:
                return self.handle_message(msg)
            except Exception as e:
                error_str = f"{type(e).__name__}: {e}"
                try:
                    self.send_msg({"type": "error", "msg": error_str})
                except OSError:
                    # The client may already be gone; that must not stop
                    # the original failure from being mailed below.
                    print("unable to send error to client")
                # Once the inner handler has finished, the exception
                # being handled is the original one again.
                mail.send_traceback("matcher queue", prefix="matcher queue")
def main(host="localhost", port=6030):
    """Start the Overpass worker thread and the TCP server, then block.

    Args:
        host: interface the TCP server binds to.
        port: TCP port the server listens on.

    The worker thread runs process_queue_loop() as a daemon. The server
    runs ``serve_forever`` in its own non-daemon thread, and this function
    waits on that thread.
    """
    overpass_thread = threading.Thread(target=process_queue_loop, daemon=True)
    overpass_thread.start()

    # Set on the class before the server is created, because the flag is
    # consulted while the listening socket is being bound.
    socketserver.ThreadingTCPServer.allow_reuse_address = True
    server = socketserver.ThreadingTCPServer((host, port), RequestHandler)

    server_thread = threading.Thread(
        target=server.serve_forever, name="server thread"
    )
    server_thread.start()
    print("Server loop running in thread:", server_thread.name)
    server_thread.join()


if __name__ == "__main__":
    main()