Skip to content

Commit

Permalink
Only mark SchedulerJobs as failed, not any jobs (apache#19375)
Browse files Browse the repository at this point in the history
In `adopt_or_reset_orphaned_tasks`, we mark as failed any SchedulerJobs whose
latest heartbeat is older than `scheduler_health_check_threshold`; however, a
missing condition allowed that timeout to apply to all jobs, not just SchedulerJobs.
This is because SQLAlchemy's `Query.update()` does not add the polymorphic identity filter automatically, so it must be specified explicitly:
https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.update

So if we had any running LocalTaskJobs that, for whatever reason, aren't
heartbeating faster than `scheduler_health_check_threshold`, their state
gets set to failed and they subsequently exit with a log line similar to:

    State of this instance has been externally set to scheduled. Terminating instance.

Note that the state it is set to can be different (e.g. queued or
up_for_retry) simply depending on how quickly the scheduler has
progressed that task_instance again.

(cherry picked from commit 38d329b)
  • Loading branch information
jedcunningham committed Nov 3, 2021
1 parent ba46937 commit fa0b998
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
1 change: 1 addition & 0 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,7 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = None):
num_failed = (
session.query(SchedulerJob)
.filter(
SchedulerJob.job_type == "SchedulerJob",
SchedulerJob.state == State.RUNNING,
SchedulerJob.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
)
Expand Down
31 changes: 31 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.jobs.backfill_job import BackfillJob
from airflow.jobs.base_job import BaseJob
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.models import DAG, DagBag, DagModel, Pool, TaskInstance
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -2445,6 +2446,36 @@ def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self, dag_maker):
if old_job.processor_agent:
old_job.processor_agent.end()

def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog):
    """Check that only stale SchedulerJobs are failed, never other job types.

    Three running jobs are flushed to the session: the scheduler under test
    with a current heartbeat, a second SchedulerJob whose heartbeat is 15
    minutes old, and a plain BaseJob whose heartbeat is also 15 minutes old.
    After ``adopt_or_reset_orphaned_tasks`` runs, the stale SchedulerJob must
    be FAILED, the BaseJob must still be RUNNING, and the log must report
    exactly one SchedulerJob marked as failed.
    """
    session = settings.Session()

    def _persist_running(job, heartbeat):
        # Put the job in RUNNING state with the given heartbeat and flush it.
        job.state = State.RUNNING
        job.latest_heartbeat = heartbeat
        session.add(job)
        session.flush()

    # The scheduler that performs the adoption: heartbeat is current.
    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    _persist_running(self.scheduler_job, timezone.utcnow())

    # Another scheduler whose heartbeat is 15 minutes old.
    stale_scheduler = SchedulerJob(subdir=os.devnull)
    _persist_running(stale_scheduler, timezone.utcnow() - timedelta(minutes=15))

    # Imagine it's a LocalTaskJob, but a bare BaseJob is easier to provision.
    stale_task_job = BaseJob()
    _persist_running(stale_task_job, timezone.utcnow() - timedelta(minutes=15))

    with caplog.at_level('INFO', logger='airflow.jobs.scheduler_job'):
        self.scheduler_job.adopt_or_reset_orphaned_tasks(session=session)
    # Expire loaded attributes so the state checks below re-read them via the session.
    session.expire_all()

    expected_message = 'Marked 1 SchedulerJob instances as failed'
    assert stale_scheduler.state == State.FAILED
    assert stale_task_job.state == State.RUNNING
    assert expected_message in caplog.messages

def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker):
"""Test SLA Callbacks are not sent when check_slas is False"""
dag_id = 'test_send_sla_callbacks_to_processor_sla_disabled'
Expand Down

0 comments on commit fa0b998

Please sign in to comment.