From b743c4bd217edd8ad64f985cc0611e37aec05580 Mon Sep 17 00:00:00 2001 From: mcmero Date: Fri, 18 Aug 2023 16:03:52 +1000 Subject: [PATCH 01/10] Refactoring - redesign to check fast5 and pod5 directories for pass, fail and skip files - dynamic rules to handle each above case (doesn't matter if any dirs are not present, they will be skipped) - simplified validation by creating txt file lists of tar files in the same step as archiving - consolidated steps, raw_format no longer needed - archive_complete now a lot simpler, just counts all files per sample on the system and in tar files --- .test/config/config.yaml | 1 + workflow/Snakefile | 2 - workflow/rules/archive.smk | 192 ++++++++++++------------------------- workflow/rules/common.smk | 96 +++++++------------ 4 files changed, 93 insertions(+), 198 deletions(-) diff --git a/.test/config/config.yaml b/.test/config/config.yaml index 3eaf561..8df4d39 100644 --- a/.test/config/config.yaml +++ b/.test/config/config.yaml @@ -14,6 +14,7 @@ file_types: - 'reports' - 'fastq' - 'pod5' + - 'fast5' - 'checksums' # project directory regex (set this based on diff --git a/workflow/Snakefile b/workflow/Snakefile index fb37c98..617a751 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -26,7 +26,5 @@ rule all: input: get_outputs(file_types), get_final_checksum_outputs(), - get_validate_tars_outputs(), - get_validate_reports_outputs(), get_archive_complete_outputs(), get_transfer_outputs(), diff --git a/workflow/rules/archive.smk b/workflow/rules/archive.smk index 370fed8..79021f8 100644 --- a/workflow/rules/archive.smk +++ b/workflow/rules/archive.smk @@ -3,7 +3,7 @@ rule calculate_checksums: [f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)], output: expand( - "{data_dir}/{{project}}/{transfer_dir}/checksums/{{project}}_{{sample}}.sha1", + "{data_dir}/{{project}}/{transfer_dir}/checksums/{{project}}_{{sample}}_checksums.sha1", data_dir=data_dir, transfer_dir=transfer_dir, ), @@ -20,89 +20,12 @@ rule calculate_checksums: find {wildcards.sample}/* -type f | xargs shasum -a 1 > {output} """ - -rule tar_reports: - input: - [f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)], - output: - expand( - "{data_dir}/{{project}}/{transfer_dir}/reports/{{project}}_{{sample}}_reports.tar.gz", - data_dir=data_dir, - transfer_dir=transfer_dir, - ), - log: - "logs/{project}_{sample}_reports.log", - conda: - "../envs/archive.yaml" - threads: config["threads"] - params: - data_dir=data_dir, - shell: - """ - cd {params.data_dir}/{wildcards.project} && tar -cvf - {wildcards.sample}/*/*.* {wildcards.sample}/*/other_reports | - pigz -p {threads} > {output} - """ - - -rule tar_fastqs: - input: - [f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)], - output: - expand( - "{data_dir}/{{project}}/{transfer_dir}/fastq/{{project}}_{{sample}}_fastq_{{state}}.tar", - data_dir=data_dir, - transfer_dir=transfer_dir, - state=STATES, - ), - log: - "logs/{project}_{sample}_{state}_fastq.log", - conda: - "../envs/archive.yaml" - threads: 1 - params: - data_dir=data_dir, - shell: - """ - cd {params.data_dir}/{wildcards.project} && - find {wildcards.sample}/*/fastq_{wildcards.state} -iname "*fastq.gz" | - tar -cvf {output} --files-from - - """ - - -rule tar_raw_data: - input: - [f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)], - output: - expand( - "{data_dir}/{{project}}/{transfer_dir}/{raw_format}/{{project}}_{{sample}}_{raw_format}_{{state}}.tar.gz", - 
data_dir=data_dir, - transfer_dir=transfer_dir, - raw_format=raw_format, - state=STATES, - ), - log: - "logs/{project}_{sample}_{state}_raw_data.log", - conda: - "../envs/archive.yaml" - threads: config["threads"] - params: - data_dir=data_dir, - raw_format=raw_format, - shell: - """ - cd {params.data_dir}/{wildcards.project} && - find {wildcards.sample}/*/{raw_format}_{wildcards.state} -iname "*{params.raw_format}" | - tar -cvf - --files-from - | - pigz -p {threads} > {output} - """ - - rule calculate_archive_checksums: input: get_outputs(file_types), output: expand( - "{data_dir}/{{project}}/{transfer_dir}/checksums/final/{{project}}_archives.sha1", + "{data_dir}/{{project}}/{transfer_dir}/checksums/{{project}}_archives.sha1", data_dir=data_dir, transfer_dir=transfer_dir, ), @@ -117,90 +40,93 @@ rule calculate_archive_checksums: find . -type f -iname "*tar*" | xargs shasum -a 1 > {output} """ +for project, sample in zip(projects, samples): + for file_type in file_types: + for state in STATES: + ext = "tar" if file_type == "fastq" else "tar.gz" + rule: + name: + f"tar_{project}_{sample}_{file_type}_{state}" + input: + f"{data_dir}/{project}/{sample}", + output: + tar=f"{data_dir}/{project}/{transfer_dir}/{file_type}/{project}_{sample}_{file_type}_{state}.{ext}", + txt=f"{data_dir}/{project}/{transfer_dir}/{file_type}/{project}_{sample}_{file_type}_{state}_list.txt", + log: + f"logs/{project}_{sample}_{file_type}_{state}_tar.log", + conda: + "../envs/archive.yaml" + threads: 1 + params: + data_dir=data_dir, + project=project, + sample=sample, + file_type=file_type, + state=state, + shell: + """ + if [[ "{params.file_type}" == "fastq" ]]; then + cd {params.data_dir}/{params.project} && + find {params.sample}/*/{params.file_type}_{params.state} -iname "*fastq.gz" | + tar -cvf {output.tar} --files-from - ; + tar -tvf {output.tar} >> {output.txt} + else + cd {params.data_dir}/{params.project} && + find {params.sample}/*/{params.file_type}_{params.state} -iname "*{params.file_type}" | + tar -cvf - --files-from - | + pigz -p {threads} > {output.tar} ; + tar -tvf <(pigz -dc {output.tar}) >> {output.txt} + fi + """ -rule validate_tars: +rule tar_reports: input: - get_outputs(file_types), + [f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)], output: - expand( - "{data_dir}/{{project}}/{transfer_dir}/{{file_type}}/{{project}}_{{sample}}_{{file_type}}_{{state}}_list.txt", + tar=expand( + "{data_dir}/{{project}}/{transfer_dir}/reports/{{project}}_{{sample}}_reports.tar.gz", data_dir=data_dir, transfer_dir=transfer_dir, ), - log: - "logs/{project}_{sample}_{file_type}_{state}_validate_tars.log", - conda: - "../envs/archive.yaml" - threads: 1 - params: - data_dir=data_dir, - transfer_dir=transfer_dir, - shell: - """ - tar={wildcards.project}_{wildcards.sample}_{wildcards.file_type}_{wildcards.state}.tar* - cd {params.data_dir}/{wildcards.project}/{params.transfer_dir}/{wildcards.file_type} && - if [[ $tar == *.gz ]]; then - tar -tvf <(pigz -dc $tar) >> {output} - else - tar -tvf $tar >> {output} - fi - """ - - -rule validate_reports: - input: - get_report_outputs(), - output: - expand( + txt=expand( "{data_dir}/{{project}}/{transfer_dir}/reports/{{project}}_{{sample}}_reports_list.txt", data_dir=data_dir, transfer_dir=transfer_dir, ), log: - "logs/{project}_{sample}_reports_validate_reports.log", + "logs/{project}_{sample}_reports.log", conda: "../envs/archive.yaml" - threads: 1 + threads: config["threads"] params: data_dir=data_dir, - transfer_dir=transfer_dir, shell: """ 
- tar={wildcards.project}_{wildcards.sample}_reports.tar.gz - cd {params.data_dir}/{wildcards.project}/{params.transfer_dir}/reports && - tar -tvf <(pigz -dc $tar) >> {output} + cd {params.data_dir}/{wildcards.project} && tar -cvf - {wildcards.sample}/*/*.* {wildcards.sample}/*/other_reports | + pigz -p {threads} > {output.tar} ; + tar -tvf <(pigz -dc {output.tar}) >> {output.txt} """ rule archive_complete: input: - get_validate_reports_outputs(), - get_validate_tars_outputs(), + get_outputs(file_types) output: - tar_file_counts=f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_tar_file_counts.txt", - sys_file_counts=f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_system_file_counts.txt", + f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt", log: "logs/{project}_archive_complete.txt", threads: 1 params: data_dir=data_dir, - transfer_dir=transfer_dir, - raw_format=raw_format, + transfer_dir=transfer_dir shell: """ - cd {params.data_dir}/{wildcards.project}/{params.transfer_dir} - wc -l */*_{params.raw_format}_*_list.txt > {output.tar_file_counts} - wc -l */*_fastq_*_list.txt >> {output.tar_file_counts} - + transfer_path={params.data_dir}/{wildcards.project}/{params.transfer_dir} samples=`ls {params.data_dir}/{wildcards.project}/ | grep -v _transfer` - for type in {params.raw_format} fastq; do - for sample in $samples; do - if [[ -d {params.data_dir}/{wildcards.project}/$sample ]]; then - for state in fail pass; do - count=`find {params.data_dir}/{wildcards.project}/$sample/*/${{type}}_$state -type f | wc -l` - echo "$count $sample $type $state" >> {output.sys_file_counts} - done - fi - done + for sample in $samples; do + tar_count=`cat $transfer_path/*/{wildcards.project}_${{sample}}*_list.txt | grep -v "/$" | wc -l` + sys_file_count=`find {params.data_dir}/{wildcards.project}/$sample -type f | wc -l` + echo "$sample tar file counts: $tar_count" >> {output} + echo "$sample sys file counts: $sys_file_count" >> {output} done """ diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk index 2d32a10..dc6078f 100644 --- a/workflow/rules/common.smk +++ b/workflow/rules/common.smk @@ -2,11 +2,12 @@ import pandas as pd import numpy as np import os import re +from glob import iglob # variables DATA_FILES = ["reports", "fastq", "fast5", "pod5"] POSSIBLE_FILE_TYPES = DATA_FILES + ["checksums"] -STATES = ["pass", "fail"] +STATES = ["pass", "fail", "skip"] data_dir = config["data_dir"] transfer_dir = config["transfer_dir"] @@ -106,6 +107,11 @@ def is_processing_complete(project_dir_full): project_name = os.path.basename(project_dir_full) final_file_with_projname = ( + f"{project_name}_transfer.txt" + if transfer + else f"{project_name}_file_counts.txt" + ) + final_file_legacy = ( f"{project_name}_transfer.txt" if transfer else f"{project_name}_tar_file_counts.txt" @@ -118,6 +124,7 @@ def is_processing_complete(project_dir_full): "archive.success" in files_in_transfer_dir or final_file in files_in_transfer_dir or final_file_with_projname in files_in_log_dir + or final_file_legacy in files_in_log_dir ) else: return False @@ -180,54 +187,33 @@ for project in project_dirs: # input/output functions -def get_report_outputs(): - report_outputs = [ - f"{data_dir}/{project}/{{transfer_dir}}/reports/{project}_{sample}_reports.tar.gz" - for project, sample in zip(projects, samples) - ] - report_outputs = expand( - report_outputs, - transfer_dir=transfer_dir, - ) - return report_outputs - - def get_checksum_outputs(): checksum_outputs = [ - 
f"{data_dir}/{project}/{{transfer_dir}}/checksums/{project}_{sample}.sha1" + f"{data_dir}/{project}/{transfer_dir}/checksums/{project}_{sample}_checksums.sha1" for project, sample in zip(projects, samples) ] - checksum_outputs = expand( - checksum_outputs, - transfer_dir=transfer_dir, - ) return checksum_outputs +def get_report_outputs(): + report_outputs = [] + for project, sample in zip(projects, samples): + report_outputs.append(f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports.tar.gz") + report_outputs.append(f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports_list.txt") + return report_outputs -def get_fastq_outputs(): - fastq_outputs = [ - f"{data_dir}/{project}/{{transfer_dir}}/fastq/{project}_{sample}_fastq_{{state}}.tar" - for project, sample in zip(projects, samples) - ] - fastq_outputs = expand( - fastq_outputs, - transfer_dir=transfer_dir, - state=STATES, - ) - return fastq_outputs +def get_output_by_type(filetype): + file_extension = "tar" if filetype == "fastq" else "tar.gz" + outputs = [] + for project, sample in zip(projects, samples): + files_under_sample = [os.path.basename(f) for f in iglob(f"{data_dir}/{project}/{sample}/*/*")] + out_prefix = f"{data_dir}/{project}/{transfer_dir}/{filetype}/{project}_{sample}_{filetype}" + for state in STATES: + if f"{filetype}_{state}" in files_under_sample: + outputs.append(f"{out_prefix}_{state}.{file_extension}") + outputs.append(f"{out_prefix}_{state}_list.txt") -def get_raw_outputs(): - raw_outputs = [ - f"{data_dir}/{project}/{{transfer_dir}}/{raw_format}/{project}_{sample}_{raw_format}_{{state}}.tar.gz" - for project, sample in zip(projects, samples) - ] - raw_outputs = expand( - raw_outputs, - transfer_dir=transfer_dir, - state=STATES, - ) - return raw_outputs + return outputs def get_outputs(file_types): @@ -237,15 +223,17 @@ def get_outputs(file_types): if "reports" in file_types: outputs.extend(get_report_outputs()) if "fastq" in file_types: - outputs.extend(get_fastq_outputs()) - if "fast5" in file_types or "pod5" in file_types: - outputs.extend(get_raw_outputs()) + outputs.extend(get_output_by_type("fastq")) + if "fast5" in file_types: + outputs.extend(get_output_by_type("fast5")) + if "pod5" in file_types: + outputs.extend(get_output_by_type("pod5")) return outputs def get_final_checksum_outputs(): final_checksum_outputs = expand( - "{data_dir}/{project}/{transfer_dir}/checksums/final/{project}_archives.sha1", + "{data_dir}/{project}/{transfer_dir}/checksums/{project}_archives.sha1", data_dir=data_dir, project=np.unique(projects), transfer_dir=transfer_dir, @@ -253,24 +241,6 @@ def get_final_checksum_outputs(): return final_checksum_outputs -def get_validate_tars_outputs(): - validate_tars_outputs = [ - f"{data_dir}/{project}/{transfer_dir}/{{file_type}}/{project}_{sample}_{{file_type}}_{{state}}_list.txt" - for project, sample in zip(projects, samples) - ] - - validate_tars_outputs = expand( - validate_tars_outputs, - file_type=[ - file_type - for file_type in file_types - if file_type not in ["checksums", "reports"] - ], - state=STATES, - ) - return validate_tars_outputs - - def get_validate_reports_outputs(): validate_reports_outputs = [ f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports_list.txt" @@ -281,7 +251,7 @@ def get_validate_reports_outputs(): def get_archive_complete_outputs(): archive_complete_outputs = [ - f"{data_dir}/{project}/{transfer_dir}/logs/{project}_tar_file_counts.txt" + 
f"{data_dir}/{project}/{transfer_dir}/logs/{project}_file_counts.txt" for project in np.unique(projects) if project not in projects_with_incomplete_runs ] From cf6f49f9dfd5382c79005bb193e6bc3b956f7f24 Mon Sep 17 00:00:00 2001 From: mcmero Date: Fri, 18 Aug 2023 16:20:18 +1000 Subject: [PATCH 02/10] Fix threads setting for pigz archiving rules --- workflow/rules/archive.smk | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/workflow/rules/archive.smk b/workflow/rules/archive.smk index 79021f8..b09e1dd 100644 --- a/workflow/rules/archive.smk +++ b/workflow/rules/archive.smk @@ -44,6 +44,7 @@ for project, sample in zip(projects, samples): for file_type in file_types: for state in STATES: ext = "tar" if file_type == "fastq" else "tar.gz" + threads = 1 if file_type == "fastq" else config["threads"] rule: name: f"tar_{project}_{sample}_{file_type}_{state}" @@ -56,7 +57,7 @@ for project, sample in zip(projects, samples): f"logs/{project}_{sample}_{file_type}_{state}_tar.log", conda: "../envs/archive.yaml" - threads: 1 + threads: threads params: data_dir=data_dir, project=project, From 0d345a4aa22784553afdce1a7be988f54c109563 Mon Sep 17 00:00:00 2001 From: mcmero Date: Mon, 21 Aug 2023 09:51:04 +1000 Subject: [PATCH 03/10] Snakefmt fix --- workflow/rules/archive.smk | 8 ++++++-- workflow/rules/common.smk | 14 +++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/workflow/rules/archive.smk b/workflow/rules/archive.smk index b09e1dd..5de8bca 100644 --- a/workflow/rules/archive.smk +++ b/workflow/rules/archive.smk @@ -20,6 +20,7 @@ rule calculate_checksums: find {wildcards.sample}/* -type f | xargs shasum -a 1 > {output} """ + rule calculate_archive_checksums: input: get_outputs(file_types), @@ -40,11 +41,13 @@ rule calculate_archive_checksums: find . 
-type f -iname "*tar*" | xargs shasum -a 1 > {output} """ + for project, sample in zip(projects, samples): for file_type in file_types: for state in STATES: ext = "tar" if file_type == "fastq" else "tar.gz" threads = 1 if file_type == "fastq" else config["threads"] + rule: name: f"tar_{project}_{sample}_{file_type}_{state}" @@ -80,6 +83,7 @@ for project, sample in zip(projects, samples): fi """ + rule tar_reports: input: [f"{data_dir}/{project}/{sample}" for project, sample in zip(projects, samples)], @@ -111,7 +115,7 @@ rule tar_reports: rule archive_complete: input: - get_outputs(file_types) + get_outputs(file_types), output: f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt", log: @@ -119,7 +123,7 @@ rule archive_complete: threads: 1 params: data_dir=data_dir, - transfer_dir=transfer_dir + transfer_dir=transfer_dir, shell: """ transfer_path={params.data_dir}/{wildcards.project}/{params.transfer_dir} diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk index dc6078f..c712696 100644 --- a/workflow/rules/common.smk +++ b/workflow/rules/common.smk @@ -194,19 +194,27 @@ def get_checksum_outputs(): ] return checksum_outputs + def get_report_outputs(): report_outputs = [] for project, sample in zip(projects, samples): - report_outputs.append(f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports.tar.gz") - report_outputs.append(f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports_list.txt") + report_outputs.append( + f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports.tar.gz" + ) + report_outputs.append( + f"{data_dir}/{project}/{transfer_dir}/reports/{project}_{sample}_reports_list.txt" + ) return report_outputs + def get_output_by_type(filetype): file_extension = "tar" if filetype == "fastq" else "tar.gz" outputs = [] for project, sample in zip(projects, samples): - files_under_sample = [os.path.basename(f) for f in iglob(f"{data_dir}/{project}/{sample}/*/*")] + files_under_sample = [ + os.path.basename(f) for f in iglob(f"{data_dir}/{project}/{sample}/*/*") + ] out_prefix = f"{data_dir}/{project}/{transfer_dir}/{filetype}/{project}_{sample}_{filetype}" for state in STATES: if f"{filetype}_{state}" in files_under_sample: From ac802e9fb1aff067dd4ad15160a094679f1948be Mon Sep 17 00:00:00 2001 From: mcmero Date: Mon, 21 Aug 2023 10:03:51 +1000 Subject: [PATCH 04/10] Fix transfer file input --- workflow/rules/transfer.smk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/rules/transfer.smk b/workflow/rules/transfer.smk index 79d5a7f..473fa56 100644 --- a/workflow/rules/transfer.smk +++ b/workflow/rules/transfer.smk @@ -2,7 +2,7 @@ # will be successful. Check the Globus dashboard for the status of the transfer. 
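# A minimal sketch of the glob-based state detection that the reworked
# get_output_by_type() in common.smk above relies on; the data_dir, project
# and sample values here are placeholders, not values from the workflow:
import os
from glob import iglob

# e.g. /data/projA/sample1/run1/fastq_pass, /data/projA/sample1/run1/pod5_skip
dirs_under_sample = [
    os.path.basename(f) for f in iglob("/data/projA/sample1/*/*")
]
for state in ("pass", "fail", "skip"):
    # a tar plus a _list.txt target are emitted only for states whose
    # directory (e.g. pod5_skip) actually exists on disk
    if f"pod5_{state}" in dirs_under_sample:
        print(f"pod5_{state} present: will be archived")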
rule transfer: input: - f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_system_file_counts.txt", + f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt", output: f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt", log: From b44d9b6f9d08f003b06404eb76b68f97f21cb50d Mon Sep 17 00:00:00 2001 From: mcmero Date: Mon, 21 Aug 2023 11:02:51 +1000 Subject: [PATCH 05/10] Clean up config --- .test/config/config.yaml | 2 -- config/config.yaml | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.test/config/config.yaml b/.test/config/config.yaml index 8df4d39..f284cbf 100644 --- a/.test/config/config.yaml +++ b/.test/config/config.yaml @@ -8,8 +8,6 @@ transfer_dir: '_transfer' # each item must match one of # 'fastq', 'fast5', 'pod5', # 'reports' and 'checksums' -# NOTE: you must specify only ONE -# of fast5 or pod5 per run file_types: - 'reports' - 'fastq' diff --git a/config/config.yaml b/config/config.yaml index 71ecaba..1daeee7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -13,6 +13,7 @@ transfer_dir: '_transfer' file_types: - 'reports' - 'fastq' + - 'fast5' - 'pod5' - 'checksums' From f972ac2b9149117f495bfd18503295e9a3b56883 Mon Sep 17 00:00:00 2001 From: mcmero Date: Mon, 25 Sep 2023 19:02:55 +1000 Subject: [PATCH 06/10] Support for transfer and delete functionality --- .test/config/config.yaml | 9 +- config/config.yaml | 7 + .../copy_delete_mark_complete_definition.json | 375 ++++++++++++++++++ ...opy_delete_mark_complete_input_schema.json | 78 ++++ workflow/envs/python.yaml | 6 + workflow/rules/common.smk | 1 + workflow/rules/transfer.smk | 99 +++-- workflow/scripts/create_globus_json_input.py | 34 ++ 8 files changed, 580 insertions(+), 29 deletions(-) create mode 100644 globus_flows/copy_delete_mark_complete_definition.json create mode 100644 globus_flows/copy_delete_mark_complete_input_schema.json create mode 100644 workflow/envs/python.yaml create mode 100644 workflow/scripts/create_globus_json_input.py diff --git a/.test/config/config.yaml b/.test/config/config.yaml index 01f0e10..9744efa 100644 --- a/.test/config/config.yaml +++ b/.test/config/config.yaml @@ -49,7 +49,14 @@ ignore_proj_regex: False threads: 12 # whether to transfer data upon archiving completion using Globus -transfer: False +transfer: False + +# whether to delete data from source endpoint after transfer +delete_on_transfer: False + +# Globus Flow ID to use for transfer +# only used if delete_on_transfer flag is true +globus_flow_id: '' # this machine's Globus endpoint ID src_endpoint: '' diff --git a/config/config.yaml b/config/config.yaml index 5144b63..1c4d70b 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -52,6 +52,13 @@ threads: 12 # whether to transfer data upon archiving completion using Globus transfer: False +# whether to delete data from source endpoint after transfer +delete_on_transfer: False + +# Globus Flow ID to use for transfer +# only used if delete_on_transfer flag is true +globus_flow_id: '' + # this machine's Globus endpoint ID src_endpoint: '' diff --git a/globus_flows/copy_delete_mark_complete_definition.json b/globus_flows/copy_delete_mark_complete_definition.json new file mode 100644 index 0000000..305b422 --- /dev/null +++ b/globus_flows/copy_delete_mark_complete_definition.json @@ -0,0 +1,375 @@ +{ + "States": { + "Delete": { + "Next": "CreateTransferCompleteDir", + "Type": "Action", + "Catch": [ + { + "Next": "DeleteFailed", + "ResultPath": "$.DeleteErrorResult", + "ErrorEquals": [ + "ActionUnableToRun", + 
"ActionFailedException", + "ActionTimeout" + ] + } + ], + "Comment": "Use Transfer to delete the initial source ep/source path. It uses the same value for recursive as the transfer", + "WaitTime": 172800, + "ActionUrl": "https://actions.globus.org/transfer/delete", + "Parameters": { + "items.=": "[source.path]", + "label.=": "getattr('delete_label', 'Delete from Source for Move Flow Run with id ' + _context.run_id)", + "recursive.$": "$.SourceInfo.is_recursive", + "endpoint_id.$": "$.source.id" + }, + "ResultPath": "$.DeleteResult", + "ExceptionOnActionFailure": true + }, + "Transfer": { + "Next": "Delete", + "Type": "Action", + "Catch": [ + { + "Next": "TransferFailed", + "ResultPath": "$.TransferErrorResult", + "ErrorEquals": [ + "ActionUnableToRun", + "ActionFailedException", + "ActionTimeout" + ] + } + ], + "Comment": "Run the initial transfer operation from the source ep/source path to the destination ep/destination path", + "WaitTime": 172800, + "ActionUrl": "https://actions.globus.org/transfer/transfer", + "Parameters": { + "label.=": "getattr('transfer_label', 'Transfer for Move Flow Run with id ' + _context.run_id)", + "transfer_items": [ + { + "recursive.$": "$.SourceInfo.is_recursive", + "source_path.$": "$.source.path", + "destination_path.$": "$.Destination.destination_path" + } + ], + "source_endpoint_id.$": "$.source.id", + "destination_endpoint_id.$": "$.destination.id" + }, + "ResultPath": "$.TransferResult", + "ExceptionOnActionFailure": true + }, + "CreateTransferCompleteDir": { + "Next": "AllComplete", + "Type": "Action", + "Catch": [ + { + "Next": "CreateTransferCompleteDirFailed", + "ResultPath": "$.CreateTransferCompleteDirErrorResult", + "ErrorEquals": [ + "ActionUnableToRun", + "ActionFailedException", + "ActionTimeout" + ] + } + ], + "Comment": "Create directory called _transfer_complete in source folder.", + "WaitTime": 172800, + "ActionUrl": "https://actions.globus.org/transfer/mkdir", + "Parameters": { + "path.=": "`$.source.path` + '_complete'", + "endpoint_id.$": "$.source.id" + }, + "ResultPath": "$.TransferCompletePath", + "ExceptionOnActionFailure": true + }, + "AllComplete": { + "End": true, + "Type": "Pass", + "Comment": "Normal completion, so report success and exit", + "Parameters": { + "message": "Move operation complete" + }, + "ResultPath": "$.FlowResult" + }, + "CreateTransferCompleteDirFailed": { + "Type": "Fail", + "Cause": "CreateTransferCompleteDirFailed", + "Error": "See state in $.CreateTransferCompleteDirErrorResult of the run output; data transfer was successful.", + "Comment": "Report the error and end the flow execution" + }, + "DeleteFailed": { + "Type": "Fail", + "Cause": "DeleteFailed", + "Error": "See state in $.DeleteErrorResult of the run output; data transfer was successful.", + "Comment": "Report the error and end the flow execution" + }, + "SetSourceInfo": { + "Next": "LookupDestinationPath", + "Type": "ExpressionEval", + "Comment": "Set the recursive flag", + "Parameters": { + "source_file.=": "SourcePathInfo.details.DATA[0].name", + "is_recursive.=": "SourcePathInfo.details.DATA[0].is_folder", + "source_folder.=": "SourcePathInfo.details.path" + }, + "ResultPath": "$.SourceInfo" + }, + "FailSourceRoot": { + "Type": "Fail", + "Cause": "SourcePathIsRoot", + "Error": "The source path must be a sub-folder. 
It cannot be a root folder like '/' or '/~/'", + "Comment": "Report failure due to using a root path as the source" + }, + "TransferFailed": { + "Type": "Fail", + "Cause": "TransferFailed", + "Error": "See state in $.TransferErrorResult of the run output", + "Comment": "Report the error and end the flow execution" + }, + "LookupSourcePath": { + "Next": "CheckSourcePathInfo", + "Type": "Action", + "Catch": [ + { + "Next": "SourcePathMissing", + "ResultPath": "$.SourcePathLookupErrorResult", + "ErrorEquals": [ + "ActionUnableToRun", + "ActionFailedException", + "ActionTimeout" + ] + } + ], + "Comment": "Lookup the source path to determine its type (file/dir) to decide if transfer should be recursive", + "WaitTime": 172800, + "ActionUrl": "https://actions.globus.org/transfer/ls", + "Parameters": { + "path.$": "$.source.path", + "path_only": true, + "endpoint_id.$": "$.source.id" + }, + "ResultPath": "$.SourcePathInfo" + }, + "SourcePathMissing": { + "Next": "SourcePathMissingFail", + "Type": "ExpressionEval", + "Parameters": { + "error.=": "'Missing source path ' + source.path + ' on collection ' + source.id" + }, + "ResultPath": "$.FlowResult" + }, + "SetDestinationInfo": { + "Next": "SetDestinationPath", + "Type": "ExpressionEval", + "Comment": "Set information about the destination path", + "Parameters": { + "exists.=": "is_present('DestinationPathInfo.details.DATA[0]')", + "is_folder.=": "getattr('DestinationPathInfo.details.DATA[0].is_folder', False)", + "destination_file.=": "getattr('DestinationPathInfo.details.DATA[0].name', '/')", + "destination_folder.=": "DestinationPathInfo.details.path" + }, + "ResultPath": "$.DestinationInfo" + }, + "SetDestinationPath": { + "Next": "Transfer", + "Type": "ExpressionEval", + "Comment": "Compute Destination Path full string based on source and destination path lookup info", + "Parameters": { + "destination_path.=": "(destination.path + '/' + SourceInfo.source_file) if (DestinationInfo.is_folder or ((not DestinationInfo.exists) and SourceInfo.is_recursive)) else destination.path" + }, + "ResultPath": "$.Destination" + }, + "CheckSourcePathInfo": { + "Type": "Choice", + "Choices": [ + { + "Next": "SourcePathMissing", + "Variable": "$.SourcePathInfo.details.DATA[0]", + "IsPresent": false + } + ], + "Comment": "Determine the type of the source path", + "Default": "SetSourceInfo" + }, + "GetSourceCollection": { + "Next": "CheckSourceCollectionId", + "Type": "Action", + "Comment": "Get information about the source collection", + "WaitTime": 172800, + "ActionUrl": "https://actions.globus.org/transfer/collection_info", + "Parameters": { + "endpoint_id.$": "$.source.id" + }, + "ResultPath": "$.SourceEpInfo" + }, + "TestPathConstraints": { + "Type": "Choice", + "Choices": [ + { + "Next": "FailSrcAndDstMustBeDifferent", + "Variable": "$.source.id", + "StringEqualsPath": "$.destination.id" + }, + { + "Or": [ + { + "Variable": "$.source.path", + "StringEquals": "/" + }, + { + "Variable": "$.source.path", + "StringEquals": "/~/" + } + ], + "Next": "FailSourceRoot" + } + ], + "Default": "GetSourceCollection" + }, + "NoManagedCollections": { + "Next": "NoManagedCollectionFail", + "Type": "ExpressionEval", + "Parameters": { + "error.=": "'At least one of the collections ' + source.id + ' or ' + destination.id + ' must be managed.'" + }, + "ResultPath": "$.FlowResult" + }, + "LookupDestinationPath": { + "Next": "SetDestinationInfo", + "Type": "Action", + "Catch": [ + { + "Next": "SetDestinationMissingInfo", + "ResultPath": "$.SourcePathLookupErrorResult", + 
"ErrorEquals": [ + "ActionUnableToRun", + "ActionFailedException", + "ActionTimeout" + ] + } + ], + "Comment": "Lookup the destination path to determine its type (file/dir)", + "WaitTime": 172800, + "ActionUrl": "https://actions.globus.org/transfer/ls", + "Parameters": { + "path.$": "$.destination.path", + "path_only": true, + "endpoint_id.$": "$.destination.id" + }, + "ResultPath": "$.DestinationPathInfo" + }, + "SourcePathMissingFail": { + "Type": "Fail", + "Cause": "SourcePathMissing", + "Error": "See state in $.FlowResult of the run output", + "Comment": "Report the error and end the flow execution" + }, + "CheckSourceCollectionId": { + "Type": "Choice", + "Choices": [ + { + "Or": [ + { + "And": [ + { + "Variable": "$.SourceEpInfo.details.subscription_id", + "IsPresent": true + }, + { + "IsNull": false, + "Variable": "$.SourceEpInfo.details.subscription_id" + } + ] + }, + { + "And": [ + { + "Variable": "$.SourceEpInfo.details.entity_type", + "IsPresent": true + }, + { + "Variable": "$.SourceEpInfo.details.entity_type", + "StringEquals": "GCP_guest_collection" + } + ] + } + ], + "Next": "LookupSourcePath" + } + ], + "Comment": "Check that the source collection is managed", + "Default": "GetDestinationCollection" + }, + "NoManagedCollectionFail": { + "Type": "Fail", + "Cause": "NoManagedCollection", + "Error": "See state in $.FlowResult of the run output", + "Comment": "Report the error and end the flow execution" + }, + "GetDestinationCollection": { + "Next": "CheckDestinationCollectionId", + "Type": "Action", + "Comment": "Get information about the destination collection", + "WaitTime": 172800, + "ActionUrl": "https://actions.globus.org/transfer/collection_info", + "Parameters": { + "endpoint_id.$": "$.destination.id" + }, + "ResultPath": "$.DestinationEpInfo" + }, + "SetDestinationMissingInfo": { + "Next": "SetDestinationPath", + "Type": "Pass", + "Comment": "Set the expected destination information if the lookup fails", + "Parameters": { + "exists": false, + "is_folder": false + }, + "ResultPath": "$.DestinationInfo" + }, + "CheckDestinationCollectionId": { + "Type": "Choice", + "Choices": [ + { + "Or": [ + { + "And": [ + { + "Variable": "$.DestinationEpInfo.details.subscription_id", + "IsPresent": true + }, + { + "IsNull": false, + "Variable": "$.DestinationEpInfo.details.subscription_id" + } + ] + }, + { + "And": [ + { + "Variable": "$.DestinationEpInfo.details.entity_type", + "IsPresent": true + }, + { + "Variable": "$.DestinationEpInfo.details.entity_type", + "StringEquals": "GCP_guest_collection" + } + ] + } + ], + "Next": "LookupSourcePath" + } + ], + "Comment": "Check that the destination collection is managed", + "Default": "NoManagedCollections" + }, + "FailSrcAndDstMustBeDifferent": { + "Type": "Fail", + "Cause": "DuplicateSourceAndDestination", + "Error": "To reduce the risk of data loss, the source.id cannot be the same as the destination.id", + "Comment": "Report failure due to using the same collection for source and destination" + } + }, + "Comment": "A Flow for performing a logical 'move' operation by first transferring from a source to a destination and then deleting from the source", + "StartAt": "TestPathConstraints" + } \ No newline at end of file diff --git a/globus_flows/copy_delete_mark_complete_input_schema.json b/globus_flows/copy_delete_mark_complete_input_schema.json new file mode 100644 index 0000000..6b12404 --- /dev/null +++ b/globus_flows/copy_delete_mark_complete_input_schema.json @@ -0,0 +1,78 @@ +{ + "type": "object", + "required": [ + "source", + 
"destination" + ], + "properties": { + "source": { + "type": "object", + "title": "Source", + "format": "globus-collection", + "required": [ + "id", + "path" + ], + "properties": { + "id": { + "type": "string", + "title": "Source Collection ID", + "format": "uuid", + "pattern": "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}", + "maxLength": 36, + "minLength": 36, + "description": "The UUID for the collection which serves as the source of the Move" + }, + "path": { + "type": "string", + "title": "Source Collection Path", + "description": "The path on the source collection for the data" + } + }, + "description": "Globus-provided flows require that at least one collection is managed under a subscription.", + "additionalProperties": false + }, + "destination": { + "type": "object", + "title": "Destination", + "format": "globus-collection", + "required": [ + "id", + "path" + ], + "properties": { + "id": { + "type": "string", + "title": "Destination Collection ID", + "format": "uuid", + "pattern": "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}", + "maxLength": 36, + "minLength": 36, + "description": "The UUID for the collection which serves as the destination for the Move" + }, + "path": { + "type": "string", + "title": "Destination Collection Path", + "description": "The path on the destination collection where the data will be stored" + } + }, + "description": "Globus-provided flows require that at least one collection is managed under a subscription.", + "additionalProperties": false + }, + "delete_label": { + "type": "string", + "title": "Label for Delete Task from Source", + "pattern": "^[a-zA-Z0-9-_, ]+$", + "maxLength": 128, + "description": "A label placed on the Delete operation" + }, + "transfer_label": { + "type": "string", + "title": "Label for Transfer Task", + "pattern": "^[a-zA-Z0-9-_, ]+$", + "maxLength": 128, + "description": "A label placed on the Transfer operation" + } + }, + "additionalProperties": false +} diff --git a/workflow/envs/python.yaml b/workflow/envs/python.yaml new file mode 100644 index 0000000..f7e196e --- /dev/null +++ b/workflow/envs/python.yaml @@ -0,0 +1,6 @@ +channels: + - conda-forge +dependencies: + - python=3.9 + - numpy + - pandas \ No newline at end of file diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk index 91dc7d7..8caf987 100644 --- a/workflow/rules/common.smk +++ b/workflow/rules/common.smk @@ -19,6 +19,7 @@ end_of_run_file_regex = re.compile(r"%s" % config["end_of_run_file_regex"]) ignore_proj_regex = str(config["ignore_proj_regex"]).lower() == "true" check_if_complete = str(config["check_if_complete"]).lower() == "true" transfer = str(config["transfer"]).lower() == "true" +delete_on_transfer = str(config["delete_on_transfer"]).lower() == "true" # error check input if not os.path.exists(data_dir): diff --git a/workflow/rules/transfer.smk b/workflow/rules/transfer.smk index de4b4b8..40c52d6 100644 --- a/workflow/rules/transfer.smk +++ b/workflow/rules/transfer.smk @@ -1,29 +1,72 @@ -# NOTE: this step will only invoke the transfer but there is no guarantee that it -# will be successful. Check the Globus dashboard for the status of the transfer. 
-rule transfer: - input: - f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt", - output: - f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt", - log: - "logs/{project}_transfer.log", - conda: - "../envs/globus.yaml" - threads: 1 - params: - data_dir=data_dir, - transfer_dir=transfer_dir, - src_endpoint=config["src_endpoint"], - dest_endpoint=config["dest_endpoint"], - dest_path=config["dest_path"], - shell: - """ - globus transfer \ - {params.src_endpoint}:{params.data_dir}/{wildcards.project}/{params.transfer_dir} \ - {params.dest_endpoint}:{params.dest_path}/{wildcards.project}/{params.transfer_dir} \ - --recursive \ - --sync-level checksum \ - --verify-checksum \ - --fail-on-quota-errors \ +if delete_on_transfer: + + # NOTE: this step uses a Globus data flow, so first we need to create the json file + rule create_globus_json_input: + input: + f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt", + output: + f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json", + log: + "logs/{project}_globus_input.log", + conda: + "../envs/python.yaml" + threads: 1 + params: + data_dir=data_dir, + transfer_dir=transfer_dir, + src_endpoint=config["src_endpoint"], + dest_endpoint=config["dest_endpoint"], + dest_path=config["dest_path"], + script: + "../scripts/create_globus_json_input.py" + + rule transfer: + input: + f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json", + output: + f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt", + log: + "logs/{project}_transfer.log", + conda: + "../envs/globus.yaml" + threads: 1 + shell: + """ + globus-automate flow run \ + --flow-input {input} \ + -l "{wildcards.project transfer}" \ + {params.globus_flow_id} + """ + + +else: + + # NOTE: this step will only invoke the transfer but there is no guarantee that it + # will be successful. Check the Globus dashboard for the status of the transfer. + rule transfer: + input: + f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_file_counts.txt", + output: + f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt", + log: + "logs/{project}_transfer.log", + conda: + "../envs/globus.yaml" + threads: 1 + params: + data_dir=data_dir, + transfer_dir=transfer_dir, + src_endpoint=config["src_endpoint"], + dest_endpoint=config["dest_endpoint"], + dest_path=config["dest_path"], + shell: + """ + globus transfer \ + {params.src_endpoint}:{params.data_dir}/{wildcards.project}/{params.transfer_dir} \ + {params.dest_endpoint}:{params.dest_path}/{wildcards.project}/{params.transfer_dir} \ + --recursive \ + --sync-level checksum \ + --verify-checksum \ + --fail-on-quota-errors \ --notify on > {output} - """ + """ diff --git a/workflow/scripts/create_globus_json_input.py b/workflow/scripts/create_globus_json_input.py new file mode 100644 index 0000000..2c40607 --- /dev/null +++ b/workflow/scripts/create_globus_json_input.py @@ -0,0 +1,34 @@ +""" +Creates json input file for Globus copy, delete and mark complete +data flow. 
+""" +import sys +import json + +sys.stderr = open(snakemake.log[0], "w") + +src_endpoint = snakemake.config["src_endpoint"] +dest_endpoint = snakemake.config["dest_endpoint"] + +data_dir = snakemake.config["data_dir"] +transfer_dir = snakemake.config["transfer_dir"] +project = snakemake.wildcards.project +src_path = f"{data_dir}/{project}/{transfer_dir}" + +dest_path = snakemake.config["dest_path"] + +input = { + "source": { + "id": src_endpoint, + "path": src_path + }, + "destination": { + "id": dest_endpoint, + "path": dest_path + }, + "transfer_label": f"Transfer archives for {project}", + "delete_label": f"Delete archives for {project}" +} + +with open(snakemake.output[0], "w") as f: + json.dump(input, f, indent=2) \ No newline at end of file From 56264d39cac7639679d818faff8944bf0edc5001 Mon Sep 17 00:00:00 2001 From: mcmero Date: Tue, 26 Sep 2023 11:10:37 +1000 Subject: [PATCH 07/10] Fix dest path and add globus-automate conda env --- workflow/envs/globus_automate.yaml | 4 ++++ workflow/rules/transfer.smk | 16 ++++++---------- workflow/scripts/create_globus_json_input.py | 3 ++- 3 files changed, 12 insertions(+), 11 deletions(-) create mode 100644 workflow/envs/globus_automate.yaml diff --git a/workflow/envs/globus_automate.yaml b/workflow/envs/globus_automate.yaml new file mode 100644 index 0000000..9e04b7e --- /dev/null +++ b/workflow/envs/globus_automate.yaml @@ -0,0 +1,4 @@ +channels: + - conda-forge +dependencies: + - globus-automate-client diff --git a/workflow/rules/transfer.smk b/workflow/rules/transfer.smk index 40c52d6..16196ed 100644 --- a/workflow/rules/transfer.smk +++ b/workflow/rules/transfer.smk @@ -7,16 +7,10 @@ if delete_on_transfer: output: f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json", log: - "logs/{project}_globus_input.log", + "logs/{project}_create_globus_json.log", conda: "../envs/python.yaml" threads: 1 - params: - data_dir=data_dir, - transfer_dir=transfer_dir, - src_endpoint=config["src_endpoint"], - dest_endpoint=config["dest_endpoint"], - dest_path=config["dest_path"], script: "../scripts/create_globus_json_input.py" @@ -28,14 +22,16 @@ if delete_on_transfer: log: "logs/{project}_transfer.log", conda: - "../envs/globus.yaml" + "../envs/globus_automate.yaml" threads: 1 + params: + globus_flow_id=config["globus_flow_id"], shell: """ globus-automate flow run \ + {params.globus_flow_id} \ --flow-input {input} \ - -l "{wildcards.project transfer}" \ - {params.globus_flow_id} + --label "Transfer {wildcards.project}" """ diff --git a/workflow/scripts/create_globus_json_input.py b/workflow/scripts/create_globus_json_input.py index 2c40607..2ad9051 100644 --- a/workflow/scripts/create_globus_json_input.py +++ b/workflow/scripts/create_globus_json_input.py @@ -4,6 +4,7 @@ """ import sys import json +import os sys.stderr = open(snakemake.log[0], "w") @@ -15,7 +16,7 @@ project = snakemake.wildcards.project src_path = f"{data_dir}/{project}/{transfer_dir}" -dest_path = snakemake.config["dest_path"] +dest_path = os.path.join(snakemake.config["dest_path"], project) input = { "source": { From 44cd695d214375f4a882b5c392d5c74740717cea Mon Sep 17 00:00:00 2001 From: mcmero Date: Wed, 27 Sep 2023 11:40:01 +1000 Subject: [PATCH 08/10] Write transfer output file for delete_on_transfer --- workflow/rules/transfer.smk | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflow/rules/transfer.smk b/workflow/rules/transfer.smk index 16196ed..c6732dd 100644 --- a/workflow/rules/transfer.smk +++ b/workflow/rules/transfer.smk @@ -31,7 
+31,7 @@ if delete_on_transfer: globus-automate flow run \ {params.globus_flow_id} \ --flow-input {input} \ - --label "Transfer {wildcards.project}" + --label "Transfer {wildcards.project}" > {output} """ @@ -64,5 +64,5 @@ else: --sync-level checksum \ --verify-checksum \ --fail-on-quota-errors \ - --notify on > {output} + --notify on > {output} """ From 9dea23191b2ded9c990bd9c5821090e68cea59ed Mon Sep 17 00:00:00 2001 From: mcmero Date: Tue, 3 Oct 2023 13:34:27 +1100 Subject: [PATCH 09/10] Use standard Globus workflow to avoid flow expiration issues. --- config/config.yaml | 4 ++-- workflow/rules/common.smk | 4 ++-- workflow/rules/transfer.smk | 9 +++++++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index 1c4d70b..397a243 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -55,9 +55,9 @@ transfer: False # whether to delete data from source endpoint after transfer delete_on_transfer: False -# Globus Flow ID to use for transfer +# Globus Flow ID to use for transfer (Move (copy and delete) files using Globus) # only used if delete_on_transfer flag is true -globus_flow_id: '' +globus_flow_id: 'f37e5766-7b3c-4c02-92ee-e6aacd8f4cb8' # this machine's Globus endpoint ID src_endpoint: '' diff --git a/workflow/rules/common.smk b/workflow/rules/common.smk index 8caf987..e710335 100644 --- a/workflow/rules/common.smk +++ b/workflow/rules/common.smk @@ -95,7 +95,7 @@ def is_processing_complete(project_dir_full): directory may have been deleted due to transfer. """ transfer_dir_full = os.path.join(project_dir_full, transfer_dir) - transfer_complete_dir = f"{transfer_dir_full}_complete" + processing_complete_file = os.path.join(project_dir_full, "processing.success") if os.path.exists(transfer_dir_full): files_in_transfer_dir = next(os.walk(transfer_dir_full))[2] final_file = "transfer.txt" if transfer else "tar_file_counts.txt" @@ -123,7 +123,7 @@ def is_processing_complete(project_dir_full): ) # if transfer directory does not exist, check for _complete directory - return os.path.exists(transfer_complete_dir) + return os.path.exists(processing_complete_file) # build list of projects and samples to archive diff --git a/workflow/rules/transfer.smk b/workflow/rules/transfer.smk index c6732dd..555dfa4 100644 --- a/workflow/rules/transfer.smk +++ b/workflow/rules/transfer.smk @@ -14,11 +14,14 @@ if delete_on_transfer: script: "../scripts/create_globus_json_input.py" + # NOTE: this step will only invoke the transfer but there is no guarantee that it + # will be successful. Check the Globus dashboard for the status of the transfer. 
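# A sketch of the fallback completion check in is_processing_complete()
# after this change; the helper name and project path are illustrative only:
import os

def processing_already_done(project_dir_full):
    # once the _transfer directory has been moved away by the flow, the
    # processing.success marker is what identifies a finished project
    return os.path.exists(os.path.join(project_dir_full, "processing.success"))

print(processing_already_done("/data/projA"))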
rule transfer: input: f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_globus_input.json", output: - f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt", + transfer_file=f"{data_dir}/{{project}}/{transfer_dir}/logs/{{project}}_transfer.txt", + complete_file=f"{data_dir}/{{project}}/processing.success", log: "logs/{project}_transfer.log", conda: @@ -31,7 +34,9 @@ if delete_on_transfer: globus-automate flow run \ {params.globus_flow_id} \ --flow-input {input} \ - --label "Transfer {wildcards.project}" > {output} + --label "Transfer {wildcards.project}" > {output.transfer_file} + + touch {output.complete_file} """ From f0a4769fe927b8e258832dde82145ecdd15bef91 Mon Sep 17 00:00:00 2001 From: mcmero Date: Tue, 3 Oct 2023 16:11:47 +1100 Subject: [PATCH 10/10] No longer need custom flows --- .../copy_delete_mark_complete_definition.json | 375 ------------------ ...opy_delete_mark_complete_input_schema.json | 78 ---- 2 files changed, 453 deletions(-) delete mode 100644 globus_flows/copy_delete_mark_complete_definition.json delete mode 100644 globus_flows/copy_delete_mark_complete_input_schema.json diff --git a/globus_flows/copy_delete_mark_complete_definition.json b/globus_flows/copy_delete_mark_complete_definition.json deleted file mode 100644 index 305b422..0000000 --- a/globus_flows/copy_delete_mark_complete_definition.json +++ /dev/null @@ -1,375 +0,0 @@ -{ - "States": { - "Delete": { - "Next": "CreateTransferCompleteDir", - "Type": "Action", - "Catch": [ - { - "Next": "DeleteFailed", - "ResultPath": "$.DeleteErrorResult", - "ErrorEquals": [ - "ActionUnableToRun", - "ActionFailedException", - "ActionTimeout" - ] - } - ], - "Comment": "Use Transfer to delete the initial source ep/source path. It uses the same value for recursive as the transfer", - "WaitTime": 172800, - "ActionUrl": "https://actions.globus.org/transfer/delete", - "Parameters": { - "items.=": "[source.path]", - "label.=": "getattr('delete_label', 'Delete from Source for Move Flow Run with id ' + _context.run_id)", - "recursive.$": "$.SourceInfo.is_recursive", - "endpoint_id.$": "$.source.id" - }, - "ResultPath": "$.DeleteResult", - "ExceptionOnActionFailure": true - }, - "Transfer": { - "Next": "Delete", - "Type": "Action", - "Catch": [ - { - "Next": "TransferFailed", - "ResultPath": "$.TransferErrorResult", - "ErrorEquals": [ - "ActionUnableToRun", - "ActionFailedException", - "ActionTimeout" - ] - } - ], - "Comment": "Run the initial transfer operation from the source ep/source path to the destination ep/destination path", - "WaitTime": 172800, - "ActionUrl": "https://actions.globus.org/transfer/transfer", - "Parameters": { - "label.=": "getattr('transfer_label', 'Transfer for Move Flow Run with id ' + _context.run_id)", - "transfer_items": [ - { - "recursive.$": "$.SourceInfo.is_recursive", - "source_path.$": "$.source.path", - "destination_path.$": "$.Destination.destination_path" - } - ], - "source_endpoint_id.$": "$.source.id", - "destination_endpoint_id.$": "$.destination.id" - }, - "ResultPath": "$.TransferResult", - "ExceptionOnActionFailure": true - }, - "CreateTransferCompleteDir": { - "Next": "AllComplete", - "Type": "Action", - "Catch": [ - { - "Next": "CreateTransferCompleteDirFailed", - "ResultPath": "$.CreateTransferCompleteDirErrorResult", - "ErrorEquals": [ - "ActionUnableToRun", - "ActionFailedException", - "ActionTimeout" - ] - } - ], - "Comment": "Create directory called _transfer_complete in source folder.", - "WaitTime": 172800, - "ActionUrl": 
"https://actions.globus.org/transfer/mkdir", - "Parameters": { - "path.=": "`$.source.path` + '_complete'", - "endpoint_id.$": "$.source.id" - }, - "ResultPath": "$.TransferCompletePath", - "ExceptionOnActionFailure": true - }, - "AllComplete": { - "End": true, - "Type": "Pass", - "Comment": "Normal completion, so report success and exit", - "Parameters": { - "message": "Move operation complete" - }, - "ResultPath": "$.FlowResult" - }, - "CreateTransferCompleteDirFailed": { - "Type": "Fail", - "Cause": "CreateTransferCompleteDirFailed", - "Error": "See state in $.CreateTransferCompleteDirErrorResult of the run output; data transfer was successful.", - "Comment": "Report the error and end the flow execution" - }, - "DeleteFailed": { - "Type": "Fail", - "Cause": "DeleteFailed", - "Error": "See state in $.DeleteErrorResult of the run output; data transfer was successful.", - "Comment": "Report the error and end the flow execution" - }, - "SetSourceInfo": { - "Next": "LookupDestinationPath", - "Type": "ExpressionEval", - "Comment": "Set the recursive flag", - "Parameters": { - "source_file.=": "SourcePathInfo.details.DATA[0].name", - "is_recursive.=": "SourcePathInfo.details.DATA[0].is_folder", - "source_folder.=": "SourcePathInfo.details.path" - }, - "ResultPath": "$.SourceInfo" - }, - "FailSourceRoot": { - "Type": "Fail", - "Cause": "SourcePathIsRoot", - "Error": "The source path must be a sub-folder. It cannot be a root folder like '/' or '/~/'", - "Comment": "Report failure due to using a root path as the source" - }, - "TransferFailed": { - "Type": "Fail", - "Cause": "TransferFailed", - "Error": "See state in $.TransferErrorResult of the run output", - "Comment": "Report the error and end the flow execution" - }, - "LookupSourcePath": { - "Next": "CheckSourcePathInfo", - "Type": "Action", - "Catch": [ - { - "Next": "SourcePathMissing", - "ResultPath": "$.SourcePathLookupErrorResult", - "ErrorEquals": [ - "ActionUnableToRun", - "ActionFailedException", - "ActionTimeout" - ] - } - ], - "Comment": "Lookup the source path to determine its type (file/dir) to decide if transfer should be recursive", - "WaitTime": 172800, - "ActionUrl": "https://actions.globus.org/transfer/ls", - "Parameters": { - "path.$": "$.source.path", - "path_only": true, - "endpoint_id.$": "$.source.id" - }, - "ResultPath": "$.SourcePathInfo" - }, - "SourcePathMissing": { - "Next": "SourcePathMissingFail", - "Type": "ExpressionEval", - "Parameters": { - "error.=": "'Missing source path ' + source.path + ' on collection ' + source.id" - }, - "ResultPath": "$.FlowResult" - }, - "SetDestinationInfo": { - "Next": "SetDestinationPath", - "Type": "ExpressionEval", - "Comment": "Set information about the destination path", - "Parameters": { - "exists.=": "is_present('DestinationPathInfo.details.DATA[0]')", - "is_folder.=": "getattr('DestinationPathInfo.details.DATA[0].is_folder', False)", - "destination_file.=": "getattr('DestinationPathInfo.details.DATA[0].name', '/')", - "destination_folder.=": "DestinationPathInfo.details.path" - }, - "ResultPath": "$.DestinationInfo" - }, - "SetDestinationPath": { - "Next": "Transfer", - "Type": "ExpressionEval", - "Comment": "Compute Destination Path full string based on source and destination path lookup info", - "Parameters": { - "destination_path.=": "(destination.path + '/' + SourceInfo.source_file) if (DestinationInfo.is_folder or ((not DestinationInfo.exists) and SourceInfo.is_recursive)) else destination.path" - }, - "ResultPath": "$.Destination" - }, - "CheckSourcePathInfo": { - 
"Type": "Choice", - "Choices": [ - { - "Next": "SourcePathMissing", - "Variable": "$.SourcePathInfo.details.DATA[0]", - "IsPresent": false - } - ], - "Comment": "Determine the type of the source path", - "Default": "SetSourceInfo" - }, - "GetSourceCollection": { - "Next": "CheckSourceCollectionId", - "Type": "Action", - "Comment": "Get information about the source collection", - "WaitTime": 172800, - "ActionUrl": "https://actions.globus.org/transfer/collection_info", - "Parameters": { - "endpoint_id.$": "$.source.id" - }, - "ResultPath": "$.SourceEpInfo" - }, - "TestPathConstraints": { - "Type": "Choice", - "Choices": [ - { - "Next": "FailSrcAndDstMustBeDifferent", - "Variable": "$.source.id", - "StringEqualsPath": "$.destination.id" - }, - { - "Or": [ - { - "Variable": "$.source.path", - "StringEquals": "/" - }, - { - "Variable": "$.source.path", - "StringEquals": "/~/" - } - ], - "Next": "FailSourceRoot" - } - ], - "Default": "GetSourceCollection" - }, - "NoManagedCollections": { - "Next": "NoManagedCollectionFail", - "Type": "ExpressionEval", - "Parameters": { - "error.=": "'At least one of the collections ' + source.id + ' or ' + destination.id + ' must be managed.'" - }, - "ResultPath": "$.FlowResult" - }, - "LookupDestinationPath": { - "Next": "SetDestinationInfo", - "Type": "Action", - "Catch": [ - { - "Next": "SetDestinationMissingInfo", - "ResultPath": "$.SourcePathLookupErrorResult", - "ErrorEquals": [ - "ActionUnableToRun", - "ActionFailedException", - "ActionTimeout" - ] - } - ], - "Comment": "Lookup the destination path to determine its type (file/dir)", - "WaitTime": 172800, - "ActionUrl": "https://actions.globus.org/transfer/ls", - "Parameters": { - "path.$": "$.destination.path", - "path_only": true, - "endpoint_id.$": "$.destination.id" - }, - "ResultPath": "$.DestinationPathInfo" - }, - "SourcePathMissingFail": { - "Type": "Fail", - "Cause": "SourcePathMissing", - "Error": "See state in $.FlowResult of the run output", - "Comment": "Report the error and end the flow execution" - }, - "CheckSourceCollectionId": { - "Type": "Choice", - "Choices": [ - { - "Or": [ - { - "And": [ - { - "Variable": "$.SourceEpInfo.details.subscription_id", - "IsPresent": true - }, - { - "IsNull": false, - "Variable": "$.SourceEpInfo.details.subscription_id" - } - ] - }, - { - "And": [ - { - "Variable": "$.SourceEpInfo.details.entity_type", - "IsPresent": true - }, - { - "Variable": "$.SourceEpInfo.details.entity_type", - "StringEquals": "GCP_guest_collection" - } - ] - } - ], - "Next": "LookupSourcePath" - } - ], - "Comment": "Check that the source collection is managed", - "Default": "GetDestinationCollection" - }, - "NoManagedCollectionFail": { - "Type": "Fail", - "Cause": "NoManagedCollection", - "Error": "See state in $.FlowResult of the run output", - "Comment": "Report the error and end the flow execution" - }, - "GetDestinationCollection": { - "Next": "CheckDestinationCollectionId", - "Type": "Action", - "Comment": "Get information about the destination collection", - "WaitTime": 172800, - "ActionUrl": "https://actions.globus.org/transfer/collection_info", - "Parameters": { - "endpoint_id.$": "$.destination.id" - }, - "ResultPath": "$.DestinationEpInfo" - }, - "SetDestinationMissingInfo": { - "Next": "SetDestinationPath", - "Type": "Pass", - "Comment": "Set the expected destination information if the lookup fails", - "Parameters": { - "exists": false, - "is_folder": false - }, - "ResultPath": "$.DestinationInfo" - }, - "CheckDestinationCollectionId": { - "Type": "Choice", - "Choices": [ 
- { - "Or": [ - { - "And": [ - { - "Variable": "$.DestinationEpInfo.details.subscription_id", - "IsPresent": true - }, - { - "IsNull": false, - "Variable": "$.DestinationEpInfo.details.subscription_id" - } - ] - }, - { - "And": [ - { - "Variable": "$.DestinationEpInfo.details.entity_type", - "IsPresent": true - }, - { - "Variable": "$.DestinationEpInfo.details.entity_type", - "StringEquals": "GCP_guest_collection" - } - ] - } - ], - "Next": "LookupSourcePath" - } - ], - "Comment": "Check that the destination collection is managed", - "Default": "NoManagedCollections" - }, - "FailSrcAndDstMustBeDifferent": { - "Type": "Fail", - "Cause": "DuplicateSourceAndDestination", - "Error": "To reduce the risk of data loss, the source.id cannot be the same as the destination.id", - "Comment": "Report failure due to using the same collection for source and destination" - } - }, - "Comment": "A Flow for performing a logical 'move' operation by first transferring from a source to a destination and then deleting from the source", - "StartAt": "TestPathConstraints" - } \ No newline at end of file diff --git a/globus_flows/copy_delete_mark_complete_input_schema.json b/globus_flows/copy_delete_mark_complete_input_schema.json deleted file mode 100644 index 6b12404..0000000 --- a/globus_flows/copy_delete_mark_complete_input_schema.json +++ /dev/null @@ -1,78 +0,0 @@ -{ - "type": "object", - "required": [ - "source", - "destination" - ], - "properties": { - "source": { - "type": "object", - "title": "Source", - "format": "globus-collection", - "required": [ - "id", - "path" - ], - "properties": { - "id": { - "type": "string", - "title": "Source Collection ID", - "format": "uuid", - "pattern": "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}", - "maxLength": 36, - "minLength": 36, - "description": "The UUID for the collection which serves as the source of the Move" - }, - "path": { - "type": "string", - "title": "Source Collection Path", - "description": "The path on the source collection for the data" - } - }, - "description": "Globus-provided flows require that at least one collection is managed under a subscription.", - "additionalProperties": false - }, - "destination": { - "type": "object", - "title": "Destination", - "format": "globus-collection", - "required": [ - "id", - "path" - ], - "properties": { - "id": { - "type": "string", - "title": "Destination Collection ID", - "format": "uuid", - "pattern": "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}", - "maxLength": 36, - "minLength": 36, - "description": "The UUID for the collection which serves as the destination for the Move" - }, - "path": { - "type": "string", - "title": "Destination Collection Path", - "description": "The path on the destination collection where the data will be stored" - } - }, - "description": "Globus-provided flows require that at least one collection is managed under a subscription.", - "additionalProperties": false - }, - "delete_label": { - "type": "string", - "title": "Label for Delete Task from Source", - "pattern": "^[a-zA-Z0-9-_, ]+$", - "maxLength": 128, - "description": "A label placed on the Delete operation" - }, - "transfer_label": { - "type": "string", - "title": "Label for Transfer Task", - "pattern": "^[a-zA-Z0-9-_, ]+$", - "maxLength": 128, - "description": "A label placed on the Transfer operation" - } - }, - "additionalProperties": false -}
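For reference, the flow input that create_globus_json_input.py assembles for the
standard Globus Move flow can be sketched in a few lines of Python; the endpoint
UUIDs, paths and project name below are placeholders, not values from any real
deployment:

    import json
    import os

    data_dir, transfer_dir, project = "/data", "_transfer", "projA"
    flow_input = {
        "source": {
            "id": "<src-endpoint-uuid>",
            "path": f"{data_dir}/{project}/{transfer_dir}",
        },
        "destination": {
            "id": "<dest-endpoint-uuid>",
            # patch 07 joins the configured dest_path with the project name
            "path": os.path.join("/vault", project),
        },
        "transfer_label": f"Transfer archives for {project}",
        "delete_label": f"Delete archives for {project}",
    }
    print(json.dumps(flow_input, indent=2))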