From 9228a04ba992c2734cd4a3fa17fc6c256f9aea1e Mon Sep 17 00:00:00 2001
From: Todd Morse
Date: Tue, 10 May 2022 14:17:18 -0700
Subject: [PATCH] Remove call cache hack and enhance tests (#82)

---
 miniwdl-plugins/s3upload/miniwdl_s3upload.py | 15 ++++++---------
 test/test_wdl.py                             | 12 ++++++++----
 version                                      |  2 +-
 3 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/miniwdl-plugins/s3upload/miniwdl_s3upload.py b/miniwdl-plugins/s3upload/miniwdl_s3upload.py
index 9b940fdf..857a054d 100644
--- a/miniwdl-plugins/s3upload/miniwdl_s3upload.py
+++ b/miniwdl-plugins/s3upload/miniwdl_s3upload.py
@@ -81,6 +81,8 @@ def flag_temporary(s3uri):
 
 
 def inode(link: str):
+    if link.startswith("s3://"):
+        return link
     st = os.stat(os.path.realpath(link))
     return (st.st_dev, st.st_ino)
 
@@ -238,15 +240,7 @@ def workflow(cfg, logger, run_id, run_dir, workflow, **recv):
         workflow.name,
     )
 
-    # HACK: Because of the way that call caching works, if a step is call cached its outputs
-    # will be s3 paths. This is fine for inputs to other steps because the downloader
-    # will download them, but for the last step of the pipeline it tries to link
-    # the s3 paths if they are outputs of the global pipeline, and this results
-    # in file not found errors. Technically for swipe we don't need linking
-    # and our whole system works if we just stop here. Once we solve the linking
-    # problem a bit better we may want to revisit this and return this to:
-    # yield recv
-    exit(0)
+    yield recv
 
 
 def write_outputs_s3_json(logger, outputs, run_dir, s3prefix, namespace):
@@ -255,6 +249,9 @@ def write_outputs_s3_json(logger, outputs, run_dir, s3prefix, namespace):
 
     # rewrite uploaded files to their S3 URIs
    def rewriter(fd):
+        if fd.value.startswith("s3://"):
+            return fd.value
+
         try:
             return _uploaded_files[inode(fd.value)]
         except Exception:
diff --git a/test/test_wdl.py b/test/test_wdl.py
index 35b30ed1..5c53fa30 100644
--- a/test/test_wdl.py
+++ b/test/test_wdl.py
@@ -219,7 +219,6 @@ def _wait_sfn(
             stateMachineArn=sfn_arn, name=execution_name, input=json.dumps(sfn_input)
         )
         arn = res["executionArn"]
-        assert res
         start = time.time()
         description = self.sfn.describe_execution(executionArn=arn)
         while description["status"] == "RUNNING" and time.time() < start + 2 * 60:
@@ -375,7 +374,7 @@ def test_call_cache(self):
         )
         outputs_obj = self.test_bucket.Object(f"{output_prefix}/test-1/out.txt")
         output_text = outputs_obj.get()["Body"].read().decode()
-        assert output_text == "hello\nworld\n", output_text
+        self.assertEqual(output_text, "hello\nworld\n")
 
         self.test_bucket.Object(f"{output_prefix}/test-1/out.txt").put(
             Body="cache_break\n".encode()
@@ -388,17 +387,22 @@ def test_call_cache(self):
             Prefix=f"{output_prefix}/test-1/cache/add_goodbye/",
         )["Contents"]
         self.test_bucket.Object(objects[0]["Key"]).delete()
+        objects = self.s3_client.list_objects_v2(
+            Bucket=self.test_bucket.name,
+            Prefix=f"{output_prefix}/test-1/cache/swipe_test/",
+        )["Contents"]
+        self.test_bucket.Object(objects[0]["Key"]).delete()
         self.test_bucket.Object(out_json_path).delete()
 
         self._wait_sfn(sfn_input, self.single_sfn_arn)
 
         outputs = json.loads(self.test_bucket.Object(out_json_path).get()["Body"].read().decode())
         for v in outputs.values():
-            assert v.startswith("s3://"), f"{v} does not start with 's3://'"
+            self.assert_(v.startswith("s3://"), f"{v} does not start with 's3://'")
 
         outputs_obj = self.test_bucket.Object(f"{output_prefix}/test-1/out_goodbye.txt")
         output_text = outputs_obj.get()["Body"].read().decode()
-        assert output_text == "cache_break\ngoodbye\n", output_text
+        self.assertEqual(output_text, "cache_break\ngoodbye\n")
 
 
 if __name__ == "__main__":
diff --git a/version b/version
index 9aae823f..d2f87e24 100644
--- a/version
+++ b/version
@@ -1 +1 @@
-v0.21.1-beta
+v0.21.3-beta
\ No newline at end of file