Skip to content

Commit

Permalink
Add unit test for JobManager get_finished_jobs
Browse files: browse the repository at this point in the history
  • Loading branch information
mbthornton-lbl committed Oct 8, 2024
1 parent df35a06 commit 62d53ec
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 8 deletions.
2 changes: 1 addition & 1 deletion nmdc_automation/workflow_automation/watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def get_finished_jobs(self)->Tuple[List[WorkflowJob], List[WorkflowJob]]:
successful_jobs = []
failed_jobs = []
for job in self.job_cache:
if not job.done:
if job.done:
status = job.job_status
if status == "Succeeded" and job.opid:
successful_jobs.append(job)
Expand Down
36 changes: 29 additions & 7 deletions nmdc_automation/workflow_automation/wfutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from nmdc_automation.config import SiteConfig
from nmdc_automation.workflow_automation.models import DataObject, workflow_process_factory

DEFAULT_MAX_RETRIES = 2

class JobRunnerABC(ABC):

Expand All @@ -22,7 +23,11 @@ def submit_job(self) -> str:
pass

@abstractmethod
def check_job_status(self) -> str:
def get_job_status(self) -> str:
pass

@abstractmethod
def get_job_metadata(self) -> Dict[str, Any]:
pass

@property
Expand All @@ -40,27 +45,38 @@ def outputs(self) -> Dict[str, str]:
def metadata(self) -> Dict[str, Any]:
pass

@property
@abstractmethod
def max_retries(self) -> int:
pass



class CromwellRunner(JobRunnerABC):

def __init__(self, site_config: SiteConfig, workflow: "WorkflowStateManager", job_metadata: Dict[str, Any] = None):
def __init__(self, site_config: SiteConfig, workflow: "WorkflowStateManager", job_metadata: Dict[str,
Any] = None, max_retries: int = DEFAULT_MAX_RETRIES):
self.config = site_config
self.workflow = workflow
self.service_url = self.config.cromwell_url
self._metadata = {}
if job_metadata:
self._metadata = job_metadata
self._max_retries = max_retries


def submit_job(self) -> str:
# TODO: implement
pass

def check_job_status(self) -> str:
def get_job_status(self) -> str:
# TODO: implement
return "Pending"

def get_job_metadata(self) -> Dict[str, Any]:
raise NotImplementedError
# TODO: implement

@property
def job_id(self) -> Optional[str]:
return self.metadata.get("id", None)
Expand All @@ -77,6 +93,11 @@ def metadata(self) -> Dict[str, Any]:
def metadata(self, metadata: Dict[str, Any]):
self._metadata = metadata

@property
def max_retries(self) -> int:
return self._max_retries



class WorkflowStateManager:
def __init__(self, state: Dict[str, Any] = None, opid: str = None):
Expand Down Expand Up @@ -183,18 +204,19 @@ def done(self, done: bool):
def job_status(self) -> str:
status = None
job_id_keys = ["cromwell_jobid"]
failed_count = self.workflow.state.get("failed_count", 0)
# if none of the job id keys are in the workflow state, it is unsubmitted
if not any(key in self.workflow.state for key in job_id_keys):
status = "Unsubmitted"
self.workflow.update_state({"last_status": status})
return status
elif self.workflow.state.get("last_status") == "Succeeded":
status = "Succeeded"
return status
elif self.workflow.state.get("last_status") == "Failed" and failed_count >= self.job.max_retries:
status = "Failed"
else:
status = self.job.check_job_status()
status = self.job.get_job_status()
self.workflow.update_state({"last_status": status})
return status
return status


@property
Expand Down
216 changes: 216 additions & 0 deletions tests/fixtures/failed_job_state.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
{
"type": "MAGs: v1.3.10",
"cromwell_jobid": "9492a397-eb30-472b-9d3b-abc123456789",
"nmdc_jobid": "nmdc:66cf64b6-7462-11ef-8b84-abc123456789",
"conf": {
"git_repo": "https://github.com/microbiomedata/metaMAGs",
"release": "v1.3.10",
"wdl": "mbin_nmdc.wdl",
"activity_id": "nmdc:wfmag-11-g7msr323.1",
"activity_set": "mags_activity_set",
"was_informed_by": "nmdc:omprc-11-9cdxha98",
"trigger_activity": "nmdc:wfmgan-11-jv8kx789.1",
"iteration": 1,
"input_prefix": "nmdc_mags",
"inputs": {
"proj": "nmdc:wfmag-11-g7msr323.1",
"contig_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_contigs.fna",
"sam_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgas-11-0qvjnc54.1/nmdc_wfmgas-11-0qvjnc54.1_pairedMapped_sorted.bam",
"gff_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_functional_annotation.gff",
"proteins_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_proteins.faa",
"cog_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_cog.gff",
"ec_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_ec.tsv",
"ko_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_ko.tsv",
"pfam_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_pfam.gff",
"tigrfam_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_tigrfam.gff",
"crispr_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_crt.crisprs",
"product_names_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_product_names.tsv",
"gene_phylogeny_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_gene_phylogeny.tsv",
"lineage_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_scaffold_lineage.tsv",
"map_file": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_contig_names_mapping.tsv"
},
"input_data_objects": [
{
"id": "nmdc:dobj-11-1x850k20",
"name": "nmdc_wfmgan-11-jv8kx789.1_contigs.fna",
"description": "Assembly contigs (remapped) for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_contigs.fna",
"md5_checksum": "6debed079383eeca2045ce23b0576607",
"file_size_bytes": 2084209623,
"data_object_type": "Assembly Contigs"
},
{
"id": "nmdc:dobj-11-fkj2kt47",
"name": "nmdc_wfmgas-11-0qvjnc54.1_pairedMapped_sorted.bam",
"description": "Metagenome Alignment BAM file for nmdc:omprc-11-9cdxha98",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgas-11-0qvjnc54.1/nmdc_wfmgas-11-0qvjnc54.1_pairedMapped_sorted.bam",
"md5_checksum": "88ec004bd037a3820060427098798666",
"file_size_bytes": 15704979428,
"data_object_type": "Assembly Coverage BAM"
},
{
"id": "nmdc:dobj-11-f9rnav80",
"name": "nmdc_wfmgan-11-jv8kx789.1_functional_annotation.gff",
"description": "Functional Annotation for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_functional_annotation.gff",
"md5_checksum": "349cae9b4fe62bb910f08a183e57b475",
"file_size_bytes": 1320869282,
"data_object_type": "Functional Annotation GFF"
},
{
"id": "nmdc:dobj-11-btqzf393",
"name": "nmdc_wfmgan-11-jv8kx789.1_proteins.faa",
"description": "FASTA Amino Acid File for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_proteins.faa",
"md5_checksum": "292eae73923605dae2ef9f5d582e4603",
"file_size_bytes": 1075716574,
"data_object_type": "Annotation Amino Acid FASTA"
},
{
"id": "nmdc:dobj-11-hdty3m42",
"name": "nmdc_wfmgan-11-jv8kx789.1_cog.gff",
"description": "COGs for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_cog.gff",
"md5_checksum": "c4d1121c1ceb1229afb7190d23553003",
"file_size_bytes": 712459544,
"data_object_type": "Clusters of Orthologous Groups (COG) Annotation GFF"
},
{
"id": "nmdc:dobj-11-0gk70187",
"name": "nmdc_wfmgan-11-jv8kx789.1_ec.tsv",
"description": "EC Annotations for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_ec.tsv",
"md5_checksum": "84cf22f39532e1bd001bea8425735a82",
"file_size_bytes": 116429630,
"data_object_type": "Annotation Enzyme Commission"
},
{
"id": "nmdc:dobj-11-3mtmhf26",
"name": "nmdc_wfmgan-11-jv8kx789.1_ko.tsv",
"description": "KEGG Orthology for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_ko.tsv",
"md5_checksum": "17d699df17c97fc28796a198cf40a328",
"file_size_bytes": 169182276,
"data_object_type": "Annotation KEGG Orthology"
},
{
"id": "nmdc:dobj-11-7kfhf682",
"name": "nmdc_wfmgan-11-jv8kx789.1_pfam.gff",
"description": "Pfam Annotation for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_pfam.gff",
"md5_checksum": "23c33758dc138e1af0f39fa1f3ca07db",
"file_size_bytes": 602929841,
"data_object_type": "Pfam Annotation GFF"
},
{
"id": "nmdc:dobj-11-9hjg8y84",
"name": "nmdc_wfmgan-11-jv8kx789.1_tigrfam.gff",
"description": "TIGRFam for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_tigrfam.gff",
"md5_checksum": "bbfded219e0b359602725c9efb4f0c54",
"file_size_bytes": 61788991,
"data_object_type": "TIGRFam Annotation GFF"
},
{
"id": "nmdc:dobj-11-2x0wy902",
"name": "nmdc_wfmgan-11-jv8kx789.1_crt.crisprs",
"description": "Crispr Terms for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_crt.crisprs",
"md5_checksum": "9d2255a63e39552328c4da20ccf2bb3f",
"file_size_bytes": 142989,
"data_object_type": "Crispr Terms"
},
{
"id": "nmdc:dobj-11-r0bx4g71",
"name": "nmdc_wfmgan-11-jv8kx789.1_product_names.tsv",
"description": "Product names for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_product_names.tsv",
"md5_checksum": "6f1325b2f8dee9b2a75598fb9645c43d",
"file_size_bytes": 401118634,
"data_object_type": "Product Names"
},
{
"id": "nmdc:dobj-11-7mj15p44",
"name": "nmdc_wfmgan-11-jv8kx789.1_gene_phylogeny.tsv",
"description": "Gene Phylogeny for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_gene_phylogeny.tsv",
"md5_checksum": "037aee803f1b81ac5ac1bccb9a18527d",
"file_size_bytes": 748420652,
"data_object_type": "Gene Phylogeny tsv"
},
{
"id": "nmdc:dobj-11-r2zqpy26",
"name": "nmdc_wfmgan-11-jv8kx789.1_scaffold_lineage.tsv",
"description": "Scaffold Lineage tsv for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_scaffold_lineage.tsv",
"md5_checksum": "efdce9771cdda8bd8548e44ef6d1d3a3",
"file_size_bytes": 503898615,
"data_object_type": "Scaffold Lineage tsv"
},
{
"id": "nmdc:dobj-11-4k2bt072",
"name": "nmdc_wfmgan-11-jv8kx789.1_contig_names_mapping.tsv",
"description": "Contig mappings file for nmdc:wfmgan-11-jv8kx789.1",
"url": "https://data.microbiomedata.org/data/nmdc:omprc-11-9cdxha98/nmdc:wfmgan-11-jv8kx789.1/nmdc_wfmgan-11-jv8kx789.1_contig_names_mapping.tsv",
"md5_checksum": "1056a6ef48ce9124de0828ee85246e65",
"file_size_bytes": 250129248,
"data_object_type": "Contig Mapping File"
}
],
"activity": {
"name": "Metagenome Assembled Genomes Analysis Activity for {id}",
"type": "nmdc:MagsAnalysisActivity",
"binned_contig_num": "{outputs.final_stats_json.binned_contig_num}",
"input_contig_num": "{outputs.final_stats_json.input_contig_num}",
"low_depth_contig_num": "{outputs.final_stats_json.low_depth_contig_num}",
"mags_list": "{outputs.final_stats_json.mags_list}",
"too_short_contig_num": "{outputs.final_stats_json.too_short_contig_num}",
"unbinned_contig_num": "{outputs.final_stats_json.unbinned_contig_num}"
},
"outputs": [
{
"output": "final_checkm",
"data_object_type": "CheckM Statistics",
"description": "CheckM for {id}",
"name": "CheckM statistics report",
"id": "nmdc:dobj-11-xvjz5h55"
},
{
"output": "final_hqmq_bins_zip",
"data_object_type": "Metagenome Bins",
"description": "Metagenome Bins for {id}",
"name": "Metagenome bin tarfiles archive",
"id": "nmdc:dobj-11-85q1v678"
},
{
"output": "final_gtdbtk_bac_summary",
"data_object_type": "GTDBTK Bacterial Summary",
"description": "Bacterial Summary for {id}",
"name": "GTDBTK bacterial summary",
"id": "nmdc:dobj-11-j5p58211"
},
{
"output": "final_gtdbtk_ar_summary",
"data_object_type": "GTDBTK Archaeal Summary",
"description": "Archaeal Summary for {id}",
"name": "GTDBTK archaeal summary",
"suffix": "_gtdbtk.ar122.summary.tsv",
"id": "nmdc:dobj-11-ec2fqk35"
},
{
"output": "mags_version",
"data_object_type": "Metagenome Bins Info File",
"description": "Metagenome Bins Info File for {id}",
"name": "Metagenome Bins Info File",
"id": "nmdc:dobj-11-kg68h909"
}
]
},
"activity_id": "nmdc:wfmag-11-g7msr323.1",
"last_status": "Failed",
"done": true,
"failed_count": 2,
"start": "2024-09-16T19:33:32.562412+00:00",
"end": "2024-09-16T21:52:12.873101+00:00",
"opid": "nmdc:wfmag-11-g7msr323.1"
}
32 changes: 32 additions & 0 deletions tests/test_watch_nmdc.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,38 @@ def test_job_manager_prepare_and_cache_new_job(site_config, initial_state_file,
jm.job_cache = []


def test_job_manager_get_finished_jobs(site_config, initial_state_file, fixtures_dir):
    """Check that ``JobManager.get_finished_jobs`` reports finished jobs.

    Two jobs built from JSON fixtures are appended to the job cache that the
    manager loaded from ``initial_state_file``; the test then asserts that
    ``get_finished_jobs`` returns a non-empty list of successful jobs and a
    non-empty list of failed jobs.

    Args:
        site_config: site configuration fixture passed to every constructor.
        initial_state_file: state file fixture the ``FileHandler`` is built on.
        fixtures_dir: directory fixture holding the JSON job-state files;
            it is joined with ``/``, so presumably a ``pathlib.Path``.
    """
    # Arrange - initial state has 1 failure and is not done
    fh = FileHandler(site_config, initial_state_file)
    jm = JobManager(site_config, fh)

    # Add a job to the cache - mags is done and successful.
    # Use a context manager so the fixture file handle is closed right away
    # instead of being left open until garbage collection.
    with open(fixtures_dir / "mags_workflow_state.json", encoding="utf-8") as state_file:
        new_job_state = json.load(state_file)
    assert new_job_state
    new_job = WorkflowJob(site_config, new_job_state)
    jm.job_cache.append(new_job)
    # sanity check
    assert len(jm.job_cache) == 2

    # Add a failed job: the fixture records last_status "Failed", done true
    # and failed_count 2.
    with open(fixtures_dir / "failed_job_state.json", encoding="utf-8") as state_file:
        failed_job_state = json.load(state_file)
    assert failed_job_state
    failed_job = WorkflowJob(site_config, failed_job_state)
    assert failed_job.job_status == "Failed"
    jm.job_cache.append(failed_job)
    # sanity check
    assert len(jm.job_cache) == 3

    # Act
    successful_jobs, failed_jobs = jm.get_finished_jobs()

    # Assert
    assert successful_jobs
    assert failed_jobs

    # cleanup
    jm.job_cache = []


@fixture
def mock_runtime_api_handler(site_config, mock_api):
Expand Down

0 comments on commit 62d53ec

Please sign in to comment.