Skip to content

Commit

Permalink
(CI: syntax fix in granta)
Browse files Browse the repository at this point in the history
  • Loading branch information
eudoxos committed Oct 16, 2024
1 parent 0497bf6 commit 6324b57
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
2 changes: 1 addition & 1 deletion mupifDB/api/client_granta.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def ObjIDIsIterable(val):
return None


def _getGrantaExecutionRecords(workflow_id=None, workflow_version=None, label=None, num_limit=None, status=None): -> List[models.WorkflowExecution_Model]:
def _getGrantaExecutionRecords(workflow_id=None, workflow_version=None, label=None, num_limit=None, status=None) -> List[models.WorkflowExecution_Model]:
assert api_type == 'granta'
r = rGet("executions/", headers=getGrantaHeaders())
return [
Expand Down
64 changes: 63 additions & 1 deletion mupifDB/api/client_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from requests.models import Response
import logging
import json
from typing import TypeVar,Any,Callable,Optional,Dict
from typing import TypeVar,Any,Callable,Optional,Dict,List,Literal
from rich import print_json
log=logging.getLogger(__name__)

Expand Down Expand Up @@ -109,3 +109,65 @@ def logMessage(*,name,levelno,pathname,lineno,created,**kw):
logging.disable(previous_level)
return response.json()


if 0:
# this is not used, and maybe never will

from .. import models
import pydantic

# https://stackoverflow.com/a/68284497
from abc import ABCMeta, abstractmethod

# decorate all methods as class methods, abstract, and validated by pydantic
class Interface(ABCMeta):
def __new__(metacls, name, bases, classdict):
for attr, value in classdict.items():
if callable(value):
classdict[attr] = abstractmethod(pydantic.validate_call(staticmethod(value)))
return super().__new__(ABCMeta, name, bases, classdict)

# linters don't know those methods are static, so warning everywhere.
class Client_ABC(Interface):
def getUsercaseRecords() -> List[models.UseCase_Model]: ...
def getUsecaseRecord(ucid: str) -> models.UseCase_Model: ...
def insertUsecaseRecord(ucid: str, description: str): ...
def getWorkflowRecords() -> List[models.Workflow_Model]: ...
def getWorkflowRecordsWithUsecase(usecase) -> List[models.Workflow_Model]: ...
def getWorkflowRecord(wid: str) -> models.Workflow_Model: ...
def insertWorkflow(wf: models.Workflow_Model): ...
def updateWorkflow(wf: models.Workflow_Model) -> models.Workflow_Model: ...
def getWorkflowRecordGeneral(wid, version: int) -> models.Workflow_Model: ...
def getWorkflowRecordFromHistory(wid: str, version: int) -> models.Workflow_Model: ...
def insertWorkflowHistory(wf: models.Workflow_Model): ...
def getExecutionRecords(workflow_id: str|None=None, workflow_version: int|None=None, label: str|None=None, num_limit: int|None=None, status: str|None=None) -> List[models.WorkflowExecution_Model]: ...
def getExecutionRecord(weid: str) -> models.WorkflowExecution_Model: ...
def getScheduledExecutions(num_limit=None): ...
def getPendingExecutions(num_limit=None): ...
def scheduleExecution(execution_id): ...
def setExecutionParameter(execution_id, param, value, val_type="str"): ...
def setExecutionOntoBaseObjectID(execution_id, name, value): ...
def setExecutionOntoBaseObjectIDMultiple(execution_id, data): ...
def setExecutionOntoBaseObjectIDs(execution_id, name, value): ...
def setExecutionAttemptsCount(execution_id, val): ...
def setExecutionStatus(execution_id: str, status: Literal=['Scheduled','Created','Pending','Running','Finished','Failed']): ...
def createExecution(wid: str, version: int, ip: str, no_onto=False): ...
def insertExecution(m: models.WorkflowExecution_Model): ...
def getExecutionInputRecord(weid) -> List[models.IODataRecordItem_Model]: ...
def getExecutionOutputRecord(weid) -> List[models.IODataRecordItem_Model]: ...
def getExecutionInputRecordItem(weid, name, obj_id): ...
def getExecutionOutputRecordItem(weid, name, obj_id): ...
def getIODataRecord(iod_id: str) -> models.IODataRecord_Model: ...
def insertIODataRecord(data: models.IODataRecord_Model): ...
def setExecutionInputLink(weid, name, obj_id, link_eid, link_name, link_obj_id): ...
def setExecutionInputObject(weid, name, obj_id, object_dict): ...
def setExecutionOutputObject(weid, name, obj_id, object_dict): ...
def getPropertyArrayData(file_id, i_start, i_count): ... # may not be used
def getBinaryFileByID(fid): ...
def uploadBinaryFile(binary_data): ...
def getStatus(): ...
def getExecutionStatistics() -> models.MupifDBStatus_Model.ExecutionStatistics_Model: ...
def getStatScheduler() : ...
def setStatScheduler(runningTasks=None, scheduledTasks=None, load=None, processedTasks=None, session=requests): ...
def updateStatScheduler(runningTasks=None, scheduledTasks=None, load=None, processedTasks=None): ...
def getSettings(maybe_init_db: bool=False): ...

0 comments on commit 6324b57

Please sign in to comment.