Skip to content

Commit

Permalink
Add unit test to catch timeout errors. (apache#42269)
Browse files Browse the repository at this point in the history
Co-authored-by: Ulada Zakharava <vlada_zakharava@epam.com>
  • Loading branch information
VladaZakharova and Ulada Zakharava authored Sep 18, 2024
1 parent 1e71096 commit d7c4924
Showing 1 changed file with 40 additions and 0 deletions.
40 changes: 40 additions & 0 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,46 @@ def test_serialized_dag_errors_are_import_errors(self, mock_serialize, caplog):
assert "SerializationError" in err
session.rollback()

def test_timeout_dag_errors_are_import_errors(self, tmp_path, caplog):
"""
Test that if the DAG contains Timeout error it will be still loaded to DB as import_errors
"""
code_to_save = """
# Define Dag to load
import datetime
import time
import airflow
from airflow.operators.python import PythonOperator
time.sleep(31)
with airflow.DAG(
"import_timeout",
start_date=datetime.datetime(2022, 1, 1),
schedule=None) as dag:
def f():
print("Sleeping")
time.sleep(2)
for ind in range(10):
PythonOperator(
dag=dag,
task_id=f"sleep_2_{ind}",
python_callable=f,
)
"""
with open("tmp_file.py", "w") as f:
f.write(code_to_save)

dagbag = DagBag(dag_folder=os.fspath("tmp_file.py"), include_examples=False)
dag = dagbag._load_modules_from_file("tmp_file.py", safe_mode=False)

assert dag is not None
assert "tmp_file.py" in dagbag.import_errors
assert "DagBag import timeout for" in caplog.text

@patch("airflow.models.dagbag.DagBag.collect_dags")
@patch("airflow.models.serialized_dag.SerializedDagModel.write_dag")
@patch("airflow.models.dag.DAG.bulk_write_to_db")
Expand Down

0 comments on commit d7c4924

Please sign in to comment.