Airflow doesn't show dags created by loop. #32865
Replies: 3 comments 1 reply
-
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval. |
Beta Was this translation helpful? Give feedback.
-
Can you recheck whether files are created in the file system i-e {airflow}/dags folder? Also check the dags_folder path configuration |
Beta Was this translation helpful? Give feedback.
-
Can you reformat your code i.e encapsulate it within triple back ticks rather than single? It would increase the chances of getting help dramatically. |
Beta Was this translation helpful? Give feedback.
-
Apache Airflow version
Other Airflow 2 version (please specify below)
What happened
Airflow 2.5.3 doesn't show dags created by loop add the code.
` import os
from datetime import datetime
import sys
from airflow import DAG
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from rappiflow.operators import MultiStatementSnowflakeOperator
from settings import get_default_args_opsgenie
from airflow.decorators import dag
from airflow.utils.dag_parsing_context import get_parsing_context
queries_base_path = os.path.join(os.path.dirname(file), "queries")
countries = ["CO", "MX", "BR", "AR", "CL", "UY", "PE", "CR", "EC"]
days_dnp_drt = 600
split_steps_int = 100
total_task_in_line_int = split_steps_int - 1
dag_id_lst = []
dag_lst_dct = {}
dag_values_dct = {}
settings = {
"snowflake_conn_id": "fin_conn_snowflake_repro",
"google_sheets_id": "conn_google_sheets",
}
default_args = get_default_args_opsgenie(
vertical, url_notifications, priority, owner_process
)
ENV = Variable.get("ENV", "")
current_dag_id = get_parsing_context().dag_id
print(get_parsing_context())
current_dag = None
if len(sys.argv) > 3:
print(sys.argv)
current_dag = sys.argv[3]
for ope in ["DNP", "DRT"]:
dag_lst_dct[ope] = {}
for country in countries:
temp_lst = []
for i in range(days_dnp_drt, 1, -split_steps_int):
dag_id_str = "FIN_UE_{}{}{}_{}".format(
ope, country, str(i), str(i - total_task_in_line_int)
)
dag_values_dct[dag_id_str] = {
"start": i,
"stop": i - total_task_in_line_int,
}
temp_lst.append(dag_id_str)
dag_lst_dct[ope].update({country: temp_lst})
for ope in ["DNP", "DRT"]:
for country in countries:
for dag_id in dag_lst_dct[ope][country]:
if current_dag is not None and current_dag != dag_id:
continue
@dag(
dag_id=dag_id,
schedule_interval=None,
start_date=datetime(2020, 1, 14),
max_active_runs=1,
default_args=default_args,
catchup=False,
template_searchpath=queries_base_path,
tags=["REPROCESO"],
)
def create_dag_by_country():
start_task = EmptyOperator(
task_id="START_TASK", trigger_rule="all_done"
)
end_task = EmptyOperator(task_id="END_TASK", trigger_rule="all_done")
ope_lst = [
MultiStatementSnowflakeOperator(
task_id="UE_" + ope + "" + country + "" + str(day),
sql="FIN_UE/FIN_UE_DNP_DRT/TBL_UE_{}.sql".format(ope),
snowflake_conn_id=settings["snowflake_conn_id"],
params={
"country": country,
"db": ENV,
"schema": "GLOBAL_FINANCES",
"months_dnp_drt": day,
},
trigger_rule="all_done",
)
for day in range(
dag_values_dct[dag_id]["start"],
dag_values_dct[dag_id]["stop"] - 1,
-1,
)
]
chain(*ope_lst)
start_task >> ope_lst[0]
ope_lst[-1] >> end_task
What you think should happen instead
No response
How to reproduce
Add the code and generate tasks
Operating System
Linux
Versions of Apache Airflow Providers
snowflake providers
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
Code of Conduct
Beta Was this translation helpful? Give feedback.
All reactions