Skip to content

Commit

Permalink
Try to fix orphan processes
Browse files Browse the repository at this point in the history
  • Loading branch information
edeutsch committed Aug 10, 2023
1 parent 51c0754 commit 8260a47
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
17 changes: 13 additions & 4 deletions code/ARAX/ARAXQuery/ARAX_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def query_return_message(self, query, mode='ARAX'):
'status': 'Running Async',
'message_id': None,
'message_code': 'Running',
'code_description': 'Query running via /asyncquery'
'code_description': 'Query running via /asyncquery (parent)'
}
query_tracker = ARAXQueryTracker()
query_tracker.update_tracker_entry(self.response.job_id, attributes)
Expand Down Expand Up @@ -695,6 +695,10 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
#### and the child continues to work on the query, eventually to finish and exit()
if mode == 'asynchronous':
callback = input_operations_dict['callback']
if callback.startswith('http://localhost'):
response.error(f"ERROR: A callback to localhost ({callback}) does not work. Please specify a resolvable callback URL")
return response

response.info(f"Everything seems in order to begin processing the query asynchronously. Processing will continue and Response will be posted to {callback}")
newpid = os.fork()
#### The parent returns to tell the caller that work will proceed
Expand All @@ -704,14 +708,19 @@ def execute_processing_plan(self,input_operations_dict, mode='ARAX'):
return response
#### The child continues
#### The child loses the MySQL connection of the parent, so need to reconnect
time.sleep(1)
response_cache.connect()
time.sleep(1)

child_pid = os.getpid()
response.debug(f"Child continues running. Child PID is {child_pid}. Record with alter_tracker_entry()")
attributes = {
'pid': os.getpid(),
'code_description': 'Query executing via /asyncquery'
'pid': child_pid,
'code_description': 'Query executing via /asyncquery (child)'
}
query_tracker = ARAXQueryTracker()
query_tracker.alter_tracker_entry(self.response.job_id, attributes)
alter_result = query_tracker.alter_tracker_entry(self.response.job_id, attributes)
response.debug(f"Child PID {child_pid} recorded with result {alter_result}")


#### If there is already a KG with edges, recompute the qg_keys
Expand Down
15 changes: 13 additions & 2 deletions code/ARAX/ARAXQuery/ARAX_query_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,26 +221,37 @@ def update_tracker_entry(self, tracker_id, attributes):
#### Alter arbitray values in a tracker entry
def alter_tracker_entry(self, tracker_id, attributes):
if tracker_id is None:
return
return("ERROR: tracker_id is None")

session = self.session
if session is None:
return
return("ERROR: session is None")

return_value = ''

tracker_entries = session.query(ARAXQuery).filter(ARAXQuery.query_id==tracker_id).all()
if len(tracker_entries) > 0:
tracker_entry = tracker_entries[0]
for key, value in attributes.items():
setattr(tracker_entry, key, value)
else:
return_value += 'ERROR: No tracker_entries '

ongoing_tracker_entries = session.query(ARAXOngoingQuery).filter(ARAXOngoingQuery.query_id==tracker_id).all()
if len(ongoing_tracker_entries) > 0:
ongoing_tracker_entry = ongoing_tracker_entries[0]
for key, value in attributes.items():
setattr(ongoing_tracker_entry, key, value)
else:
return_value += 'ERROR: No ongoing_tracker_entries '

session.commit()

if len(return_value) == 0:
return_value = 'OK'

return(return_value)


##################################################################################################
def get_instance_info(self):
Expand Down

0 comments on commit 8260a47

Please sign in to comment.