From fa0b99891f56b71466299aa4729c7193e609b263 Mon Sep 17 00:00:00 2001
From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Date: Wed, 3 Nov 2021 00:45:41 -0600
Subject: [PATCH] Only mark SchedulerJobs as failed, not any jobs (#19375)

In `adopt_or_reset_orphaned_tasks`, we set any SchedulerJobs whose latest
heartbeat is older than `scheduler_health_check_threshold` to failed.
However, a missing condition allowed that timeout to apply to all jobs,
not just SchedulerJobs. This is because the polymorphic identity criterion
isn't included for `update()`:

https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update

So if we have any running LocalTaskJobs that, for whatever reason, aren't
heartbeating faster than `scheduler_health_check_threshold`, their state
gets set to failed and they subsequently exit with a log line similar to:

    State of this instance has been externally set to scheduled. Terminating instance.

Note that the state it is set to can differ (e.g. queued or up_for_retry),
depending simply on how quickly the scheduler has progressed that
task_instance again.
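
For illustration, a minimal, self-contained sketch of the SQLAlchemy
behaviour in question, using toy single-table-inheritance models standing
in for Airflow's real job classes: a SELECT through the subclass gets the
discriminator criterion automatically, but `update()` does not, so the
`job_type` filter has to be added by hand.

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class Job(Base):  # toy stand-in for airflow.jobs.base_job.BaseJob
        __tablename__ = "job"
        id = Column(Integer, primary_key=True)
        job_type = Column(String(30))
        state = Column(String(20))
        __mapper_args__ = {"polymorphic_on": job_type, "polymorphic_identity": "Job"}

    class SchedulerJob(Job):  # single-table inheritance, like Airflow's
        __mapper_args__ = {"polymorphic_identity": "SchedulerJob"}

    class LocalTaskJob(Job):
        __mapper_args__ = {"polymorphic_identity": "LocalTaskJob"}

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    session.add_all([SchedulerJob(state="running"), LocalTaskJob(state="running")])
    session.commit()

    # A SELECT on the subclass is constrained by job_type automatically...
    assert session.query(SchedulerJob).count() == 1

    # ...but Query.update() is not: without an explicit job_type filter,
    # the LocalTaskJob row gets updated too -- the bug this patch fixes.
    session.query(SchedulerJob).filter(SchedulerJob.state == "running").update(
        {SchedulerJob.state: "failed"}, synchronize_session=False
    )
    session.commit()
    assert session.query(LocalTaskJob).one().state == "failed"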

(cherry picked from commit 38d329bd112e8be891f077b4e3300182930cf74d)
---
 airflow/jobs/scheduler_job.py    |  1 +
 tests/jobs/test_scheduler_job.py | 31 +++++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2c7378d22ce9..2a230a77cd8b 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1127,6 +1127,7 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = None):
                     num_failed = (
                         session.query(SchedulerJob)
                         .filter(
+                            SchedulerJob.job_type == "SchedulerJob",
                             SchedulerJob.state == State.RUNNING,
                             SchedulerJob.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
                         )
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 4922617f876e..53809603d6d0 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -37,6 +37,7 @@
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
 from airflow.jobs.backfill_job import BackfillJob
+from airflow.jobs.base_job import BaseJob
 from airflow.jobs.scheduler_job import SchedulerJob
 from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
 from airflow.models.dagrun import DagRun
@@ -2445,6 +2446,36 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self, dag_maker):
         if old_job.processor_agent:
             old_job.processor_agent.end()
 
+    def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog):
+        """Make sure we only set SchedulerJobs to failed, not all jobs"""
+        session = settings.Session()
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.state = State.RUNNING
+        self.scheduler_job.latest_heartbeat = timezone.utcnow()
+        session.add(self.scheduler_job)
+        session.flush()
+
+        old_job = SchedulerJob(subdir=os.devnull)
+        old_job.state = State.RUNNING
+        old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
+        session.add(old_job)
+        session.flush()
+
+        old_task_job = BaseJob()  # Imagine it's a LocalTaskJob, but this is easier to provision
+        old_task_job.state = State.RUNNING
+        old_task_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
+        session.add(old_task_job)
+        session.flush()
+
+        with caplog.at_level('INFO', logger='airflow.jobs.scheduler_job'):
+            self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
+        session.expire_all()
+
+        assert old_job.state == State.FAILED
+        assert old_task_job.state == State.RUNNING
+        assert 'Marked 1 SchedulerJob instances as failed' in caplog.messages
+
     def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker):
         """Test SLA Callbacks are not sent when check_slas is False"""
         dag_id = 'test_send_sla_callbacks_to_processor_sla_disabled'