feat(gwas_catalog): sumstat harmonisation dag #55
Context
The aim of this PR is to add a DAG that can harmonise on the order of ~70k GWAS Catalog summary statistics.
We want to be able to
Implementation details
To achieve this we decided to set up Google Batch job(s), one per chunk of ~10k sumstats.
Each batch task will be an instance of the script that performs a small pipeline as described below:
Eventually this requires
and additionally
Note
The above requirements force us to create an additional layer (between the orchestration and gentropy) that invokes gentropy steps in a pipeline and collects the metrics and logs, so that the orchestrator can decide how to act.
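A minimal sketch of what such a wrapper layer could look like is shown below; the step commands, paths and metrics format are placeholders, not the actual harmonisation script:

```python
"""Hypothetical wrapper between the orchestrator and gentropy.

Runs a small pipeline of commands, records per-step metrics, and exits non-zero
on the first failure so the Google Batch task (and the orchestrator) can react.
"""
import json
import subprocess
import sys
import time


def run_pipeline(commands: list[list[str]], metrics_path: str) -> int:
    """Run each command in order and collect per-step metrics."""
    metrics = []
    for cmd in commands:
        start = time.time()
        proc = subprocess.run(cmd, capture_output=True, text=True)
        metrics.append(
            {
                "command": " ".join(cmd),
                "returncode": proc.returncode,
                "duration_s": round(time.time() - start, 2),
                "stderr_tail": proc.stderr[-1000:],
            }
        )
        if proc.returncode != 0:
            break  # stop the small pipeline on the first failure
    with open(metrics_path, "w") as fh:
        json.dump(metrics, fh, indent=2)
    return max((m["returncode"] for m in metrics), default=0)


if __name__ == "__main__":
    # Placeholder step list; the real harmonisation steps live in the amended image.
    steps = [["echo", "harmonise"], ["echo", "qc"]]
    sys.exit(run_pipeline(steps, metrics_path="/tmp/metrics.json"))
```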
To do that I have created an image based on the original gentropy image `europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/gentropy:dev`. The new image amends the script for harmonisation and is tagged as `europe-west1-docker.pkg.dev/open-targets-genetics-dev/gentropy-app/ot_gentropy:${ref}`.
Batch job
Although it comes with benefits, the Google Batch job is not ideal for a number of reasons:
The solution to this issue is to inherit from the default operator and add a method that parses the job definition from configuration.
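As an illustration, here is a sketch of that inheritance, assuming the Airflow Google provider's `CloudBatchSubmitJobOperator` is the default operator referred to; the method name and config schema are hypothetical:

```python
from airflow.providers.google.cloud.operators.cloud_batch import (
    CloudBatchSubmitJobOperator,
)
from google.cloud import batch_v1


class ConfigurableBatchJobOperator(CloudBatchSubmitJobOperator):
    """Batch operator that builds its Job message from a plain config mapping."""

    @staticmethod
    def parse_job_definition(config: dict) -> batch_v1.Job:
        """Translate a config dict into a batch_v1.Job message."""
        runnable = batch_v1.Runnable(
            container=batch_v1.Runnable.Container(
                image_uri=config["image"],
                commands=config.get("commands", []),
            )
        )
        task_group = batch_v1.TaskGroup(
            task_spec=batch_v1.TaskSpec(runnables=[runnable]),
            task_count=config.get("task_count", 1),
        )
        return batch_v1.Job(task_groups=[task_group])


# Usage inside a DAG (parameter values are illustrative only):
# ConfigurableBatchJobOperator(
#     task_id="harmonise_batch_0",
#     project_id="open-targets-genetics-dev",
#     region="europe-west1",
#     job_name="gwas-harmonisation-0",
#     job=ConfigurableBatchJobOperator.parse_job_definition(cfg),
# )
```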
As we do not control the names of the inputs and outputs from static config, we need to define the environments dynamically by listing the contents of the input and output directories and generating a todo manifest that sets up the input and output paths. Each row of the manifest will correspond to a single environment.
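A sketch of how the manifest could be generated, assuming inputs and outputs live in a GCS bucket; the bucket name, prefixes and file naming convention are hypothetical:

```python
from google.cloud import storage


def build_manifest(bucket_name: str, raw_prefix: str, harmonised_prefix: str) -> list[dict]:
    """List raw inputs and existing outputs, and emit one row per pending sumstat."""
    client = storage.Client()
    raw = {b.name for b in client.list_blobs(bucket_name, prefix=raw_prefix)}
    done = {b.name for b in client.list_blobs(bucket_name, prefix=harmonised_prefix)}
    manifest = []
    for input_path in sorted(raw):
        study_id = input_path.split("/")[-1].removesuffix(".h.tsv.gz")  # hypothetical naming
        output_path = f"{harmonised_prefix}/{study_id}.parquet"
        if output_path not in done:  # only schedule work that is not done yet
            manifest.append(
                {
                    "input": f"gs://{bucket_name}/{input_path}",
                    "output": f"gs://{bucket_name}/{output_path}",
                }
            )
    return manifest
```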
This means that we need to limit the number of tasks that can be scheduled in a single batch job to ~10-40K.
To resolve this issue we need to partition the input dataset (no matter the source) into n batches, each of which will correspond to a single Google Batch job.
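For example, the manifest rows could be chunked as follows (the task limit value is only illustrative):

```python
from itertools import islice
from typing import Iterator


def partition(rows: list[dict], max_tasks_per_job: int = 10_000) -> Iterator[list[dict]]:
    """Yield chunks of manifest rows, each destined for one Google Batch job."""
    it = iter(rows)
    while chunk := list(islice(it, max_tasks_per_job)):
        yield chunk
```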
The result is that a single batch task running the harmonisation job can produce the logs and metrics of the job execution in an environment specific to Google Batch without touching the business logic, so gentropy can stay platform agnostic.
BatchIndex
I have introduced the concepts of `BatchIndexOperator` and `GeneticsBatchJobOperator`. These two operators are required to run in sequence: `BatchIndexOperator -> GeneticsBatchJobOperator`. The first operator is responsible for defining the `BatchIndexRow`, a container object storing the CLI `commands` and `google.cloud.batch_v1` `environments` objects. By defining the `environments`, this operator determines the number of corresponding Google Batch tasks that will be executed within the batch runs. The `BatchIndexOperator` specifies the interface that needs to be satisfied by the implementer function, which should consume input parameters coming from the task parameters and output the `BatchIndex` that implements the task environments list. The output should be consumed by the `GenericBatchJobOperator`.
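For illustration, here is a minimal sketch of what the `BatchIndexRow` / `BatchIndex` containers could look like; the field names are assumptions based on this description, not the final implementation:

```python
from dataclasses import dataclass

from google.cloud import batch_v1


@dataclass
class BatchIndexRow:
    """One unit of work: the CLI commands plus the per-task environment variables."""

    commands: list[str]
    environment: batch_v1.Environment


@dataclass
class BatchIndex:
    """Container produced by BatchIndexOperator and consumed by the batch job operator."""

    rows: list[BatchIndexRow]

    @property
    def environments(self) -> list[batch_v1.Environment]:
        # One environment per Google Batch task to be executed in the job.
        return [row.environment for row in self.rows]


# Example: one task per manifest row, with input/output paths passed as env variables.
index = BatchIndex(
    rows=[
        BatchIndexRow(
            commands=["harmonise"],
            environment=batch_v1.Environment(
                variables={
                    "INPUT": "gs://bucket/raw/a.tsv.gz",
                    "OUTPUT": "gs://bucket/harmonised/a.parquet",
                }
            ),
        )
    ]
)
```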