Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
Including optional delete on transfer data flow
  • Loading branch information
mcmero committed Oct 11, 2023
2 parents e4c1bb8 + f0a4769 commit 9f66291
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 32 deletions.
9 changes: 8 additions & 1 deletion .test/config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ ignore_proj_regex: False
threads: 12

# whether to transfer data upon archiving completion using Globus
transfer: False
transfer: False

# whether to delete data from source endpoint after transfer
delete_on_transfer: False

# Globus Flow ID to use for transfer
# only used if delete_on_transfer flag is true
globus_flow_id: ''

# this machine's Globus endpoint ID
src_endpoint: ''
Expand Down
8 changes: 8 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ transfer_dir: '_transfer'
file_types:
- 'reports'
- 'fastq'
- 'fast5'
- 'pod5'
- 'fast5'
- 'bam'
Expand Down Expand Up @@ -51,6 +52,13 @@ threads: 12
# whether to transfer data upon archiving completion using Globus
transfer: False

# whether to delete data from source endpoint after transfer
delete_on_transfer: False

# Globus Flow ID to use for transfer; the default below is the "Move (copy and delete) files using Globus" flow
# only used if delete_on_transfer flag is true
globus_flow_id: 'f37e5766-7b3c-4c02-92ee-e6aacd8f4cb8'

# this machine's Globus endpoint ID
src_endpoint: ''

Expand Down
4 changes: 4 additions & 0 deletions workflow/envs/globus_automate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
channels:
- conda-forge
dependencies:
- globus-automate-client
6 changes: 6 additions & 0 deletions workflow/envs/python.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
channels:
- conda-forge
dependencies:
- python=3.9
- numpy
- pandas
5 changes: 3 additions & 2 deletions workflow/rules/common.smk
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ end_of_run_file_regex = re.compile(r"%s" % config["end_of_run_file_regex"])
# Boolean config flags. Each value is stringified and lower-cased before being
# compared with "true", so a YAML boolean (True) and the strings "true"/"True"
# all enable the flag; any other value disables it.
ignore_proj_regex = str(config["ignore_proj_regex"]).lower() == "true"
check_if_complete = str(config["check_if_complete"]).lower() == "true"
# whether to transfer data upon archiving completion using Globus
transfer = str(config["transfer"]).lower() == "true"
# whether to delete data from the source endpoint after transfer
delete_on_transfer = str(config["delete_on_transfer"]).lower() == "true"

# error check input
if not os.path.exists(data_dir):
Expand Down Expand Up @@ -94,7 +95,7 @@ def is_processing_complete(project_dir_full):
directory may have been deleted due to transfer.
"""
transfer_dir_full = os.path.join(project_dir_full, transfer_dir)
transfer_complete_dir = f"{transfer_dir_full}_complete"
processing_complete_file = os.path.join(project_dir_full, "processing.success")
if os.path.exists(transfer_dir_full):
files_in_transfer_dir = next(os.walk(transfer_dir_full))[2]
final_file = "transfer.txt" if transfer else "tar_file_counts.txt"
Expand Down Expand Up @@ -122,7 +123,7 @@ def is_processing_complete(project_dir_full):
)

# if transfer directory does not exist, check for the processing.success file
return os.path.exists(transfer_complete_dir)
return os.path.exists(processing_complete_file)


# build list of projects and samples to archive
Expand Down
102 changes: 73 additions & 29 deletions workflow/rules/transfer.smk
Original file line number Diff line number Diff line change
@@ -1,29 +1,73 @@
# NOTE: this step will only invoke the transfer but there is no guarantee that it
# will be successful. Check the Globus dashboard for the status of the transfer.
#
# Runs `globus transfer` for one project's transfer directory, from the source
# endpoint to the destination endpoint, and saves the command's stdout as the
# rule's output file.
rule transfer:
# the project's file-counts log inside the transfer directory
input:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
# receives the stdout of the `globus transfer` command (see redirect below)
output:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt",
log:
"logs/{project}_transfer.log",
conda:
"../envs/globus.yaml"
threads: 1
# endpoint IDs and destination root come from the config file; data_dir and
# transfer_dir are the module-level values also used to build the paths above
params:
data_dir=data_dir,
transfer_dir=transfer_dir,
src_endpoint=config["src_endpoint"],
dest_endpoint=config["dest_endpoint"],
dest_path=config["dest_path"],
# source:      <src_endpoint>:<data_dir>/<project>/<transfer_dir>
# destination: <dest_endpoint>:<dest_path>/<project>/<transfer_dir>
shell:
"""
globus transfer \
{params.src_endpoint}:{params.data_dir}/{wildcards.project}/{params.transfer_dir} \
{params.dest_endpoint}:{params.dest_path}/{wildcards.project}/{params.transfer_dir} \
--recursive \
--sync-level checksum \
--verify-checksum \
--fail-on-quota-errors \
--notify on > {output}
"""
# Two alternative definitions of the `transfer` rule, selected by the
# delete_on_transfer flag: a Globus flow run (with a helper rule that writes the
# flow's JSON input) or a plain `globus transfer`.
if delete_on_transfer:

# NOTE: this step uses a Globus data flow, so first we need to create the json file
rule create_globus_json_input:
# the project's file-counts log inside the transfer directory
input:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
# JSON document consumed by the `transfer` rule below via --flow-input
output:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json",
log:
"logs/{project}_create_globus_json.log",
conda:
"../envs/python.yaml"
threads: 1
script:
"../scripts/create_globus_json_input.py"

# NOTE: this step will only invoke the transfer but there is no guarantee that it
# will be successful. Check the Globus dashboard for the status of the transfer.
#
# Starts a run of the configured Globus flow with the generated JSON input,
# saves the command's stdout and then creates the processing.success marker.
rule transfer:
input:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json",
# transfer_file: stdout of `globus-automate flow run`
# complete_file: empty marker created with `touch`; it sits in the project
# directory itself rather than inside the transfer directory
# NOTE(review): the marker is touched in the same shell step that launches the
# flow run; whether the command waits for the flow to finish is not visible
# here - confirm
output:
transfer_file=f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt",
complete_file=f"{data_dir}/{{project}}/processing.success",
log:
"logs/{project}_transfer.log",
conda:
"../envs/globus_automate.yaml"
threads: 1
# ID of the Globus flow to run, taken from the config file
params:
globus_flow_id=config["globus_flow_id"],
shell:
"""
globus-automate flow run \
{params.globus_flow_id} \
--flow-input {input} \
--label "Transfer {wildcards.project}" > {output.transfer_file}
touch {output.complete_file}
"""


else:

# NOTE: this step will only invoke the transfer but there is no guarantee that it
# will be successful. Check the Globus dashboard for the status of the transfer.
#
# Runs `globus transfer` for one project's transfer directory and saves the
# command's stdout as the rule's output file.
rule transfer:
# the project's file-counts log inside the transfer directory
input:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt",
# receives the stdout of the `globus transfer` command (see redirect below)
output:
f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt",
log:
"logs/{project}_transfer.log",
conda:
"../envs/globus.yaml"
threads: 1
# endpoint IDs and destination root come from the config file
params:
data_dir=data_dir,
transfer_dir=transfer_dir,
src_endpoint=config["src_endpoint"],
dest_endpoint=config["dest_endpoint"],
dest_path=config["dest_path"],
# source:      <src_endpoint>:<data_dir>/<project>/<transfer_dir>
# destination: <dest_endpoint>:<dest_path>/<project>/<transfer_dir>
shell:
"""
globus transfer \
{params.src_endpoint}:{params.data_dir}/{wildcards.project}/{params.transfer_dir} \
{params.dest_endpoint}:{params.dest_path}/{wildcards.project}/{params.transfer_dir} \
--recursive \
--sync-level checksum \
--verify-checksum \
--fail-on-quota-errors \
--notify on > {output}
"""
35 changes: 35 additions & 0 deletions workflow/scripts/create_globus_json_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Creates json input file for Globus copy, delete and mark complete
data flow.

This script is executed through a Snakemake ``script:`` directive, so the
``snakemake`` object (config, wildcards, log, output) is provided at runtime
rather than imported.

The document written to ``snakemake.output[0]`` has the form::

    {
      "source": {"id": <src_endpoint>, "path": <data_dir>/<project>/<transfer_dir>},
      "destination": {"id": <dest_endpoint>, "path": <dest_path>/<project>},
      "transfer_label": "Transfer archives for <project>",
      "delete_label": "Delete archives for <project>"
    }
"""
import json
import os
import sys

# Send anything written to stderr (including tracebacks) to the rule's log
# file. The handle is deliberately left open: it has to stay writable for the
# remaining lifetime of the script.
sys.stderr = open(snakemake.log[0], "w")

config = snakemake.config
project = snakemake.wildcards.project

src_endpoint = config["src_endpoint"]
dest_endpoint = config["dest_endpoint"]

# Directory on the source endpoint that holds this project's transfer data.
data_dir = config["data_dir"]
transfer_dir = config["transfer_dir"]
src_path = f"{data_dir}/{project}/{transfer_dir}"

# Per-project directory under the configured destination root.
# NOTE(review): the plain `globus transfer` rule in workflow/rules/transfer.smk
# targets <dest_path>/<project>/<transfer_dir>, whereas this path stops at
# <dest_path>/<project> - confirm this is the layout the flow is meant to produce.
dest_path = os.path.join(config["dest_path"], project)

# Named `flow_input` rather than `input` so the builtin is not shadowed.
flow_input = {
    "source": {
        "id": src_endpoint,
        "path": src_path,
    },
    "destination": {
        "id": dest_endpoint,
        "path": dest_path,
    },
    "transfer_label": f"Transfer archives for {project}",
    "delete_label": f"Delete archives for {project}",
}

with open(snakemake.output[0], "w") as f:
    json.dump(flow_input, f, indent=2)

0 comments on commit 9f66291

Please sign in to comment.