Commit

Remove call cache hack and enhance tests (#82)
morsecodist authored May 10, 2022
1 parent bd95212 commit 9228a04
Showing 3 changed files with 15 additions and 14 deletions.
15 changes: 6 additions & 9 deletions miniwdl-plugins/s3upload/miniwdl_s3upload.py
@@ -81,6 +81,8 @@ def flag_temporary(s3uri):


 def inode(link: str):
+    if link.startswith("s3://"):
+        return link
     st = os.stat(os.path.realpath(link))
     return (st.st_dev, st.st_ino)

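The guard above is what lets call-cached values flow through: os.stat can only
resolve local paths, so an s3:// URI reaching this function previously ended in
a file-not-found error. A minimal sketch of the new behavior, assuming a
hypothetical local path:

import os

def inode(link: str):
    if link.startswith("s3://"):
        return link  # already remote: use the URI itself as the lookup key
    st = os.stat(os.path.realpath(link))
    return (st.st_dev, st.st_ino)

print(inode("s3://bucket/run/out.txt"))  # -> "s3://bucket/run/out.txt"
print(inode("/tmp/out.txt"))             # -> (st_dev, st_ino), if the file exists
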
@@ -238,15 +240,7 @@ def workflow(cfg, logger, run_id, run_dir, workflow, **recv):
         workflow.name,
     )

-    # HACK: Because of the way that call caching works if a step is call cached its outputs
-    # will be s3 paths. This is fine for inputs to other steps because the downloader
-    # will download them but for the last step of the pipeline, it tries to link
-    # the s3 paths if they are outputs to the global pipeline and this results
-    # in file not found errors. Technically for swipe we don't need linking
-    # and our whole system works if we just stop here. Once we solve the linking
-    # problem a bit better we may want to revisit this and return this to:
-    # yield recv
-    exit(0)
+    yield recv


 def write_outputs_s3_json(logger, outputs, run_dir, s3prefix, namespace):
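With the s3:// guards in place, the workflow hook no longer needs to kill the
run with exit(0) before miniwdl links outputs; it can hand control back
normally. A minimal sketch of the restored generator shape (the plugin's
upload logic is elided):

def workflow(cfg, logger, run_id, run_dir, workflow, **recv):
    # ... upload output files to S3 and record their URIs (elided) ...
    yield recv  # hand the received payload back to miniwdl instead of exiting
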
@@ -255,6 +249,9 @@ def write_outputs_s3_json(logger, outputs, run_dir, s3prefix, namespace):

     # rewrite uploaded files to their S3 URIs
     def rewriter(fd):
+        if fd.value.startswith("s3://"):
+            return fd.value
+
         try:
             return _uploaded_files[inode(fd.value)]
         except Exception:
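Together with the inode() change, this guard makes both ends of the upload
bookkeeping treat s3:// values as final: inode() declines to stat them, and
rewriter() returns them verbatim rather than looking them up in
_uploaded_files. That is what makes the exit(0) hack above unnecessary.
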
12 changes: 8 additions & 4 deletions test/test_wdl.py
@@ -219,7 +219,6 @@ def _wait_sfn(
             stateMachineArn=sfn_arn, name=execution_name, input=json.dumps(sfn_input)
         )
         arn = res["executionArn"]
-        assert res
         start = time.time()
         description = self.sfn.describe_execution(executionArn=arn)
         while description["status"] == "RUNNING" and time.time() < start + 2 * 60:
@@ -375,7 +374,7 @@ def test_call_cache(self):
         )
         outputs_obj = self.test_bucket.Object(f"{output_prefix}/test-1/out.txt")
         output_text = outputs_obj.get()["Body"].read().decode()
-        assert output_text == "hello\nworld\n", output_text
+        self.assertEqual(output_text, "hello\nworld\n")

         self.test_bucket.Object(f"{output_prefix}/test-1/out.txt").put(
             Body="cache_break\n".encode()
@@ -388,17 +387,22 @@ def test_call_cache(self):
             Prefix=f"{output_prefix}/test-1/cache/add_goodbye/",
         )["Contents"]
         self.test_bucket.Object(objects[0]["Key"]).delete()
+        objects = self.s3_client.list_objects_v2(
+            Bucket=self.test_bucket.name,
+            Prefix=f"{output_prefix}/test-1/cache/swipe_test/",
+        )["Contents"]
+        self.test_bucket.Object(objects[0]["Key"]).delete()
         self.test_bucket.Object(out_json_path).delete()

         self._wait_sfn(sfn_input, self.single_sfn_arn)

         outputs = json.loads(self.test_bucket.Object(out_json_path).get()["Body"].read().decode())
         for v in outputs.values():
-            assert v.startswith("s3://"), f"{v} does not start with 's3://'"
+            self.assert_(v.startswith("s3://"), f"{v} does not start with 's3://'")

         outputs_obj = self.test_bucket.Object(f"{output_prefix}/test-1/out_goodbye.txt")
         output_text = outputs_obj.get()["Body"].read().decode()
-        assert output_text == "cache_break\ngoodbye\n", output_text
+        self.assertEqual(output_text, "cache_break\ngoodbye\n")


 if __name__ == "__main__":
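To break the call cache, the test now clears cached results for both the
add_goodbye step and the swipe_test entry before re-running. A generic version
of that cleanup, sketched with boto3 (the helper name is ours, and unlike the
test, it deletes every object under the prefix rather than only the first):

import boto3

def clear_cache_prefix(bucket: str, prefix: str) -> None:
    # Hypothetical helper: delete all cached call results under a prefix.
    s3 = boto3.client("s3")
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    for obj in resp.get("Contents", []):
        s3.delete_object(Bucket=bucket, Key=obj["Key"])

# e.g. clear_cache_prefix("my-test-bucket", "out/test-1/cache/swipe_test/")
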
2 changes: 1 addition & 1 deletion version
@@ -1 +1 @@
-v0.21.1-beta
+v0.21.3-beta
