Skip to content

Commit

Permalink
change setExecutionStatus* to setExecutionStatus(...,*), fix scripts …
Browse files Browse the repository at this point in the history
…around accordingly
  • Loading branch information
eudoxos committed Oct 16, 2024
1 parent 6324b57 commit f4e3a75
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 55 deletions.
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ run: $(TASKS)


# if any task fails, use kill -TERM $(MAKEPID) to terminate everything immediately
MAKEPID:= $(shell echo $$PPID)
DIE := kill -TERM $(shell echo $$PPID)


ns:
python3 -m Pyro5.nameserver --port 11001
python3 -m Pyro5.nameserver --port 11001 || $(DIE)
mongo:
mkdir -p mongodb-tmp~ && /usr/bin/mongod --port 11002 --noauth --dbpath=./mongodb-tmp~ --logpath=/dev/null --logappend || kill -TERM $(MAKEPID)
mkdir -p mongodb-tmp~ && /usr/bin/mongod --port 11002 --noauth --dbpath=./mongodb-tmp~ --logpath=/dev/null --logappend || $(DIE)
rest:
# MUPIFDB_DRY_RUN=1
sleep 2 && cd mupifDB/api && MUPIFDB_MONGODB_PORT=11002 MUPIFDB_REST_SERVER=http://127.0.0.1:11003 MUPIFDB_LOG_LEVEL=DEBUG MUPIFDB_RESTAPI_HOST=localhost MUPIFDB_RESTAPI_PORT=11003 PYTHONPATH=../.. python3 main.py || kill -TERM $(MAKEPID)
sleep 2 && cd mupifDB/api && MUPIFDB_MONGODB_PORT=11002 MUPIFDB_REST_SERVER=http://127.0.0.1:11003 MUPIFDB_LOG_LEVEL=DEBUG MUPIFDB_RESTAPI_HOST=localhost MUPIFDB_RESTAPI_PORT=11003 PYTHONPATH=../.. python3 main.py || $(DIE)
web:
sleep 7 && cd webapi && MUPIFDB_MONGODB_PORT=11002 MUPIFDB_REST_SERVER=http://127.0.0.1:11003 MUPIFDB_WEB_FAKE_AUTH=1 FLASK_APP=index.py PYTHONPATH=.. python3 -m flask run --debug --no-reload --host 127.0.0.1 --port 11004 || kill -TERM $(MAKEPID)
sleep 7 && cd webapi && MUPIFDB_MONGODB_PORT=11002 MUPIFDB_REST_SERVER=http://127.0.0.1:11003 MUPIFDB_WEB_FAKE_AUTH=1 FLASK_APP=index.py PYTHONPATH=.. python3 -m flask run --debug --no-reload --host 127.0.0.1 --port 11004 || $(DIE)
browse:
sleep 9 # && xdg-open http://127.0.0.1:11004
scheduler:
sleep 7 && MUPIF_LOG_LEVEL=DEBUG MUPIFDB_REST_SERVER=http://127.0.0.1:11003 MUPIF_NS=localhost:11001 PYTHONPATH=.. python3 -c 'from mupifDB import workflowscheduler as ws; ws.LOOP_SLEEP_SEC=5; ws.schedulerStatFile="./sched-stat.json"; ws.main()' || kill -TERM $(MAKEPID)
sleep 7 && MUPIF_LOG_LEVEL=DEBUG MUPIFDB_REST_SERVER=http://127.0.0.1:11003 MUPIF_NS=localhost:11001 PYTHONPATH=.. python3 -c 'from mupifDB import workflowscheduler as ws; ws.LOOP_SLEEP_SEC=5; ws.schedulerStatFile="./sched-stat.json"; ws.main()' || $(DIE)
44 changes: 12 additions & 32 deletions mupifDB/api/client_mupif.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,17 @@ def getExecutionRecord(weid: str) -> models.WorkflowExecution_Model:
response = rGet(f"executions/{weid}")
return models.WorkflowExecution_Model.model_validate(response.json())

def getScheduledExecutions(num_limit=None):
def getScheduledExecutions(num_limit: int=None):
return getExecutionRecords(status="Scheduled", num_limit=num_limit)

def getPendingExecutions(num_limit=None):
def getPendingExecutions(num_limit: int=None):
return getExecutionRecords(status="Pending", num_limit=num_limit)

def scheduleExecution(execution_id):
def scheduleExecution(execution_id: str):
response = rPatch(f"executions/{execution_id}/schedule")
return response.json()

def setExecutionParameter(execution_id, param, value, val_type="str"):
def setExecutionParameter(execution_id: str, param: str, value: Any, val_type="str"):
response = rPatch(f"executions/{execution_id}", data=json.dumps({"key": str(param), "value": value}))
return response.json()

Expand All @@ -129,37 +129,17 @@ def setExecutionAttemptsCount(execution_id, val):
return setExecutionParameter(execution_id, "Attempts", str(val), "int")


def setExecutionStatusScheduled(execution_id):
return setExecutionParameter(execution_id, "Status", "Scheduled")

def setExecutionStatusCreated(execution_id): # only reverted
setExecutionParameter(execution_id, "SubmittedDate", "")
return setExecutionParameter(execution_id, "Status", "Created")


def setExecutionStatusPending(execution_id, reverted=False):
if reverted:
pass
# setExecutionParameter(execution_id, "StartDate", "")
else:
def setExecutionStatus(execution_id: str, status: models.ExecutionStatus_Literal, revertPending=False):
if status=='Created': setExecutionParameter(execution_id, "SubmittedDate", str(datetime.datetime.now()))
elif status=='Pending' and not revertPending:
setExecutionParameter(execution_id, "SubmittedDate", str(datetime.datetime.now()))
setExecutionAttemptsCount(execution_id, 0)
return setExecutionParameter(execution_id, "Status", "Pending")


def setExecutionStatusRunning(execution_id):
setExecutionParameter(execution_id, "StartDate", str(datetime.datetime.now()))
return setExecutionParameter(execution_id, "Status", "Running")


def setExecutionStatusFinished(execution_id):
setExecutionParameter(execution_id, "EndDate", str(datetime.datetime.now()))
return setExecutionParameter(execution_id, "Status", "Finished")

elif status=='Running':
setExecutionParameter(execution_id, "StartDate", str(datetime.datetime.now()))
elif status in ('Finished','Failed'):
setExecutionParameter(execution_id, "EndDate", str(datetime.datetime.now()))
return setExecutionParameter(execution_id, "Status", status)

def setExecutionStatusFailed(execution_id):
setExecutionParameter(execution_id, "EndDate", str(datetime.datetime.now()))
return setExecutionParameter(execution_id, "Status", "Failed")

def createExecution(wid: str, version: int, ip: str, no_onto=False):
wec=models.WorkflowExecutionCreate_Model(wid=wid,version=version,ip=ip,no_onto=no_onto)
Expand Down
5 changes: 0 additions & 5 deletions mupifDB/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,6 @@ def _get_execution_output_item(uid: str, name: str):
return get_execution_io_item(uid, name, '', inputs=False)


class M_IOData_link(BaseModel):
ExecID: str
Name: str
ObjID: str


class M_IODataSetContainer(BaseModel):
link: typing.Optional[dict] = None
Expand Down
10 changes: 5 additions & 5 deletions mupifDB/workflow_execution_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def downloadWorkflowFiles(eid):
if args.download:
downloadWorkflowFiles(weid)

mupifDB.restApiControl.setExecutionStatusRunning(weid)
mupifDB.restApiControl.setExecutionStatus(weid,'Running')

# add REST logging handler for this weid, add 'weid' field to every message automatically
import mupifDB.restLogger
Expand All @@ -89,7 +89,7 @@ def downloadWorkflowFiles(eid):
workflow_record = mupifDB.restApiControl.getWorkflowRecordGeneral(execution_record.WorkflowID, execution_record.WorkflowVersion)
if workflow_record is None:
log.error("Workflow not found")
mupifDB.restApiControl.setExecutionStatusFailed(weid)
mupifDB.restApiControl.setExecutionStatus(weid,'Failed')
sys.exit(1)

#
Expand Down Expand Up @@ -120,15 +120,15 @@ def downloadWorkflowFiles(eid):
log.error('Not enough resources')
mupifDB.restApiControl.setExecutionStatusFailed(weid)
sys.exit(2)
mupifDB.restApiControl.setExecutionStatusFailed(weid)
mupifDB.restApiControl.setExecutionStatus(weid,'Failed')
sys.exit(1)

except:
log.info("Unknown error")
if workflow is not None:
workflow.terminate()
mupifDB.restApiControl.setExecutionStatusFailed(weid)
mupifDB.restApiControl.setExecutionStatus(weid,'Failed')
sys.exit(1)

mupifDB.restApiControl.setExecutionStatusFinished(weid)
mupifDB.restApiControl.setExecutionStatus(weid,'Finished')
sys.exit(0)
14 changes: 7 additions & 7 deletions mupifDB/workflowscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,15 +379,15 @@ def executeWorkflow_inner2(lock, schedulerStat, we_id: str, we_rec, workflow_rec
except Exception as e:
log.exception('Error in executeWorkflow_inner2')
# set execution code to failed ...yes or no?
restApiControl.setExecutionStatusFailed(we_id)
restApiControl.setExecutionStatus(we_id,'Failed')
my_email.sendEmailAboutExecutionStatus(we_id)
return None
# execute
log.info("Executing we_id %s, tempdir %s" % (we_id, tempDir))
# update status
updateStatRunning(lock, schedulerStat, we_id, wid)
#runningJobs[we_id]=wid # for runtime monitoring
restApiControl.setExecutionStatusRunning(we_id)
restApiControl.setExecutionStatus(we_id,'Running')
restApiControl.setExecutionAttemptsCount(we_id, we_rec.Attempts+1)
# uses the same python interpreter as the current process
cmd = [sys.executable, execScript, '-eid', str(we_id)]
Expand Down Expand Up @@ -433,17 +433,17 @@ def executeWorkflow_inner2(lock, schedulerStat, we_id: str, we_rec, workflow_rec
# set execution code to completed
if completed == 0:
log.warning("Workflow execution %s Finished" % we_id)
restApiControl.setExecutionStatusFinished(we_id)
restApiControl.setExecutionStatus(we_id,'Finished')
my_email.sendEmailAboutExecutionStatus(we_id)
return we_id, ExecutionResult.Finished # XXX ??
elif completed == 1:
log.warning("Workflow execution %s Failed" % we_id)
restApiControl.setExecutionStatusFailed(we_id)
restApiControl.setExecutionStatus(we_id,'Failed')
my_email.sendEmailAboutExecutionStatus(we_id)
return we_id, ExecutionResult.Failed # XXX ??
elif completed == 2:
log.warning("Workflow execution %s could not be initialized due to lack of resources" % we_id)
restApiControl.setExecutionStatusPending(we_id, True)
restApiControl.setExecutionStatus(we_id, 'Pending', revertPending=True)
else:
pass

Expand Down Expand Up @@ -644,7 +644,7 @@ def main():
# check number of attempts for execution
if int(wed.Attempts) > 10:
try:
restApiControl.setExecutionStatusCreated(weid)
restApiControl.setExecutionStatus(weid,'Created')
if api_type != 'granta':
my_email.sendEmailAboutExecutionStatus(weid)
except Exception as e:
Expand All @@ -656,7 +656,7 @@ def main():
# add the corresponding weid to the pool, change status to scheduled
res = False
try:
res = restApiControl.setExecutionStatusScheduled(weid)
res = restApiControl.setExecutionStatus(weid,'Scheduled')
except Exception as e:
log.exception()

Expand Down

0 comments on commit f4e3a75

Please sign in to comment.