
Commit

Changes all over the place, so that the (simulated) web client adds workflow, creates execution, runs it.
eudoxos committed Oct 11, 2024
1 parent f23865c commit c2ca415
Showing 8 changed files with 175 additions and 133 deletions.
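
The thrust of the commit, end to end: the (simulated) web client registers a workflow, creates an execution record for it, and the scheduler runs it. A rough sketch of that flow using the client helpers touched below (the exact call sequence and the scheduling step are assumptions, not part of this diff):

from mupifDB.api import client_mupif
from mupifDB import models

wf = models.Workflow_Model.model_validate(wf_meta)  # wf_meta: workflow metadata dict (hypothetical)
client_mupif.insertWorkflow(wf)                     # 1. the web client adds the workflow
# 2. client_mupif.insertExecution(...) creates the execution (see the main.py hunks)
# 3. the scheduler picks the 'Created' execution up and runs it (outside this diff)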
11 changes: 6 additions & 5 deletions mupifDB/api/client_mupif.py
@@ -50,13 +50,14 @@ def getWorkflowRecord(wid: str) -> models.Workflow_Model|None:

@pydantic.validate_call
def insertWorkflow(wf: models.Workflow_Model):
# print_json(data=wf.model_dump())
response = rPost("workflows/", data=json.dumps({"entity": wf.model_dump()}))
print('QQQQ')
print_json(data=wf.model_dump())
response = rPost("workflows/", data=wf.model_dump_json())
return response.json()

@pydantic.validate_call
def updateWorkflow(wf: models.Workflow_Model):
response = rPatch("workflows/", data=json.dumps({"entity": wf.model_dump()}))
response = rPatch("workflows/", data=wf.model_dump_json())
return models.Workflow_Model.model_validate(response.json())


@@ -88,7 +89,7 @@ def getWorkflowRecordFromHistory(wid: str, version: int) -> models.Workflow_Mode

@pydantic.validate_call
def insertWorkflowHistory(wf: models.Workflow_Model):
response = rPost("workflows_history/", data=json.dumps({"entity": wf.model_dump()}))
response = rPost("workflows_history/", data=wf.model_dump_json())
return response.json()


@@ -192,7 +193,7 @@ def insertExecution(m: models.WorkflowExecution_Model):

def getExecutionInputRecord(weid) -> List[models.IODataRecordItem_Model]:
response = rGet(f"executions/{weid}/inputs/")
if response.json() is None: return None
# if response.json() is None: return []
#print(200*'#')
#print_json(data=response.json())
# return models.IODataRecord_Model.model_validate(response.json())
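
The recurring change in this file replaces json.dumps({"entity": wf.model_dump()}) with wf.model_dump_json(). A minimal illustration of why (hypothetical model): model_dump() keeps rich Python objects such as datetime, which the stdlib json encoder rejects, whereas pydantic's own JSON dump serializes them.

import datetime, json, pydantic

class Rec(pydantic.BaseModel):
    when: datetime.datetime

r = Rec(when=datetime.datetime(2024, 10, 11))
# json.dumps(r.model_dump())  # TypeError: Object of type datetime is not JSON serializable
print(r.model_dump_json())    # {"when":"2024-10-11T00:00:00"}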
2 changes: 1 addition & 1 deletion mupifDB/api/client_util.py
@@ -104,7 +104,7 @@ def logMessage(*,name,levelno,pathname,lineno,created,**kw):
previous_level = logging.root.manager.disable
logging.disable(logging.CRITICAL)
try:
response = rPost("logs/", data=json.dumps({"entity": data}))
response = rPost("logs/", data=json.dumps(data))
finally:
logging.disable(previous_level)
return response.json()
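
logMessage now posts the bare record, matching the server's insert_log(data: dict) endpoint in main.py. The logging.disable bracket around the POST is a re-entrancy guard: if the HTTP layer logged anything during the request, it would recurse back into logMessage. Standalone sketch of the guard (names illustrative):

import logging

def ship_log(record: dict, post):
    previous_level = logging.root.manager.disable  # remember the current disable level
    logging.disable(logging.CRITICAL)              # mute all logging during the POST
    try:
        return post("logs/", record)
    finally:
        logging.disable(previous_level)            # restore the prior level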
152 changes: 80 additions & 72 deletions mupifDB/api/main.py
@@ -1,5 +1,12 @@

if __name__ == '__main__':
import logging
logging.basicConfig()
log=logging.getLogger('mupifDB.api.server')
import argparse, sys
_parser=argparse.ArgumentParser()
_parser.add_argument('--export-openapi',default='',metavar='FILE')
cmdline_opts=_parser.parse_args()
if __name__ == '__main__' and not cmdline_opts.export_openapi:
print('SERVING')
import uvicorn
import os
host=os.environ.get('MUPIFDB_RESTAPI_HOST','0.0.0.0')
@@ -28,8 +35,8 @@
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/..")
import mupifDB
import mupif as mp
import mupif.pyroutil
import Pyro5.api
import logging
import pydantic
import json
from typing import Any,List
@@ -38,9 +45,6 @@



logging.basicConfig()
log=logging.getLogger('restApi')


from mupifDB import table_structures
from mupifDB import models
@@ -87,8 +91,15 @@

app = FastAPI(openapi_tags=tags_metadata)

# ugly way to get a JSON-able dictionary from pydantic (for mongo insert), without special objects like datetime and IPv4Address etc.
def _model2jsondict(m: pydantic.BaseModel) -> Any: return json.loads(m.model_dump_json())

def _model2jsondict(m: models.StrictBase) -> dict:
'''
Mongo-specific enhancement of pydantic dump: if _id is None, remove it from the dict.
Otherwise MongoDB would see it as _id=null when inserting, rather than creating a new entry.
'''
ret=m.model_dump(mode='json')
if '_id' in ret and ret['_id'] is None: del ret['_id']
return ret

@app.exception_handler(fastapi.exceptions.RequestValidationError)
async def validation_exception_handler(request: fastapi.Request, exc: fastapi.exceptions.RequestValidationError):
@@ -110,9 +121,9 @@ async def validation_exception_handler(request: fastapi.Request, exc: fastapi.ex
if 1:
import mupifDB.api.edm as edm
# when imported at readthedocs, don't try to connect to the DB (no DB running there)
if 'MUPIFDB_DRY_RUN' not in os.environ:
if 'MUPIFDB_DRY_RUN' not in os.environ and not cmdline_opts.export_openapi:
edm.initializeEdm(client)
else: print('MUPIFDB_DRY_RUN defined, not initializing EDM DB connection.')
else: log.info('MUPIFDB_DRY_RUN / --export-openapi active, not initializing EDM DB connection.')
app.include_router(edm.dms3.router)


@@ -179,14 +190,9 @@ def get_usecase_workflows(uid: str):
return []


class M_UseCase(BaseModel):
ucid: str
description: str


@app.post("/usecases/", tags=["Usecases"])
def post_usecase(data: M_UseCase):
res = db.UseCases.insert_one({"ucid": data.ucid, "Description": data.description})
def post_usecase(uc: models.UseCase_Model):
res = db.UseCases.insert_one(_model2jsondict(uc))
return str(res.inserted_id)


@@ -215,26 +221,27 @@ def get_workflow(workflow_id: str):
return None


class M_Dict(BaseModel):
entity: dict


@app.patch("/workflows/", tags=["Workflows"])
def update_workflow(data: M_Dict) -> models.Workflow_Model:
res = db.Workflows.find_one_and_update({'wid': data.entity['wid']}, {'$set': data.entity}, return_document=ReturnDocument.AFTER)
# return table_structures.extendRecord(fix_id(res), table_structures.tableWorkflow)
def update_workflow(wf: models.Workflow_Model) -> models.Workflow_Model:
#print('OOO')
#print_json(data=wf.model_dump())
res = db.Workflows.find_one_and_update({'wid': wf.wid}, {'$set': _model2jsondict(wf)}, return_document=ReturnDocument.AFTER)
return models.Workflow_Model.model_validate(res)


@app.post("/workflows/", tags=["Workflows"])
def insert_workflow(data: M_Dict):
res = db.Workflows.insert_one(data.entity)
def insert_workflow(wf: models.Workflow_Model):
res = db.Workflows.insert_one(_model2jsondict(wf))
return str(res.inserted_id)


@app.post("/workflows_history/", tags=["Workflows"])
def insert_workflow_history(data: M_Dict):
res = db.WorkflowsHistory.insert_one(data.entity)
def insert_workflow_history(wf: models.Workflow_Model):
#print('TTT')
#print_json(data=wf.model_dump())
res = db.WorkflowsHistory.insert_one(_model2jsondict(wf))
#print(f'{res=}')
return str(res.inserted_id)
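
These endpoints previously accepted a generic {"entity": {...}} envelope via M_Dict; they now bind the request body directly to models.Workflow_Model, so FastAPI validates the payload and documents it in the OpenAPI schema. Sketch of the wire-format change (field values hypothetical):

# before: POST /workflows/  {"entity": {"wid": "w1", ...}}
# after:  POST /workflows/  {"wid": "w1", ...}    i.e. wf.model_dump_json() verbatim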


@@ -244,9 +251,9 @@ def insert_workflow_history(data: M_Dict):

@app.get("/workflows_history/{workflow_id}/{workflow_version}", tags=["Workflows"])
def get_workflow_history(workflow_id: str, workflow_version: int) -> models.Workflow_Model|None:
print(f'AAA: {workflow_id=} {workflow_version=}')
#print(f'AAA: {workflow_id=} {workflow_version=}')
res = db.WorkflowsHistory.find_one({"wid": workflow_id, "Version": workflow_version})
print(f'BBB: {res}')
#print(f'BBB: {res}')
if res:
# return table_structures.extendRecord(fix_id(res), table_structures.tableWorkflow)
return models.Workflow_Model.model_validate(res)
@@ -284,13 +291,11 @@ def get_executions(status: str = "", workflow_version: int = 0, workflow_id: str


@app.get("/executions/{uid}", tags=["Executions"])
def get_execution(uid: str) -> models.WorkflowExecution_Model|None:
def get_execution(uid: str) -> models.WorkflowExecution_Model:
res = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})
print(f'OOO: {res=}')
if res: return models.WorkflowExecution_Model.model_validate(res)
#if res:
# return table_structures.extendRecord(fix_id(res), table_structures.tableExecution)
return None
# print(f'OOO: {res=}')
if res is None: raise KeyError(f'Database reports no execution with uid={uid}.')
return models.WorkflowExecution_Model.model_validate(res)


@app.post("/executions/create/", tags=["Executions"])
@@ -336,37 +341,37 @@ def get_execution_livelog(uid: str, num: int):
proxy._pyroTimeout=5
ll=proxy.tail(num,raw=True)
if isinstance(ll,dict): ll=serpent.tobytes(ll)
ll=pickle.loads(ll)
ll=pickle.loads(ll) # type: ignore
return [fmt.format(rec) for rec in ll]


def get_execution_io_item(uid, name, obj_id, inout):
we = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})
data = db.IOData.find_one({'_id': bson.objectid.ObjectId(we[inout])})
for elem in data['DataSet']:
if elem.get('Name', None) == name and elem.get('ObjID', '') == obj_id:
def get_execution_io_item(uid: str, name, obj_id: str, inputs: bool):
we = models.WorkflowExecution_Model.model_validate(db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)}))
data = models.IODataRecord_Model.model_validate(db.IOData.find_one({'_id': bson.objectid.ObjectId(we.Inputs if inputs else we.Outputs)}))
for elem in data.DataSet:
if elem.Name == name and elem.ObjID == obj_id:
return elem
return None
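
get_execution_io_item now round-trips the raw Mongo documents through pydantic models instead of chained dict.get() lookups, so malformed records fail loudly. A toy sketch of that validation step (Item is illustrative, not the real IODataRecordItem_Model):

import pydantic

class Item(pydantic.BaseModel):
    Name: str
    ObjID: str = ''

item = Item.model_validate({'Name': 'T', 'ObjID': ''})  # raises pydantic.ValidationError on bad docs
print(item.Name)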


@app.get("/executions/{uid}/input_item/{name}/{obj_id}/", tags=["Executions"])
def get_execution_input_item(uid: str, name: str, obj_id: str):
return get_execution_io_item(uid, name, obj_id, 'Inputs')
return get_execution_io_item(uid, name, obj_id, inputs=True)


@app.get("/executions/{uid}/output_item/{name}/{obj_id}/", tags=["Executions"])
def get_execution_output_item(uid: str, name: str, obj_id: str):
return get_execution_io_item(uid, name, obj_id, 'Outputs')
return get_execution_io_item(uid, name, obj_id, inputs=False)


@app.get("/executions/{uid}/input_item/{name}//", tags=["Executions"])
def _get_execution_input_item(uid: str, name: str):
return get_execution_io_item(uid, name, '', 'Inputs')
return get_execution_io_item(uid, name, '', inputs=True)


@app.get("/executions/{uid}/output_item/{name}//", tags=["Executions"])
def _get_execution_output_item(uid: str, name: str):
return get_execution_io_item(uid, name, '', 'Outputs')
return get_execution_io_item(uid, name, '', inputs=False)


class M_IOData_link(BaseModel):
Expand All @@ -380,11 +385,11 @@ class M_IODataSetContainer(BaseModel):
object: typing.Optional[dict] = None


def set_execution_io_item(uid, name, obj_id, inout, data_container):
we = db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)})
if (we.get('Status', '') == 'Created' and inout == 'Inputs') or (we.get('Status', '') == 'Running' and inout == 'Outputs'):
id_condition = {'_id': bson.objectid.ObjectId(we[inout])}
if data_container.link is not None and inout == 'Inputs':
def set_execution_io_item(uid, name, obj_id, inputs: bool, data_container):
we = models.WorkflowExecution_Model.model_validate(db.WorkflowExecutions.find_one({"_id": bson.objectid.ObjectId(uid)}))
if (we.Status == 'Created' and inputs==True) or (we.Status == 'Running' and inputs==False):
id_condition = {'_id': bson.objectid.ObjectId(we.Inputs if inputs else we.Outputs)}
if data_container.link is not None and inputs==True:
res = db.IOData.update_one(id_condition, {'$set': {"DataSet.$[r].Link": data_container.link}}, array_filters=[{"r.Name": name, "r.ObjID": str(obj_id)}])
return res.matched_count > 0
if data_container.object is not None:
@@ -395,22 +400,22 @@ def set_execution_io_item(uid, name, obj_id, inout, data_container):

@app.patch("/executions/{uid}/input_item/{name}/{obj_id}/", tags=["Executions"])
def set_execution_input_item(uid: str, name: str, obj_id: str, data: M_IODataSetContainer):
return set_execution_io_item(uid, name, obj_id, 'Inputs', data)
return set_execution_io_item(uid, name, obj_id, True, data)


@app.patch("/executions/{uid}/output_item/{name}/{obj_id}/", tags=["Executions"])
def set_execution_output_item(uid: str, name: str, obj_id: str, data: M_IODataSetContainer):
return set_execution_io_item(uid, name, obj_id, 'Outputs', data)
return set_execution_io_item(uid, name, obj_id, False, data)


@app.patch("/executions/{uid}/input_item/{name}//", tags=["Executions"])
def _set_execution_input_item(uid: str, name: str, data: M_IODataSetContainer):
return set_execution_io_item(uid, name, '', 'Inputs', data)
return set_execution_io_item(uid, name, '', True, data)


@app.patch("/executions/{uid}/output_item/{name}//", tags=["Executions"])
def _set_execution_output_item(uid: str, name: str, data: M_IODataSetContainer):
return set_execution_io_item(uid, name, '', 'Outputs', data)
return set_execution_io_item(uid, name, '', False, data)
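
All four endpoints funnel into the arrayFilters update in set_execution_io_item above. A hedged, self-contained sketch of that MongoDB pattern (requires MongoDB >= 3.6; the connection and collection here are assumptions, not project configuration):

import pymongo

db = pymongo.MongoClient('localhost', 27017).test
db.IOData.insert_one({'DataSet': [{'Name': 'T', 'ObjID': '', 'Link': None},
                                  {'Name': 'p', 'ObjID': '1', 'Link': None}]})
res = db.IOData.update_one(
    {},                                           # match the IOData document
    {'$set': {'DataSet.$[r].Link': 'some-link'}}, # update only elements matched by filter r
    array_filters=[{'r.Name': 'p', 'r.ObjID': '1'}])
print(res.matched_count)  # 1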


class M_ModifyExecutionOntoBaseObjectID(BaseModel):
@@ -429,9 +434,9 @@ class M_ModifyExecutionOntoBaseObjectIDMultiple(BaseModel):


@app.patch("/executions/{uid}/set_onto_base_object_id_multiple/", tags=["Executions"])
def modify_execution(uid: str, data: M_ModifyExecutionOntoBaseObjectIDMultiple):
for d in data.data:
db.WorkflowExecutions.update_one({'_id': bson.objectid.ObjectId(uid), "EDMMapping.Name": d['name']}, {"$set": {"EDMMapping.$.id": d['value']}})
def modify_execution(uid: str, data: List[M_ModifyExecutionOntoBaseObjectID]):
for d in data:
db.WorkflowExecutions.update_one({'_id': bson.objectid.ObjectId(uid), "EDMMapping.Name": d.name}, {"$set": {"EDMMapping.$.id": d.value}})
return get_execution(uid)
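
The multiple-update endpoint now binds a bare JSON array of name/value pairs instead of the old wrapper object. Payload sketch (values hypothetical):

# PATCH /executions/{uid}/set_onto_base_object_id_multiple/
# before: {"data": [{"name": "beam", "value": "abc123"}]}
# after:  [{"name": "beam", "value": "abc123"}, {"name": "load", "value": "def456"}]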


@@ -558,8 +563,8 @@ def get_field_as_vtu(fid: str, tdir=Depends(get_temp_dir)):
# --------------------------------------------------

@app.post("/logs/", tags=["Logs"])
def insert_log(data: M_Dict):
res = db.Logs.insert_one(data.entity)
def insert_log(data: dict):
res = db.Logs.insert_one(data)
return str(res.inserted_id)


@@ -579,12 +584,11 @@ def get_status():
with open(pidfile, "r") as f:
try:
pid = int(f.read())
if not psutil.pid_exists(pid):
schedulerStatus = 'Failed'
except (OSError, ValueError):
schedulerStatus = 'Failed'

if not psutil.pid_exists(pid):
schedulerStatus = 'Failed'
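
(Moving the psutil.pid_exists check inside the try is the actual fix here: pid is only bound when int(f.read()) succeeds, so with a malformed pidfile the old placement raised NameError on the check after the except instead of just reporting 'Failed'.)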

# get some scheduler stats
stat = mupifDB.schedulerstat.getGlobalStat()
schedulerstat = db.Stat.find_one()['scheduler']
@@ -637,11 +641,12 @@ def db_init():
('WorkflowExecutions',None),
('IOData',None)
]:
try: c=db.create_collection(coll)
try:
c=db.create_collection(coll)
if rec is None: continue
try: c.insert_one(rec)
except Exception: log.exception(f'Error populating initial collection {coll} with {rec}.')
except Exception as e: log.exception(f'Error creating initial collection {coll}.')
if rec is None: continue
try: c.insert_one(rec)
except Exception: log.exception(f'Error populating initial collection {coll} with {rec}.')
return True


@@ -677,11 +682,10 @@ def set_scheduler_statistics(data: M_ModifyStatistics):
def get_status2():
ns = None
try:
ns = mp.pyroutil.connectNameserver();
ns = mupif.pyroutil.connectNameserver()
nameserverStatus = 'OK'
except:
nameserverStatus = 'Failed'

# get Scheduler status
schedulerStatus = 'Failed'
query = ns.yplookup(meta_any={"type:scheduler"})
@@ -703,21 +707,21 @@ def get_status2():

@app.get("/scheduler-status2/", tags=["Stats"])
def get_scheduler_status2():
ns = mp.pyroutil.connectNameserver();
ns = mupif.pyroutil.connectNameserver();
return mp.monitor.schedulerInfo(ns)

@app.get("/ns-status2/", tags=["Stats"])
def get_ns_status2():
ns = mp.pyroutil.connectNameserver();
ns = mupif.pyroutil.connectNameserver();
return mp.monitor.nsInfo(ns)

@app.get("/vpn-status2/", tags=["Stats"])
def get_vpn_status2():
return mp.monitor.vpnInfo(hidePriv=False)
return mupif.monitor.vpnInfo(hidePriv=False)

@app.get("/jobmans-status2/", tags=["Stats"])
def get_jobmans_status2():
ns = mp.pyroutil.connectNameserver();
ns = mupif.pyroutil.connectNameserver();
return mp.monitor.jobmanInfo(ns)


@@ -754,3 +758,7 @@ def edm_find(db: str, type: str, data: M_FindParams):
return ids


if __name__ == '__main__' and cmdline_opts.export_openapi:
open(cmdline_opts.export_openapi,'w').write(json.dumps(app.openapi(),indent=2))
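
Together with the guard at the top of the file, this makes an invocation like `python mupifDB/api/main.py --export-openapi openapi.json` (output file name hypothetical) dump the OpenAPI schema and exit, without serving and without initializing the EDM DB connection.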


(The remaining changed files were not loaded in this view.)
