-
Notifications
You must be signed in to change notification settings - Fork 2
/
slave_sync_notify.py
314 lines (269 loc) · 15.2 KB
/
slave_sync_notify.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
import os
import traceback
import subprocess
import logging
import pytz
import socket
from datetime import datetime,date,timedelta
import time
from slave_sync_task import (
update_feature_job,update_feature_metadata_job,remove_feature_job,update_auth_job,update_access_rules_job,
update_wmsstore_job,update_wmslayer_job,remove_wmslayer_job,remove_wmsstore_job,
update_layergroup_job,remove_layergroup_job,
JOB_DEF_INDEX,JOB_TYPE_INDEX,jobname,
empty_gwc_layer_job,empty_gwc_group_job,empty_gwc_feature_job,update_workspace_job,
update_livelayer_job,remove_livelayer_job,empty_gwc_livelayer_job,update_livestore_job,remove_livestore_job,
)
from slave_sync_env import (
CODE_BRANCH,LISTEN_CHANNELS,get_version,SLAVE_NAME,SYNC_STATUS_PATH,now
)
# Directory which contains this module.
PATH = os.path.dirname(os.path.realpath(__file__))
# The files holding the failed feedback sql are stored in the sync status folder.
CACHE_PATH = SYNC_STATUS_PATH

# Connection settings of the master database; all of them are read from the environment.
MASTER_PGSQL_HOST = os.environ.get("MASTER_PGSQL_HOST")
MASTER_PGSQL_DATABASE = os.environ.get("MASTER_PGSQL_DATABASE")
MASTER_PGSQL_SCHEMA = os.environ.get("MASTER_PGSQL_SCHEMA","public")
MASTER_PGSQL_PORT = os.environ.get("MASTER_PGSQL_PORT","5432")
MASTER_PGSQL_USERNAME = os.environ.get("MASTER_PGSQL_USERNAME")
MASTER_PGSQL_PASSWORD = os.environ.get("MASTER_PGSQL_PASSWORD")

# Feedback is disabled if any mandatory setting is missing; the password is not mandatory.
feedback_disabled = not all([MASTER_PGSQL_HOST,MASTER_PGSQL_DATABASE,MASTER_PGSQL_SCHEMA,MASTER_PGSQL_PORT,MASTER_PGSQL_USERNAME,SLAVE_NAME])

logger = logging.getLogger(__name__)

if not feedback_disabled:
    # psql command line; the last element is the placeholder of the sql statement to execute.
    sql_cmd = ["psql","-h",MASTER_PGSQL_HOST,"-p",MASTER_PGSQL_PORT,"-d",MASTER_PGSQL_DATABASE,"-U",MASTER_PGSQL_USERNAME,"-w","-c",None]
    # Environment passed to every psql subprocess.
    env = os.environ.copy()
    # Only export the password if it is configured: a None value in the environment
    # makes subprocess.Popen raise TypeError, which would break every notification.
    if MASTER_PGSQL_PASSWORD is not None:
        env["PGPASSWORD"] = MASTER_PGSQL_PASSWORD
class SlaveServerSyncNotify(object):
    """
    Send the sync status of this slave server to the master database.

    Each notification is a sql statement executed with the psql command line
    described by the module level "sql_cmd" and "env".
    The public methods do nothing if the module level "feedback_disabled" is True.

    NOTE(review): values are interpolated into the sql text; only the status
    message is quote-escaped. Confirm that names and paths can never contain
    a single quote or "$$".
    """
    #template of the daily file used to save the failed sql
    _failed_sql_file_template = os.path.join(CACHE_PATH,'failed_response_{0}.sql')
    #separator between the sql statements saved in a failed sql file
    _sql_separator='/*-------------------------------------------------------------------------------------------------------------------------------*/'

    @classmethod
    def _current_failed_sql_file(cls):
        """
        Return the current sql file for saving the failed sql
        """
        return cls._failed_sql_file_template.format(date.today().strftime('%Y-%m-%d'))

    @classmethod
    def _failed_sql_files(cls):
        """
        Return the active failed sql files which need to be executed again.
        Only the failed sql which occurred yesterday or today is executed again; this prevents a never succeeding sql from being executed for ever.
        """
        today = date.today()
        return [cls._failed_sql_file_template.format(d.strftime('%Y-%m-%d')) for d in (today - timedelta(1),today)]

    @classmethod
    def _run_sql(cls,sql):
        """
        Execute a sql statement with psql.
        Return a tuple (return code, stderr as text); a non empty stderr is logged.
        """
        #use a private copy of the command line instead of changing the shared module level list
        cmd = sql_cmd[:-1] + [sql]
        sql_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
        stderr = sql_process.communicate()[1]
        if isinstance(stderr,bytes) and not isinstance(stderr,str):
            #python 3 returns bytes, decode it to search and log it as text
            stderr = stderr.decode("utf-8","replace")
        stderr = stderr or ""
        if stderr.strip():
            logger.info("stderr: {}".format(stderr))
        return sql_process.returncode,stderr

    @classmethod
    def exec_failed_sql(cls):
        """
        Execute the saved failed sql again.
        A file is removed if all its sql are executed successfully; otherwise it is rewritten with the sql which still fail.
        """
        #without feedback there is no psql command line to run
        if feedback_disabled: return

        for sql_file in cls._failed_sql_files():
            if not os.path.exists(sql_file):
                #no failed feedbacks
                continue

            with open(sql_file,'r') as f:
                sqls = [s for s in f.read().split(cls._sql_separator) if s.strip()]

            failed_sqls = []
            for sql in sqls:
                returncode,_ = cls._run_sql(sql)
                if returncode != 0:
                    logger.error("Execute failed sql failed with return code ({0})".format(returncode))
                    failed_sqls.append(sql.strip())

            if failed_sqls:
                #some feedbacks are executed failed
                with open(sql_file,'w') as f:
                    f.write("{0}{1}{0}".format(os.linesep,cls._sql_separator).join(failed_sqls))
            else:
                #failed feedbacks are executed successfully, remove the failed feedbacks.
                os.remove(sql_file)

    @classmethod
    def _save_failed_sql(cls,sql):
        """
        Append the sql to the current failed sql file; create the file if it does not exist.
        """
        f_name = cls._current_failed_sql_file()
        f_exists = os.path.exists(f_name)
        #append mode creates a missing file
        with open(f_name,'a') as f:
            if f_exists:
                #separate the sql from the sql saved before
                f.write("{1}{2}{1}{0}".format(sql,os.linesep,cls._sql_separator))
            else:
                f.write(sql)

    @classmethod
    def send_last_poll_time(cls):
        """
        Register this slave server in the master database if required, and update its listen channels, last poll time and code version.
        """
        if feedback_disabled: return

        if not hasattr(cls,"_listen_channels"):
            #the channels are joined once and cached on the class
            cls._listen_channels = ",".join(LISTEN_CHANNELS)

        last_poll_time = now()
        sql = """
DO
$$BEGIN
    IF EXISTS (SELECT 1 FROM {0}.monitor_slaveserver WHERE name='{1}') THEN
        UPDATE {0}.monitor_slaveserver SET listen_channels='{2}', last_poll_time='{3}',code_version='{4}' WHERE name='{1}';
    ELSE
        INSERT INTO {0}.monitor_slaveserver (name,register_time,listen_channels,last_poll_time,code_version) VALUES ('{1}','{3}','{2}','{3}','{4}');
    END IF;
END$$;
""".format(MASTER_PGSQL_SCHEMA,SLAVE_NAME,cls._listen_channels,last_poll_time,"{0} ({1})".format(get_version(),CODE_BRANCH))

        returncode,_ = cls._run_sql(sql)
        if returncode != 0:
            logger.error("Update the last sync info in master db failed with return code ({0})".format(returncode))

    @classmethod
    def send_last_sync_time(cls,pull_status):
        """
        Update the last sync time and the last sync message of this slave server in the master database.
        pull_status: the object whose string representation is saved as the sync message.
        """
        if feedback_disabled: return

        last_sync_time = now()
        #double the single quotes to get a valid sql string literal
        last_sync_message = str(pull_status).replace("'","''")
        sql = """
DO
$$BEGIN
    UPDATE {0}.monitor_slaveserver SET last_sync_time='{2}', last_sync_message='{3}' WHERE name='{1}';
END$$;
""".format(MASTER_PGSQL_SCHEMA,SLAVE_NAME,last_sync_time,last_sync_message)

        returncode,_ = cls._run_sql(sql)
        if returncode != 0:
            logger.error("Update the last sync info in master db failed with return code ({0})".format(returncode))

    @classmethod
    def send_feature_sync_status(cls,task,remove=False):
        """
        Send the sync status of a publish to the master database.
        task: the sync job; "status", "name" and "job_file" are read, "job_id", "job_batch_id" and "spatial_type" are optional.
        remove: True if the job removes the publish; a failed remove is not reported.
        The sql is saved for a later retry if psql fails and reports an "ERROR".
        Exceptions are logged and not propagated.
        """
        if feedback_disabled:
            logger.info("Notify feature is disabled.")
            return

        try:
            status = task["status"]
            sync_succeed = status.is_succeed
            #double the single quotes to get a valid sql string literal
            sync_message = str(status).replace("'","''")
            sync_time = status.last_process_time or now()

            if sync_succeed and remove:
                #remove publish succeed.
                sql_template = """
DELETE FROM {0}.monitor_publishsyncstatus AS b
USING {0}.monitor_slaveserver AS a
WHERE a.id = b.slave_server_id
  AND a.name = '{1}'
  AND b.publish = '{2}'
"""
            elif sync_succeed:
                #update publish succeed
                sql_template = """
DO
$$BEGIN
    IF EXISTS (SELECT 1 FROM {0}.monitor_slaveserver a JOIN {0}.monitor_publishsyncstatus b ON a.id=b.slave_server_id WHERE a.name='{1}' AND b.publish='{2}') THEN
        UPDATE {0}.monitor_publishsyncstatus AS b SET deploied_job_id='{3}', deploied_job_batch_id='{4}', deploy_message='{5}',deploy_time= {6}, sync_job_id = null, sync_job_batch_id = null, sync_time = null, sync_message = null,preview_file={7},spatial_type='{8}'
        FROM {0}.monitor_slaveserver AS a
        WHERE b.slave_server_id = a.id AND a.name='{1}' AND b.publish='{2}';
    ELSE
        INSERT INTO {0}.monitor_publishsyncstatus
            (slave_server_id,publish,deploied_job_id,deploied_job_batch_id,deploy_message,deploy_time,sync_job_id,sync_job_batch_id,sync_message,sync_time,preview_file,spatial_type)
        SELECT id,'{2}','{3}','{4}','{5}',{6},null,null,null,null,{7},'{8}'
        FROM {0}.monitor_slaveserver
        WHERE name = '{1}';
    END IF;
END$$;
"""
            elif not remove:
                #update publish failed
                sql_template = """
DO
$$BEGIN
    IF EXISTS (SELECT 1 FROM {0}.monitor_slaveserver a JOIN {0}.monitor_publishsyncstatus b ON a.id=b.slave_server_id WHERE a.name='{1}' AND b.publish='{2}') THEN
        UPDATE {0}.monitor_publishsyncstatus AS b SET sync_job_id='{3}', sync_job_batch_id='{4}', sync_message='{5}',sync_time= {6},spatial_type='{8}'
        FROM {0}.monitor_slaveserver AS a
        WHERE b.slave_server_id = a.id AND a.name='{1}' AND b.publish='{2}';
    ELSE
        INSERT INTO {0}.monitor_publishsyncstatus
            (slave_server_id,publish,deploied_job_id,deploied_job_batch_id,deploy_message,deploy_time,sync_job_id,sync_job_batch_id,sync_message,sync_time,spatial_type)
        SELECT id,'{2}',null,null,null,null,'{3}','{4}','{5}',{6},'{8}'
        FROM {0}.monitor_slaveserver
        WHERE name = '{1}';
    END IF;
END$$;
"""
            else:
                #remove publish failed, nothing to report
                return

            preview_file = status.get_task_status("get_layer_preview").get_message("preview_file") or None
            preview_file = "'{}'".format(preview_file) if preview_file else "null"

            sql = sql_template.format(
                MASTER_PGSQL_SCHEMA,
                SLAVE_NAME,
                task['name'],
                task.get("job_id"),
                task.get("job_batch_id"),
                sync_message,
                "'{0}'".format(sync_time) if sync_time else 'null',
                preview_file,
                task.get('spatial_type','')
            )

            returncode,stderr = cls._run_sql(sql)
            if returncode != 0 and stderr.find("ERROR") >= 0:
                #keep the sql to execute it again later
                cls._save_failed_sql(sql)
                logger.error("Update sync status of task ({0}) in master db failed with return code ({1})".format(task['job_file'],returncode))
        except Exception:
            logger.error("Update sync status of task ({0}) in master db failed. {1}".format(task['job_file'],traceback.format_exc()))

    @classmethod
    def send_job_sync_status(cls,task,task_metadata):
        """
        Send the sync status of a job to the master database.
        task: the sync job; "status", "action" and "job_file" are read.
        task_metadata: the metadata of the task; provides the task type and is passed to jobname to get the task name.
        The sql is saved for a later retry if psql fails and reports an "ERROR".
        Exceptions are logged and not propagated.
        """
        if feedback_disabled:
            logger.info("Notify feature is disabled.")
            return

        try:
            task_type = task_metadata[JOB_DEF_INDEX][JOB_TYPE_INDEX]
            task_name = jobname(task,task_metadata)
            action = task["action"]
            status = task["status"]
            sync_succeed = status.is_succeed
            #double the single quotes to get a valid sql string literal
            sync_message = str(status).replace("'","''")
            sync_time = status.last_process_time

            sql_template = """
DO
$$BEGIN
    IF EXISTS (SELECT 1 FROM {0}.monitor_slaveserver a JOIN {0}.monitor_tasksyncstatus b ON a.id=b.slave_server_id WHERE a.name='{1}' AND b.task_type='{2}' AND b.task_name='{3}' AND b.action='{4}') THEN
        UPDATE {0}.monitor_tasksyncstatus AS b SET sync_succeed={5}, sync_message='{6}',sync_time= {7},preview_file={8}
        FROM {0}.monitor_slaveserver AS a
        WHERE b.slave_server_id = a.id AND a.name='{1}' AND b.task_type='{2}' AND b.task_name='{3}' AND b.action='{4}';
    ELSE
        INSERT INTO {0}.monitor_tasksyncstatus
            (slave_server_id,task_type,task_name,action,sync_succeed,sync_message,sync_time,preview_file)
        SELECT id,'{2}','{3}','{4}',{5},'{6}',{7},{8}
        FROM {0}.monitor_slaveserver
        WHERE name = '{1}';
    END IF;
END$$;
"""
            preview_file = status.get_task_status("get_layer_preview").get_message("preview_file") or None
            preview_file = "'{}'".format(preview_file) if preview_file else "null"

            sql = sql_template.format(
                MASTER_PGSQL_SCHEMA,
                SLAVE_NAME,
                task_type,
                task_name,
                action,
                sync_succeed,
                sync_message,
                "'{0}'".format(sync_time) if sync_time else 'null',
                preview_file
            )

            returncode,stderr = cls._run_sql(sql)
            if returncode != 0 and stderr.find("ERROR") >= 0:
                #keep the sql to execute it again later
                cls._save_failed_sql(sql)
                logger.error("Update sync status of task ({0}) in master db failed with return code ({1})".format(task['job_file'],returncode))
        except Exception:
            logger.error("Update sync status of task ({0}) in master db failed. {1}".format(task['job_file'],traceback.format_exc()))
def task_name(task):
    """
    Return the name of a task in the format "workspace:name".

    task: a mapping providing the keys "workspace" and "name".
    """
    return "{0}:{1}".format(task["workspace"],task["name"])
def send_update_feature_notify(sync_job,task_metadata):
    """
    Pass sync_job to SlaveServerSyncNotify.send_feature_sync_status with remove disabled.

    task_metadata is part of the signature but is not used.
    """
    SlaveServerSyncNotify.send_feature_sync_status(sync_job,remove=False)
def send_remove_feature_notify(sync_job,task_metadata):
    """
    Pass sync_job to SlaveServerSyncNotify.send_feature_sync_status with remove enabled.

    task_metadata is part of the signature but is not used.
    """
    SlaveServerSyncNotify.send_feature_sync_status(sync_job,remove=True)
def send_job_notify(sync_job,task_metadata):
    """
    Pass sync_job and task_metadata to SlaveServerSyncNotify.send_job_sync_status.
    """
    SlaveServerSyncNotify.send_job_sync_status(task=sync_job,task_metadata=task_metadata)
# The "send_notify" tasks provided by this module.
# Each entry looks like (task type, job definition, None, job name or function returning it, notify function)
# -- TODO confirm the meaning of each position against the code which consumes this list.
tasks_metadata = [
    #feature jobs
    ("send_notify", update_feature_job,          None, task_name,                   send_update_feature_notify),
    ("send_notify", remove_feature_job,          None, task_name,                   send_remove_feature_notify),
    ("send_notify", update_feature_metadata_job, None, task_name,                   send_job_notify),

    #security jobs
    ("send_notify", update_auth_job,             None, "update_roles",              send_job_notify),
    ("send_notify", update_access_rules_job,     None, "update_access_rules",       send_job_notify),

    #wms jobs
    ("send_notify", update_wmsstore_job,         None, task_name,                   send_job_notify),
    ("send_notify", update_wmslayer_job,         None, task_name,                   send_job_notify),
    ("send_notify", remove_wmslayer_job,         None, task_name,                   send_job_notify),
    ("send_notify", remove_wmsstore_job,         None, task_name,                   send_job_notify),
    # NOTE(review): remove_wmslayer_job is registered a second time here; confirm whether this is intended.
    ("send_notify", remove_wmslayer_job,         None, task_name,                   send_job_notify),

    #gwc jobs
    ("send_notify", empty_gwc_layer_job,         None, task_name,                   send_job_notify),
    ("send_notify", empty_gwc_group_job,         None, task_name,                   send_job_notify),
    ("send_notify", empty_gwc_feature_job,       None, task_name,                   send_job_notify),

    #layergroup jobs
    ("send_notify", update_layergroup_job,       None, task_name,                   send_job_notify),
    ("send_notify", remove_layergroup_job,       None, task_name,                   send_job_notify),

    #workspace jobs
    ("send_notify", update_workspace_job,        None, lambda task: task["schema"], send_job_notify),

    #live layer and live store jobs
    ("send_notify", update_livelayer_job,        None, task_name,                   send_job_notify),
    ("send_notify", remove_livelayer_job,        None, task_name,                   send_job_notify),
    ("send_notify", update_livestore_job,        None, task_name,                   send_job_notify),
    ("send_notify", remove_livestore_job,        None, task_name,                   send_job_notify),
    ("send_notify", empty_gwc_livelayer_job,     None, task_name,                   send_job_notify),
]