Dynamic DAG
The purpose of this document is to describe the to-be process for executing Airflow DAGs that run COVID-19 related ETL processes, such as notebooks, S3 deployments, and Snowflake table population.
We mostly follow convention-over-configuration principles to make things as dynamic as possible. The goal is to keep Airflow Variables to a minimum, mostly for credentials and connections.
The proposed architecture will have a single application stored in the `/dag` folder (in the DagBag path) that generates one DAG for each data source. These DAGs contain:
- Extraction from source systems, using notebooks from the `/notebooks` folder. These notebooks MUST put their outputs into the `/output` folder in the repository. Contents of `/output` are not tracked by git.
  - The generated file name MUST be prefixed with the basename of the notebook. Example: `notebooks/JHU_COVID-19.ipynb` will generate its file(s) as `output/JHU_COVID-19*.csv`. This helps downstream tasks find the generated files in the output folder (see the sketch after this list).
- Ingestion into an S3 bucket. The bucket name SHOULD be an Airflow variable.
- Ingestion into the Snowflake database. The database table names MUST be the same as the CSV file basenames.
- Later, execute data quality checks.
- The DAG SHOULD notify the users in case of failures / DQ issues.
- The schedule of the DAG SHOULD be `None`.
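
As a minimal illustration of the naming convention above, a downstream task could locate a notebook's outputs with a simple glob. The helper name and paths here are illustrative, not part of the proposal:

```python
from pathlib import Path
from typing import List


def find_notebook_outputs(notebook_path: str, output_dir: str = "output") -> List[Path]:
    """Hypothetical helper: find output CSVs prefixed with the notebook basename."""
    basename = Path(notebook_path).stem  # e.g. "JHU_COVID-19"
    return sorted(Path(output_dir).glob(f"{basename}*.csv"))


# Example: notebooks/JHU_COVID-19.ipynb -> output/JHU_COVID-19*.csv
print(find_notebook_outputs("notebooks/JHU_COVID-19.ipynb"))
```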
The single Python file (proposed name: `covid_etl_dag.py`) will generate the above-mentioned DAGs and tasks. By utilizing naming conventions and existing files, the program flow is the following (see the sketch after the list):
- Look for `.ipynb` files in the `/notebooks` folder. Generate a DAG for each notebook with the ID `etl_{{basename of the filename}}`. Example: `etl_JHU_COVID-19`.
- Generate the necessary tasks inside the dynamically created DAG:
  - `cleanup_output_folder`
  - `execute_notebook`
  - `upload_to_s3`
  - `upload_to_snowflake`
  - `send_email`
- Set up the dependencies between the tasks.
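
A minimal sketch of what `covid_etl_dag.py` could look like, assuming Airflow 2.x import paths and the repository layout described above; the task callables are placeholders for the real notebook execution, S3 upload, and Snowflake load logic:

```python
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumed repo layout: the DAG file lives in /dag, notebooks in /notebooks.
NOTEBOOKS_DIR = Path(__file__).parent.parent / "notebooks"


def _placeholder(task_name, notebook):
    # Stand-in for the real implementation (papermill run, S3 upload, Snowflake load, ...).
    print(f"{task_name} for {notebook}")


for notebook in sorted(NOTEBOOKS_DIR.glob("*.ipynb")):
    dag_id = f"etl_{notebook.stem}"  # e.g. etl_JHU_COVID-19
    dag = DAG(
        dag_id=dag_id,
        schedule_interval=None,       # triggered externally, per the proposal
        start_date=datetime(2020, 1, 1),
        catchup=False,
    )
    with dag:
        tasks = [
            PythonOperator(
                task_id=task_id,
                python_callable=_placeholder,
                op_args=[task_id, str(notebook)],
            )
            for task_id in [
                "cleanup_output_folder",
                "execute_notebook",
                "upload_to_s3",
                "upload_to_snowflake",
                "send_email",
            ]
        ]
        # Linear dependency chain: cleanup -> notebook -> S3 -> Snowflake -> email.
        for upstream, downstream in zip(tasks, tasks[1:]):
            upstream >> downstream

    # Register the DAG object in the module namespace so the scheduler picks it up.
    globals()[dag_id] = dag
```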
By using @szilardhuber's code, we can create a trigger DAG for GitHub sources that looks for changes in the GitHub repository and triggers the specific DAG (like `etl_JHU_COVID-19`). The proposed schedule is every ten minutes.
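
A hypothetical sketch of such a trigger DAG (this is not @szilardhuber's actual code), assuming Airflow 2.x import paths; the change-detection callable is a placeholder:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def _source_changed():
    # Placeholder: compare the latest commit touching the tracked source files
    # against the last processed commit (e.g. stored in an Airflow Variable).
    return True


with DAG(
    dag_id="trigger_etl_JHU_COVID-19",
    schedule_interval="*/10 * * * *",  # every ten minutes
    start_date=datetime(2020, 1, 1),
    catchup=False,
) as dag:
    check_github = ShortCircuitOperator(
        task_id="check_github_for_changes",
        python_callable=_source_changed,
    )
    trigger_etl = TriggerDagRunOperator(
        task_id="trigger_etl_dag",
        trigger_dag_id="etl_JHU_COVID-19",
    )
    check_github >> trigger_etl
```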
I propose having a DAG that pulls the GitHub master branch every ten minutes in case the `ENVIRONMENT` variable is set to `production`. We can make it trigger-based too (webhooks or something similar).
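
A minimal sketch of that git-pull DAG, assuming `ENVIRONMENT` is an OS environment variable and using an illustrative repository path:

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

REPO_DIR = "/opt/airflow/covid-etl"  # assumed location of the checked-out repo

# Only register the DAG in production, so other environments skip the pull.
if os.environ.get("ENVIRONMENT") == "production":
    with DAG(
        dag_id="git_pull_master",
        schedule_interval="*/10 * * * *",  # every ten minutes
        start_date=datetime(2020, 1, 1),
        catchup=False,
    ) as dag:
        git_pull = BashOperator(
            task_id="git_pull",
            bash_command=f"cd {REPO_DIR} && git pull origin master",
        )
```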