Skip to content

Commit

Permalink
Merge pull request #57 from bento-platform/features/new-workflow-system
Browse files Browse the repository at this point in the history
feat: implement new workflow system
  • Loading branch information
davidlougheed authored Nov 30, 2023
2 parents 3b29d39 + 4881d86 commit 1fad838
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 85 deletions.
6 changes: 3 additions & 3 deletions etc/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

GOHAN_DEBUG=false
GOHAN_SERVICE_CONTACT=someone@somewhere.ca
GOHAN_SEMVER=4.0.1
GOHAN_SEMVER=5.0.0
GOHAN_SERVICES="gateway api elasticsearch kibana drs authorization"

# GOOS=linux
Expand Down Expand Up @@ -39,8 +39,8 @@ GOHAN_API_IMAGE=gohan-api
GOHAN_API_VERSION=latest

GOHAN_API_BUILDER_BASE_IMAGE=golang:1.21-bookworm
GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2023.10.20
GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2023.10.20
GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2023.11.10
GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2023.11.10

GOHAN_API_CONTAINER_NAME=gohan-api
GOHAN_API_SERVICE_HOST=0.0.0.0
Expand Down
56 changes: 32 additions & 24 deletions src/api/workflows/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,61 @@ type WorkflowSchema map[string]interface{}
var WORKFLOW_VARIANT_SCHEMA WorkflowSchema = map[string]interface{}{
"ingestion": map[string]interface{}{
"vcf_gz": map[string]interface{}{
"name": "Compressed-VCF Elasticsearch Indexing",
"description": "This ingestion workflow will validate and ingest a BGZip-Compressed-VCF into Elasticsearch.",
"name": "Compressed VCF Elasticsearch Indexing",
"description": "This ingestion workflow will validate and ingest a BGZip-compressed VCF into Elasticsearch.",
"data_type": "variant",
"tags": []string{"variant"},
"file": "vcf_gz.wdl",
"action": "ingestion",
"type": "ingestion",
"inputs": []map[string]interface{}{
// User inputs:
{
"id": "vcf_gz_file_names",
"type": "file[]",
"required": true,
"extensions": []string{".vcf.gz"},
"id": "project_dataset",
"type": "project:dataset",
"required": true,
},
{
"id": "vcf_gz_file_names",
"type": "file[]",
"required": true,
"pattern": "^.*\\.vcf\\.gz$",
},
{
"id": "assembly_id",
"type": "enum",
"required": true,
"values": []c.AssemblyId{a.GRCh38, a.GRCh37},
"default": "GRCh38",
},
{
"id": "filter_out_references",
"type": "enum",
"type": "boolean",
"required": true,
"values": []string{"true", "false"}, // simulate boolean type
"default": "false",
},
// Injected inputs:
{
"id": "gohan_url",
"type": "string",
"required": true,
"value": "FROM_CONFIG",
"hidden": true,
"id": "gohan_url",
"type": "service-url",
"required": true,
"injected": true,
"service_kind": "gohan",
},
},
"outputs": []map[string]interface{}{
{
"id": "txt_output",
"type": "file",
"value": "{txt_output}",
"id": "access_token",
"type": "secret",
"required": true,
"injected": true,
"key": "access_token",
},
{
"id": "err_output",
"type": "file",
"value": "{err_output}",
"id": "validate_ssl",
"type": "config",
"required": true,
"injected": true,
"key": "validate_ssl",
},
},
},
},
"analysis": map[string]interface{}{},
"export": map[string]interface{}{},
}
130 changes: 72 additions & 58 deletions src/api/workflows/vcf_gz.wdl
Original file line number Diff line number Diff line change
@@ -1,112 +1,126 @@
version 1.0

workflow vcf_gz {
String gohan_url
Array[File] vcf_gz_file_names
String assembly_id
String project_id
String dataset_id
String filter_out_references
String secret__access_token
input {
String gohan_url
Array[File] vcf_gz_file_names
String assembly_id
String project_dataset
Boolean filter_out_references
String access_token
Boolean validate_ssl
}

call project_and_dataset_id {
input: project_dataset = project_dataset
}

scatter(file_name in vcf_gz_file_names) {
call vcf_gz_gohan {
input: gohan_url = gohan_url,
vcf_gz_file_name = file_name,
assembly_id = assembly_id,
project = project_id,
dataset = dataset_id,
project = project_and_dataset_id.out[0],
dataset = project_and_dataset_id.out[1],
filter_out_references = filter_out_references,
access_token = secret__access_token,
access_token = access_token,
validate_ssl = validate_ssl
}
}
}

task project_and_dataset_id {
input {
String project_dataset
}
command <<< python3 -c 'import json; print(json.dumps("~{project_dataset}".split(":")))' >>>
output {
Array[String] out = read_json(stdout())
}
}

task vcf_gz_gohan {
String gohan_url
String vcf_gz_file_name
String assembly_id
String project
String dataset
String filter_out_references
String access_token

command {
echo "Using temporary-token : ${access_token}"

QUERY="fileNames=${vcf_gz_file_name}&assemblyId=${assembly_id}&dataset=${dataset}&project=${project}&filterOutReferences=${filter_out_references}"
AUTH_HEADER="Authorization: Bearer ${access_token}"
input {
String gohan_url
String vcf_gz_file_name
String assembly_id
String project
String dataset
Boolean filter_out_references
String access_token
Boolean validate_ssl
}

command <<<
QUERY='fileNames=~{vcf_gz_file_name}&assemblyId=~{assembly_id}&dataset=~{dataset}&project=~{project}&filterOutReferences=~{true="true" false="false" filter_out_references}'

AUTH_HEADER='Authorization: Bearer ~{access_token}'

# TODO: refactor
# append temporary-token header if present
if [ "${access_token}" == "" ]
then
RUN_RESPONSE=$(curl -vvv "${gohan_url}/private/variants/ingestion/run?$QUERY" -k | sed 's/"/\"/g')
else
RUN_RESPONSE=$(curl -vvv -H "$AUTH_HEADER" "${gohan_url}/private/variants/ingestion/run?$QUERY" -k | sed 's/"/\"/g')
fi
RUN_RESPONSE=$(curl -vvv \
-H "${AUTH_HEADER}" \
~{true="" false="-k" validate_ssl} \
"~{gohan_url}/private/variants/ingestion/run?${QUERY}" | sed 's/"/\"/g')

echo $RUN_RESPONSE
echo "${RUN_RESPONSE}"

# reformat response string to include double quotes in the json object
RUN_RESPONSE_WITH_QUOTES=$(echo $RUN_RESPONSE | sed 's/"/\"/g')
echo $RUN_RESPONSE_WITH_QUOTES
echo "${RUN_RESPONSE_WITH_QUOTES}"

# obtain request id from the response for this one file just requested to process
REQUEST_ID=$(echo $RUN_RESPONSE_WITH_QUOTES | jq -r '.[] |"\(.id)"')
echo $REQUEST_ID
echo "${REQUEST_ID}"

# give it a second..
sleep 1s

# "while loop to ping '/variants/ingestion/requests' and wait for this file ingestion to complete or display an error..."
while :
do
# TODO: refactor
# fetch run requests
# append temporary-token header if present
if [ "${access_token}" == "" ]
then
REQUESTS=$(curl -vvv "${gohan_url}/private/variants/ingestion/requests" -k)
else
REQUESTS=$(curl -vvv -H "$AUTH_HEADER" "${gohan_url}/private/variants/ingestion/requests" -k)
fi
REQUESTS=$(curl -vvv \
-H "${AUTH_HEADER}" \
~{true="" false="-k" validate_ssl} \
"~{gohan_url}/private/variants/ingestion/requests")

echo $REQUESTS
echo "${REQUESTS}"

# reformat response string to include double quotes in the json object
REQ_WITH_QUOTES=$(echo $REQUESTS | sed 's/"/\"/g')
echo $REQ_WITH_QUOTES
echo "${REQ_WITH_QUOTES}"

# organize json objects as individual lines per response object (file being processed)
JQ_RES=$(echo $REQ_WITH_QUOTES | jq -r '.[] | "\(.id) \(.filename) \(.state)"')
echo "$JQ_RES"

echo "${JQ_RES}"

# determine the state of the run request by filename
THIS_FILE_RESULT=$(echo "$JQ_RES" | grep $REQUEST_ID | tr ' ' '\n' | grep . | tail -n1)
echo $THIS_FILE_RESULT
echo "${THIS_FILE_RESULT}"

if [ "$THIS_FILE_RESULT" == "Done" ] || [ "$THIS_FILE_RESULT" == "Error" ]
then
WITH_ERROR_MESSAGE=
if [ "${THIS_FILE_RESULT}" == "Done" ] || [ "${THIS_FILE_RESULT}" == "Error" ]; then
WITH_ERROR_MESSAGE=''

if [ "$THIS_FILE_RESULT" == "Error" ]
then
if [ "${THIS_FILE_RESULT}" == "Error" ]; then
WITH_ERROR_MESSAGE=" in error!"
echo "This is what we found from the /variants/ingestion/requests :"
echo "$THIS_FILE_RESULT"
echo "${THIS_FILE_RESULT}"
fi

echo "File ${vcf_gz_file_name} with assembly id ${assembly_id} done processing $WITH_ERROR_MESSAGE"
echo "File ~{vcf_gz_file_name} with assembly id ~{assembly_id} done processing ${WITH_ERROR_MESSAGE}"

break
elif [ "$THIS_FILE_RESULT" == "" ]
then
echo "Something went wrong. Got invalid response from Gohan API : $REQUESTS"
elif [ "${THIS_FILE_RESULT}" == "" ]; then
echo "Something went wrong. Got invalid response from Gohan API: ${REQUESTS}"
break
else
echo "Waiting 5 seconds.."
echo '~{vcf_gz_file_name}: Waiting 5 seconds...'
sleep 5s
fi
done
>>>

output {
String out = stdout()
String err = stderr()
}
}

0 comments on commit 1fad838

Please sign in to comment.