Skip to content

Commit

Permalink
Convert statistics to models as well, minor cleanups, add annotations…
Browse files Browse the repository at this point in the history
… here and there
  • Loading branch information
eudoxos committed Oct 16, 2024
1 parent 49951a4 commit 0497bf6
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 104 deletions.
77 changes: 40 additions & 37 deletions mupifDB/api/client_granta.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,34 @@ def fix_json(val: str) -> str:
val = val.replace("False", "false").replace("True", "true")
return val


def _getGrantaBinaryFileByID(fid):
    """Download a binary file from the Granta filestore.

    Returns a ``(content, filename)`` tuple; *filename* is parsed from the
    ``content-disposition`` response header.
    """
    assert api_type == 'granta'
    # this is .../filestore instead of ..../api, so the ../filestore should do the trick
    token = getAuthToken()["access_token"]
    response = rGet(f"../filestore/{fid}", headers={'Authorization': f'Bearer {token}'}, allow_redirects=True)
    disposition = response.headers['content-disposition']
    filename = disposition.split('filename=')[1].replace('"', '')
    return response.content, filename

def _uploadGrantaBinaryFile(binary_data):
    """Upload *binary_data* to the Granta filestore and return the new file's GUID."""
    assert api_type == 'granta'
    auth_header = {'Authorization': f'Bearer {getAuthToken()["access_token"]}'}
    response = rPost("../filestore", headers=auth_header, files={"file": binary_data})
    return response.json()['guid']



def _getGrantaWorkflowRecordGeneral(wid, version: int):
r = rGet(f"templates/{wid}", headers=getGrantaHeaders())
r_json = r.json()
workflow = table_structures.extendRecord({}, table_structures.tableWorkflow)
workflow['_id'] = r_json['guid']
workflow['wid'] = r_json['guid']
workflow['WorkflowVersion'] = 1

workflow['modulename'] = 'unknown'
workflow['classname'] = 'unknown'
workflow['GridFSID'] = 'unknown'
workflow['UseCase'] = ''
workflow['metadata'] = ''
# workflow = table_structures.extendRecord({}, table_structures.tableWorkflow)
workflow = models.Workflow_Model(
dbID = r_json['guid'],
wid = r_json['guid'],
# Version = 1,
modulename = 'unknown',
classname = 'unknown',
GridFSID = 'unknown',
# UseCase = '',
# metadata = ''
)

fid = None
gmds = r_json['metadata']
Expand All @@ -125,17 +140,17 @@ def _getGrantaWorkflowRecordGeneral(wid, version: int):
if gmd['name'] == 'muPIF metadata':
md = json.loads(fix_json(gmd['value']))
# print(md)
workflow['metadata'] = md
workflow['classname'] = md['ClassName']
workflow['modulename'] = md['ModuleName']
workflow.metadata = md
workflow.classname = md['ClassName']
workflow.modulename = md['ModuleName']

if gmd['name'] == 'workflow python file':
fid = gmd['value']['url'].split('/')[-1]

if fid:
file, filename = _getGrantaBinaryFileByID(fid)
workflow['GridFSID'] = fid
workflow['modulename'] = filename.replace('.py', '')
workflow.GridFSID = fid
workflow.modulename = filename.replace('.py', '')

return workflow

Expand Down Expand Up @@ -308,19 +323,18 @@ def ObjIDIsIterable(val):
return None


def _getGrantaExecutionRecords(workflow_id=None, workflow_version=None, label=None, num_limit=None, status=None) -> List[models.WorkflowExecution_Model]:
    """Fetch all execution records from Granta and map them to WorkflowExecution_Model.

    NOTE(review): the filter parameters (workflow_id, workflow_version, label,
    num_limit, status) are currently accepted but not applied — every record
    returned by the Granta API is included; confirm whether server-side
    filtering is intended.
    """
    # Fix: the def line had an invalid ':' before the return annotation
    # ('…status=None): -> List[…]:'), and the superseded dict-based body was
    # still present above the new model-based return.
    assert api_type == 'granta'
    r = rGet("executions/", headers=getGrantaHeaders())
    # Translate Granta status vocabulary to the mupifDB one; unknown values pass through unchanged.
    return [
        models.WorkflowExecution_Model(
            dbID = ex['guid'],
            WorkflowID = ex['template_guid'],
            WorkflowVersion = -1,  # Granta executions carry no workflow version
            Status = {'Ready':'Pending','On-going':'Running','Completed':'Finished','Completed, to be reviewed':'Finished','Completed & reviewed':'Finished','Cancelled':'Failed'}.get(ex['status'],ex['status']),
            Task_ID = '',
        )
        for ex in r.json()]


def _getGrantaExecutionRecord(weid: str):
Expand Down Expand Up @@ -383,14 +397,3 @@ def _setGrantaExecutionStatus(eid, val):
return False


def _getGrantaBinaryFileByID(fid):
assert api_type == 'granta'
# this is .../filestore instead of ..../api, so the ../filestore should do the trick
response = rGet(f"../filestore/{fid}", headers={'Authorization': f'Bearer {getAuthToken()["access_token"]}'}, allow_redirects=True)
return response.content, response.headers['content-disposition'].split('filename=')[1].replace('"', '')

def _uploadGrantaBinaryFile(binary_data):
assert api_type == 'granta'
response = rPost("../filestore", headers={'Authorization': f'Bearer {getAuthToken()["access_token"]}'}, files={"file": binary_data})
return response.json()['guid']

14 changes: 4 additions & 10 deletions mupifDB/api/client_mupif.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from rich.pretty import pprint
from typing import List,Optional

@pydantic.validate_call

def getUsecaseRecords():
    """Return all use-case records from the REST API as validated UseCase_Model instances."""
    records = []
    for rec in rGet("usecases/").json():
        records.append(models.UseCase_Model.model_validate(rec))
    return records

Expand Down Expand Up @@ -253,18 +253,12 @@ def getStatus():
response = rGet("status/")
return response.json()

def getExecutionStatistics() -> models.MupifDBStatus_Model.ExecutionStatistics_Model:
    """Fetch execution statistics from the REST API, validated into ExecutionStatistics_Model."""
    # Fix: diff residue left two stacked def lines and two return statements
    # (the old 'return response.json()' shadowed the new model-validated return).
    response = rGet("execution_statistics/")
    return models.MupifDBStatus_Model.ExecutionStatistics_Model.model_validate(response.json())

def getStatScheduler() -> models.MupifDBStatus_Model.Stat_Model.SchedulerStat_Model:
    """Fetch scheduler statistics from the REST API, validated into SchedulerStat_Model.

    Keys missing from the response are filled with the model's defaults
    (replacing the old manual None-filling of runningTasks/scheduledTasks/load/processedTasks).
    """
    # Fix: the superseded dict-based body still preceded (and made unreachable)
    # the new single-line model-validated return. Return annotation added for
    # consistency with getExecutionStatistics above.
    return models.MupifDBStatus_Model.Stat_Model.SchedulerStat_Model.model_validate(rGet("scheduler_statistics/").json())

# session is the requests module by default (one-off session for each request) but can be passed
# a custom requests.Session() object with config such as retries and timeouts.
Expand Down
55 changes: 28 additions & 27 deletions mupifDB/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def get_usecases() -> List[models.UseCase_Model]:


@app.get("/usecases/{uid}", tags=["Usecases"])
def get_usecase(uid: str) -> models.UseCase_Model:
    """Return the use case with the given ucid; raises NotFoundError when absent."""
    # Fix: diff residue left two stacked def lines (with and without the return
    # annotation); also corrected the error message, which said "workflow" for
    # a usecase lookup.
    res = db.UseCases.find_one({"ucid": uid})
    if res is None: raise NotFoundError(f'Database reports no usecase with ucid={uid}.')
    return models.UseCase_Model.model_validate(res)
Expand All @@ -171,7 +171,7 @@ def get_usecase_workflows(uid: str) -> List[models.Workflow_Model]:


@app.post("/usecases/", tags=["Usecases"])
def post_usecase(uc: models.UseCase_Model) -> str:
    """Insert a new use-case document; return the inserted document's id as a string."""
    # Fix: diff residue left two stacked def lines (with and without '-> str').
    res = db.UseCases.insert_one(uc.model_dump_db())
    return str(res.inserted_id)

Expand All @@ -197,20 +197,18 @@ def get_workflow(workflow_id: str) -> models.Workflow_Model:

@app.patch("/workflows/", tags=["Workflows"])
def update_workflow(wf: models.Workflow_Model) -> models.Workflow_Model:
    """Update the workflow matching wf.wid in-place and return the updated document.

    Uses return_document=AFTER so the post-update state is validated and returned.
    """
    # Fix: removed dead commented-out debug prints (one with a typo, '#rint_json').
    res = db.Workflows.find_one_and_update({'wid': wf.wid}, {'$set': wf.model_dump_db()}, return_document=ReturnDocument.AFTER)
    return models.Workflow_Model.model_validate(res)


@app.post("/workflows/", tags=["Workflows"])
def insert_workflow(wf: models.Workflow_Model) -> str:
    """Insert a new workflow document; return the inserted document's id as a string."""
    # Fix: diff residue left two stacked def lines (with and without '-> str').
    res = db.Workflows.insert_one(wf.model_dump_db())
    return str(res.inserted_id)


@app.post("/workflows_history/", tags=["Workflows"])
def insert_workflow_history(wf: models.Workflow_Model):
def insert_workflow_history(wf: models.Workflow_Model) -> str:
#print('TTT')
#print_json(data=wf.model_dump())
res = db.WorkflowsHistory.insert_one(wf.model_dump_db())
Expand All @@ -223,8 +221,8 @@ def insert_workflow_history(wf: models.Workflow_Model):
# --------------------------------------------------

@app.get("/workflows_history/{workflow_id}/{workflow_version}", tags=["Workflows"])
def get_workflow_history(workflow_id: str, workflow_version: int) -> models.Workflow_Model:
    """Return the historical workflow record matching wid+Version; raises NotFoundError when absent."""
    # Fix: diff residue left two stacked def lines (old '-> Workflow_Model|None'
    # vs. new '-> Workflow_Model') plus duplicated debug comments. The non-optional
    # return type is correct: a missing record raises instead of returning None.
    res = db.WorkflowsHistory.find_one({"wid": workflow_id, "Version": workflow_version})
    if res is None: raise NotFoundError(f'Database reports no workflow with wid={workflow_id} and Version={workflow_version}.')
    return models.Workflow_Model.model_validate(res)
Expand Down Expand Up @@ -555,15 +553,20 @@ def get_status():
schedulerStatus = 'Failed'
except (OSError, ValueError):
schedulerStatus = 'Failed'

# get some scheduler stats
stat = mupifDB.schedulerstat.getGlobalStat()
schedulerstat = db.Stat.find_one()['scheduler']
return {'mupifDBStatus': mupifDBStatus, 'schedulerStatus': schedulerStatus, 'totalStat': stat, 'schedulerStat': schedulerstat}
rec=db.Stat.find_one()
# pprint(rec)
statRec=models.MupifDBStatus_Model.Stat_Model.model_validate(rec)
return models.MupifDBStatus_Model(
schedulerStatus=schedulerStatus,
mupifDBStatus=mupifDBStatus,
# roundtrip to execution_statistics below via API request back to us? wth
totalStat=mupifDB.schedulerstat.getGlobalStat(),
schedulerStat=statRec.scheduler
)


@app.get("/execution_statistics/", tags=["Stats"])
def get_execution_statistics():
def get_execution_statistics() -> models.MupifDBStatus_Model.ExecutionStatistics_Model:
res = client.MuPIF.WorkflowExecutions.aggregate([
{"$group": {"_id": "$Status", "count": {"$sum": 1}}}
])
Expand All @@ -573,17 +576,15 @@ def get_execution_statistics():
vals[r['_id']] = r['count']
tot += r['count']

output = {}
output['totalExecutions'] = tot
output['finishedExecutions'] = vals.get('Finished', 0)
output['failedExecutions'] = vals.get('Failed', 0)
output['createdExecutions'] = vals.get('Created', 0)
output['pendingExecutions'] = vals.get('Pending', 0)
output['scheduledExecutions'] = vals.get('Scheduled', 0)
output['runningExecutions'] = vals.get('Running', 0)
return output


return models.MupifDBStatus_Model.ExecutionStatistics_Model(
totalExecutions = tot,
finishedExecutions = vals.get('Finished', 0),
failedExecutions = vals.get('Failed', 0),
createdExecutions = vals.get('Created', 0),
pendingExecutions = vals.get('Pending', 0),
scheduledExecutions = vals.get('Scheduled', 0),
runningExecutions = vals.get('Running', 0),
)


@app.get("/settings/", tags=["Settings"])
Expand All @@ -602,7 +603,7 @@ def db_init():
for coll,rec in [
('Settings',{'projectName':'TEST','projectLogo':'https://raw.githubusercontent.com/mupif/mupifDB/bd297a4a719336cd9672cfe73f31f7cbe2b4e029/webapi/static/images/mupif-logo.png'}),
('UseCases',models.UseCase_Model(projectName='TEST',projectLogo='https://raw.githubusercontent.com/mupif/mupifDB/bd297a4a719336cd9672cfe73f31f7cbe2b4e029/webapi/static/images/mupif-logo.png',ucid='1',Description='Test usecase').model_dump()),
('Stat',{"scheduler": {"load": 0, "processedTasks": 0, "runningTasks": 0, "scheduledTasks": 0}}),
('Stat',models.MupifDBStatus_Model.Stat_Model().model_dump(mode="json")),
('Workflows',None),
('WorkflowsHistory',None),
('WorkflowExecutions',None),
Expand Down Expand Up @@ -655,7 +656,7 @@ def get_status2():
nameserverStatus = 'Failed'
# get Scheduler status
schedulerStatus = 'Failed'
query = ns.yplookup(meta_any={"type:scheduler"})
query = ns.yplookup(meta_any={"type:scheduler"}) # type: ignore
try:
for name, (uri, metadata) in query.items():
s = Pyro5.api.Proxy(uri)
Expand Down
58 changes: 45 additions & 13 deletions mupifDB/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,9 @@ def model_dump_json(self, **kwargs: Any) -> str:
kwargs.setdefault("by_alias", True)
return super().model_dump_json(**kwargs)

class Parent_Model(StrictBase):
where: str
id: str

class MongoObj_Model(StrictBase):
class MongoObjBase_Model(StrictBase):
dbID: Optional[DatabaseID]=Field(None,alias='_id') # type: ignore[arg-type]
parent: Optional[Parent_Model]=None
def model_dump_db(self):
'''
MongoDB-specific enhancement: with _id=None (default), mongoDB would use _id:null instead of treating it as unset. Therefore remove it from the dump if None.
Expand All @@ -48,18 +44,29 @@ def model_dump_db(self):
if '_id' in ret and ret['_id'] is None: del ret['_id']
return ret

class GridFSFile_Model(MongoObjBase_Model):
    """File record; fields presumably mirror the GridFS files collection — TODO confirm."""
    length: int                       # file size in bytes
    chunkSize: int                    # GridFS chunk size in bytes
    uploadDate: datetime.datetime     # when the file was stored
    metadata: Dict[str,Any]={}        # arbitrary per-file metadata (empty by default)


class MongoObj_Model(MongoObjBase_Model):
    """Base for DB documents that can reference a parent document."""
    class Parent_Model(StrictBase):
        # Location of the parent: 'where' is presumably the collection name and
        # 'id' the parent document's id — TODO confirm against callers.
        where: str
        id: str
    parent: Optional[Parent_Model]=None   # None for top-level (unparented) documents

class UseCase_Model(MongoObj_Model):
    """Use-case record (stored in the UseCases collection)."""
    ucid: str              # use-case identifier
    projectName: str=''
    projectLogo: str=''    # URL of the project logo image
    Description: str=''


class EDMMappingIDs_Model(StrictBase):
    # Holds either a single id or a list of ids (both optional);
    # used as the type of EDMMapping_Model.createNew.
    id: Optional[str]=None
    ids: Optional[List[str]]=[]


class EDMMapping_Model(StrictBase):
id: Optional[str]=None
ids: Optional[List[str]]=[]
Expand All @@ -71,9 +78,6 @@ class EDMMapping_Model(StrictBase):
createNew: Optional[EDMMappingIDs_Model]=None
createFrom: Any='' ### XXXX what is this??




class InputOutputBase_Model(StrictBase):
Name: str
Description: Optional[str]=None
Expand All @@ -91,8 +95,8 @@ def TypeID(self) -> str: return self.Type_ID
class IODataRecordItem_Model(InputOutputBase_Model):
class Link_Model(StrictBase):
ExecID: Str_EmptyFromNone='' # XXX: test loads None
Name: Str_EmptyFromNone='' # XXX: test loads None
ObjID: Str_EmptyFromNone='' # XXX: test loads None
Name: Str_EmptyFromNone='' # XXX: test loads None
ObjID: Str_EmptyFromNone='' # XXX: test loads None
Value: Optional[dict[str,Any]|str]=None # deema: allow str
Link: Link_Model=Link_Model()
FileID: Optional[str]=None
Expand Down Expand Up @@ -159,11 +163,39 @@ class WorkflowExecution_Model(MongoObj_Model):
Outputs: str





#class ExecutionQuery_Model(StrictBase):
# status: Optional[ExecutionStatus_Literal]=Field(None,alias='Status')
# workflow_version: Optional[int]=Field(None,alias='WorkflowVersion')
# workflow_id: Optional[str]=Field(None,alias='WorkflowID')
# label: Optional[str]=None
# num_limit: int=999999


class MupifDBStatus_Model(StrictBase):
    """Aggregate status report returned by the /status/ endpoint."""
    class Stat_Model(MongoObjBase_Model):
        'Persisted in the DB, so deriving from MongoObjBase_Model.'
        class SchedulerStat_Model(StrictBase):
            # Scheduler counters; all default to zero for a fresh database.
            load: float=0.
            processedTasks: int=0
            runningTasks: int=0
            scheduledTasks: int=0
        scheduler: SchedulerStat_Model=SchedulerStat_Model()
    class ExecutionStatistics_Model(StrictBase):
        # Per-status execution counts (aggregated over workflow executions).
        totalExecutions: int=0
        finishedExecutions: int=0
        failedExecutions: int=0
        createdExecutions: int=0
        pendingExecutions: int=0
        scheduledExecutions:int=0
        runningExecutions: int=0

    mupifDBStatus: Literal['OK','Failed']     # database reachability
    schedulerStatus: Literal['OK','Failed']   # scheduler reachability
    schedulerStat: Stat_Model.SchedulerStat_Model
    totalStat: ExecutionStatistics_Model


#{'mupifDBStatus': mupifDBStatus, 'schedulerStatus': schedulerStatus, 'totalStat': stat, 'schedulerStat': schedulerstat}
# {'mupifDBStatus': mupifDBStatus, 'schedulerStatus': schedulerStatus, 'totalStat': stat, 'schedulerStat': schedulerstat}
Loading

0 comments on commit 0497bf6

Please sign in to comment.