-
Notifications
You must be signed in to change notification settings - Fork 1
/
Syncer.py
255 lines (223 loc) · 12.1 KB
/
Syncer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import contextlib
import logging
import time
import traceback
from datetime import datetime, timezone
import neo4j.exceptions
from AbstractRequester import AbstractRequester
from AgentSyncer import AgentSyncer
from EMInfraImporter import EMInfraImporter
from EventProcessors.NieuwAssetProcessor import NieuwAssetProcessor
from EventProcessors.RelatieProcessor import RelatieProcessor
from EventProcessors.RelationNotCreatedError import BetrokkeneRelationNotCreatedError, AssetRelationNotCreatedError
from FeedEventsCollector import FeedEventsCollector
from FeedEventsProcessor import FeedEventsProcessor
from Neo4JConnector import Neo4JConnector
class Syncer:
    """Synchronise EM-Infra data into a Neo4j database.

    Progress is kept in the params managed by the connector. While ``params['freshstart']`` is truthy,
    a fresh-start import runs: agents, assets, asset relations and betrokkene relations are imported
    page by page. Afterwards, feed events are replayed starting from the stored ``page`` / ``event_id``.
    """

    def __init__(self, connector: Neo4JConnector, requester: AbstractRequester, eminfra_importer: EMInfraImporter,
                 settings=None):
        """Store the collaborators and read the optional sync time window.

        :param connector: connector to the Neo4j database.
        :param requester: requester for the EM-Infra API (only stored by this class).
        :param eminfra_importer: importer used to fetch pages and feed events from EM-Infra.
        :param settings: optional settings mapping. When it has a ``'time'`` entry, its ``'start'`` and
            ``'end'`` values (``HH:MM:SS`` strings, compared against UTC) limit when the event sync may run.
        """
        self.connector = connector
        self.requester = requester
        self.eminfra_importer = eminfra_importer
        self.events_collector = FeedEventsCollector(eminfra_importer)
        self.events_processor = FeedEventsProcessor(connector, eminfra_importer)
        self.settings = settings
        self.sync_start = None
        self.sync_end = None
        # settings defaults to None, so guard before the membership test.
        if self.settings and 'time' in self.settings:
            self.sync_start = self.settings['time']['start']
            self.sync_end = self.settings['time']['end']

    def start_syncing(self, stop_when_fully_synced=False):
        """Run the fresh-start import and/or the event sync, retrying every 30 seconds on errors.

        :param stop_when_fully_synced: when True, return after the first event-sync run instead of
            keeping the database up to date forever.
        """
        while True:
            try:
                params = self.connector.get_page_by_get_or_create_params()
                if params['freshstart']:
                    self.perform_fresh_start_sync(params)
                    continue
                self.perform_syncing(stop_when_fully_synced=stop_when_fully_synced)
                if stop_when_fully_synced:
                    break
                if not self.calculate_sync_allowed_by_time():
                    # perform_syncing returns immediately outside the sync window; wait here instead
                    # of polling the database in a tight loop.
                    logging.info('Outside of the configured sync window. Checking again in 30 seconds.')
                    time.sleep(30)
            except Exception as ex:
                logging.exception(ex)
                logging.error('Could not start synchronising. Do you have connection to the internet and the Neo4J database?')
                logging.info('Retrying in 30 seconds.')
                time.sleep(30)

    def perform_fresh_start_sync(self, params: dict):
        """Import all agents, assets and relations page by page, resuming from the stored params.

        Each page is imported and its progress (``otltype``, ``cursor``, ``last_update_utc``) saved in a
        single transaction. Once the last object kind is done, ``freshstart`` is set to False.
        A failing page rolls its transaction back and the error is re-raised.

        :param params: the current sync params; ``pagesize`` and ``page`` are read here.
        """
        page_size = params['pagesize']
        page = params['page']
        if page == -1:
            # No feed position stored yet: remember where the event feed currently ends so the
            # event sync can continue from there after the fresh start.
            tx_context = self.connector.start_transaction()
            try:
                self.save_last_feedevent_to_params(page_size, tx_context=tx_context)
            except Exception:
                tx_context.rollback()
                raise
            self.connector.commit_transaction(tx_context)
        self.check_apoc()
        while True:
            # main sync loop for getting all assets/agents/relations
            params = self.connector.get_page_by_get_or_create_params()
            otltype = params['otltype']
            cursor = params['cursor']
            page_size = params['pagesize']
            if otltype == -1:
                otltype = 1
            if otltype >= 5:
                break
            tx_context = self.connector.start_transaction()
            try:
                self.eminfra_importer.cursor = cursor
                self._import_fresh_start_page(otltype, page_size, tx_context)
                cursor = self.eminfra_importer.cursor
                # An empty cursor means this object kind is exhausted: move on to the next one.
                if cursor == '':
                    otltype += 1
                self.connector.save_props_to_params(tx=tx_context, params=
                                                    {'otltype': otltype,
                                                     'cursor': cursor,
                                                     'last_update_utc': datetime.now(timezone.utc)})
                if otltype >= 5:
                    self.connector.save_props_to_params(tx=tx_context, params=
                                                        {'freshstart': False})
            except Exception:
                tx_context.rollback()
                raise
            self.connector.commit_transaction(tx_context)

    def _import_fresh_start_page(self, otltype: int, page_size: int, tx_context):
        """Import one page of the object kind selected by ``otltype`` inside ``tx_context``.

        The kinds are 1 = agents, 2 = assets, 3 = asset relations and 4 = betrokkene relations.
        The page is fetched from the importer's current ``cursor``. The caller reads that cursor
        afterwards to decide whether the kind is exhausted.
        """
        if otltype == 1:
            agents = self.eminfra_importer.import_agents_from_webservice_page_by_page(page_size)
            agentsyncer = AgentSyncer(emInfraImporter=self.eminfra_importer, neo4J_connector=self.connector)
            agentsyncer.tx_context = tx_context
            agentsyncer.update_agents(agents)
        elif otltype == 2:
            assets = self.eminfra_importer.import_assets_from_webservice_page_by_page(page_size)
            asset_processor = NieuwAssetProcessor()
            asset_processor.tx_context = tx_context
            for asset in assets:
                asset_processor.create_asset_from_jsonLd_dict(asset)
        elif otltype == 3:
            start = time.time()
            assetrelaties = self.eminfra_importer.import_assetrelaties_from_webservice_page_by_page(page_size)
            relatie_processor = RelatieProcessor()
            relatie_processor.tx_context = tx_context
            for assetrelatie in assetrelaties:
                # Relations whose endpoints are not (yet) present are skipped.
                with contextlib.suppress(AssetRelationNotCreatedError):
                    relatie_processor.create_assetrelatie_from_jsonLd_dict(assetrelatie)
            end = time.time()
            logging.info(f'time for {page_size} relations: {round(end - start, 2)}')
        elif otltype == 4:
            start = time.time()
            betrokkenerelaties = self.eminfra_importer.import_betrokkenerelaties_from_webservice_page_by_page(page_size)
            relatie_processor = RelatieProcessor()
            relatie_processor.tx_context = tx_context
            for betrokkenerelatie in betrokkenerelaties:
                with contextlib.suppress(BetrokkeneRelationNotCreatedError):
                    relatie_processor.create_betrokkenerelatie_from_jsonLd_dict(betrokkenerelatie)
            end = time.time()
            logging.info(f'time for {page_size} betrokkenerelations: {round(end - start, 2)}')

    def save_last_feedevent_to_params(self, page_size: int, tx_context):
        """Find the last page of the event feed and store its first event id and page number.

        The search first probes pages 1, 5, 25, ... until a page does not exist. It then narrows
        down by factors of 5 to the last existing page.

        :raises RuntimeError: if the found page still has a ``previous`` link or has no entries.
        """
        step = 5
        first_missing_page = self.recur_exp_find_start_page(current_num=1, step=step, page_size=page_size)
        coarse_step = first_missing_page // step
        current_page_num = self.recur_find_last_page(current_num=coarse_step, current_step=coarse_step,
                                                     step=step, page_size=page_size)
        # doublecheck: the last page of the feed has no 'previous' link
        event_page = self.eminfra_importer.get_events_from_page(page_num=current_page_num, page_size=page_size)
        links = event_page['links']
        prev_link = next((link for link in links if link['rel'] == 'previous'), None)
        if prev_link is not None:
            raise RuntimeError('algorithm did not result in the last page')
        # find last event_id
        entries = event_page['entries']
        if not entries:
            raise RuntimeError('the last page of the event feed contains no entries')
        last_event_id = entries[0]['id']
        self.connector.save_props_to_params(tx=tx_context, params=
                                            {'event_id': last_event_id,
                                             'page': current_page_num})

    def recur_exp_find_start_page(self, current_num, step, page_size):
        """Return the first page in ``current_num * step**k`` (k = 0, 1, ...) that does not exist.

        A page is treated as missing when the importer's response contains a ``'message'`` key.
        """
        event_page = self.eminfra_importer.get_events_from_page(page_num=current_num, page_size=page_size)
        if 'message' not in event_page:
            # Keep the caller's page size: page numbers only mean something for a fixed size.
            return self.recur_exp_find_start_page(current_num=current_num * step, step=step, page_size=page_size)
        return current_num

    def recur_find_last_page(self, current_num, current_step, step, page_size):
        """Narrow down to the last existing page, starting from the known-good page ``current_num``.

        Probes ``current_num + current_step * i`` for ``i`` in ``0..step``. It then continues from
        the last existing probe with ``current_step // step`` until the step size is 1 (or 0, for
        a feed whose first page is already missing).
        """
        # If no probe fails, the furthest probe (i == step) is the best known existing page.
        last_ok_index = step
        for i in range(step + 1):
            probe_page = current_num + current_step * i
            event_page = self.eminfra_importer.get_events_from_page(page_num=probe_page, page_size=page_size)
            if 'message' in event_page:
                last_ok_index = i - 1
                break
        # current_step can be 0 when the feed's first page already fails; stop instead of recursing forever.
        if current_step <= 1:
            return current_num + current_step * last_ok_index
        return self.recur_find_last_page(current_num + current_step * last_ok_index,
                                         current_step // step, step, page_size)

    def calculate_sync_allowed_by_time(self):
        """Return True when the current UTC time lies inside the configured sync window.

        Without a configured window (``sync_start`` is None) syncing is always allowed.
        """
        if self.sync_start is None:
            return True
        return self._is_within_window(datetime.now(timezone.utc), self.sync_start, self.sync_end)

    @staticmethod
    def _is_within_window(now: datetime, start_time: str, end_time: str) -> bool:
        """Return True when ``now`` lies strictly between the ``HH:MM:SS`` times on its own day.

        A window whose start is later than its end (e.g. 22:00:00 -> 06:00:00) wraps past midnight.
        """
        start_struct = time.strptime(start_time, "%H:%M:%S")
        end_struct = time.strptime(end_time, "%H:%M:%S")
        start = now.replace(hour=start_struct.tm_hour, minute=start_struct.tm_min, second=start_struct.tm_sec)
        end = now.replace(hour=end_struct.tm_hour, minute=end_struct.tm_min, second=end_struct.tm_sec)
        if start <= end:
            return start < now < end
        return now > start or now < end

    def perform_syncing(self, stop_when_fully_synced=False):
        """Process feed events batch by batch while the sync time window allows it.

        When a poll finds no events, ``last_sync_utc`` is stored. The method then returns (if
        ``stop_when_fully_synced``) or waits 30 seconds before polling again. Errors while processing
        a batch roll back the events processor's transaction.

        :param stop_when_fully_synced: return as soon as a poll finds no events to process.
        """
        while self.calculate_sync_allowed_by_time():
            params = self.connector.get_page_by_get_or_create_params()
            current_page = params['page']
            completed_event_id = params['event_id']
            page_size = params['pagesize']
            logging.info(f'starting a sync cycle, page: {str(current_page + 1)} event_id: {str(completed_event_id)}')
            start = time.time()
            eventsparams_to_process = self.events_collector.collect_starting_from_page(current_page, completed_event_id,
                                                                                       page_size)
            total_events = sum(len(lists) for lists in eventsparams_to_process.event_dict.values())
            if total_events == 0:
                self._save_last_sync_time()
                if stop_when_fully_synced:
                    logging.info("The database is fully synced.")
                    break
                logging.info("The database is fully synced. Continuing keep up to date in 30 seconds")
                time.sleep(30)  # wait 30 seconds to prevent overloading API
                continue
            end = time.time()
            self.log_eventparams(eventsparams_to_process.event_dict, round(end - start, 2),
                                 event_timestamp=eventsparams_to_process.event_timestamp)
            try:
                self.events_processor.process_events(eventsparams_to_process)
            except BetrokkeneRelationNotCreatedError as ex:
                # Original note (Dutch): "agents syncen of na 24h": sync the agents (or, presumably,
                # retry after 24h). The missing agents/assets are fetched, then the batch is retried.
                self.events_processor.tx_context.rollback()
                if len(ex.agent_uuids) > 0:
                    self.sync_all_agents()
                if len(ex.asset_uuids) > 0:
                    self._create_missing_assets(ex.asset_uuids)
            except AssetRelationNotCreatedError:
                self.events_processor.tx_context.rollback()
                self.sync_all_agents()
                time.sleep(30)
            except Exception as exc:
                traceback.print_exception(exc)
                self.events_processor.tx_context.rollback()
                time.sleep(30)

    def _save_last_sync_time(self):
        """Store the current UTC time as ``last_sync_utc`` in a dedicated transaction."""
        with self.connector.driver.session(database=self.connector.db) as session:
            tx = session.begin_transaction()
            try:
                self.connector.save_props_to_params(params={'last_sync_utc': datetime.now(timezone.utc)}, tx=tx)
                tx.commit()
            finally:
                tx.close()

    def _create_missing_assets(self, asset_uuids):
        """Create the given assets with the "NIEUW_ONDERDEEL" processor in a new transaction.

        The transaction is rolled back and the error re-raised if processing fails.
        """
        self.events_processor.tx_context = self.connector.start_transaction()
        try:
            event_processor = self.events_processor.create_processor("NIEUW_ONDERDEEL",
                                                                     self.events_processor.tx_context)
            event_processor.process(asset_uuids)
        except Exception:
            self.events_processor.tx_context.rollback()
            raise
        self.connector.commit_transaction(self.events_processor.tx_context)

    @staticmethod
    def log_eventparams(event_dict, time: float, event_timestamp: datetime):
        """Log the number of fetched events, in total and per event type.

        :param event_dict: mapping of event type to the events of that type.
        :param time: seconds spent fetching (this name shadows the ``time`` module inside this method).
        :param event_timestamp: timestamp the fetched events start from.
        """
        total = sum(len(events) for events in event_dict.values())
        logging.info(f'fetched {total} asset events to sync in {time} seconds, starting from {event_timestamp}')
        for event_type, events in event_dict.items():
            if len(events) > 0:
                logging.info(f'number of events of type {event_type}: {len(events)}')

    def sync_all_agents(self):
        """Re-synchronise all agents through an ``AgentSyncer``."""
        logging.info('sync_all_agents started')
        agentsyncer = AgentSyncer(emInfraImporter=self.eminfra_importer, neo4J_connector=self.connector)
        agentsyncer.sync_agents()
        logging.info('sync_all_agents done')

    def check_apoc(self):
        """Verify that the APOC plugin is available in the Neo4j database.

        :raises RuntimeError: when the database reports ``apoc.version`` as an unknown function.
            Any other error is re-raised unchanged after rolling the transaction back.
        """
        apoc_check_query = 'RETURN apoc.version() AS output;'
        missing_apoc_message = 'The apoc plugin is not enabled in this Neo4J database. Please install it first.'
        tx_context = self.connector.start_transaction()
        try:
            result = tx_context.run(apoc_check_query)
            version = result.data()[0]['output']
        except neo4j.exceptions.ClientError as exc:
            tx_context.rollback()
            # Fall back to '' in case the error carries no message.
            if "Unknown function 'apoc.version'" in (exc.message or ''):
                logging.error(missing_apoc_message)
                raise RuntimeError(missing_apoc_message) from exc
            raise
        except Exception:
            tx_context.rollback()
            raise
        logging.info(f'The apoc plugin is installed, version {version}')
        tx_context.commit()