diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 2c7378d22ce9d..2a230a77cd8b7 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1127,6 +1127,7 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = None): num_failed = ( session.query(SchedulerJob) .filter( + SchedulerJob.job_type == "SchedulerJob", SchedulerJob.state == State.RUNNING, SchedulerJob.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)), ) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 4922617f876eb..53809603d6d09 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -37,6 +37,7 @@ from airflow.exceptions import AirflowException from airflow.executors.base_executor import BaseExecutor from airflow.jobs.backfill_job import BackfillJob +from airflow.jobs.base_job import BaseJob from airflow.jobs.scheduler_job import SchedulerJob from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance from airflow.models.dagrun import DagRun @@ -2445,6 +2446,36 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self, dag_maker): if old_job.processor_agent: old_job.processor_agent.end() + def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog): + """Make sure we only set SchedulerJobs to failed, not all jobs""" + session = settings.Session() + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.state = State.RUNNING + self.scheduler_job.latest_heartbeat = timezone.utcnow() + session.add(self.scheduler_job) + session.flush() + + old_job = SchedulerJob(subdir=os.devnull) + old_job.state = State.RUNNING + old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15) + session.add(old_job) + session.flush() + + old_task_job = BaseJob() # Imagine it's a LocalTaskJob, but this is easier to provision + old_task_job.state = State.RUNNING + old_task_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15) + session.add(old_task_job) + session.flush() + + with caplog.at_level('INFO', logger='airflow.jobs.scheduler_job'): + self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session) + session.expire_all() + + assert old_job.state == State.FAILED + assert old_task_job.state == State.RUNNING + assert 'Marked 1 SchedulerJob instances as failed' in caplog.messages + def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker): """Test SLA Callbacks are not sent when check_slas is False""" dag_id = 'test_send_sla_callbacks_to_processor_sla_disabled'