diff --git a/airflow/api_connexion/endpoints/backfill_endpoint.py b/airflow/api_connexion/endpoints/backfill_endpoint.py new file mode 100644 index 0000000000000..f974be4d75d82 --- /dev/null +++ b/airflow/api_connexion/endpoints/backfill_endpoint.py @@ -0,0 +1,181 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import logging +from functools import wraps +from typing import TYPE_CHECKING + +import pendulum +from sqlalchemy import select + +from airflow.api_connexion import security +from airflow.api_connexion.exceptions import Conflict, NotFound +from airflow.api_connexion.schemas.backfill_schema import ( + BackfillCollection, + backfill_collection_schema, + backfill_schema, +) +from airflow.models.backfill import Backfill +from airflow.models.serialized_dag import SerializedDagModel +from airflow.utils import timezone +from airflow.utils.session import NEW_SESSION, provide_session +from airflow.www.decorators import action_logging + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + from airflow.api_connexion.types import APIResponse + +log = logging.getLogger(__name__) + +RESOURCE_EVENT_PREFIX = "dag" + + +def backfill_to_dag(func): + """ + Enrich the request with dag_id. 
+ + :meta private: + """ + + @wraps(func) + def wrapper(*, backfill_id, session, **kwargs): + backfill = session.get(Backfill, backfill_id) + if not backfill: + raise NotFound("Backfill not found") + return func(dag_id=backfill.dag_id, backfill_id=backfill_id, session=session, **kwargs) + + return wrapper + + +@provide_session +def _create_backfill( + *, + dag_id: str, + from_date: str, + to_date: str, + max_active_runs: int, + reverse: bool, + dag_run_conf: dict | None, + session: Session = NEW_SESSION, +) -> Backfill: + serdag = session.get(SerializedDagModel, dag_id) + if not serdag: + raise NotFound(f"Could not find dag {dag_id}") + + br = Backfill( + dag_id=dag_id, + from_date=pendulum.parse(from_date), + to_date=pendulum.parse(to_date), + max_active_runs=max_active_runs, + dag_run_conf=dag_run_conf, + ) + session.add(br) + session.commit() + return br + + +@security.requires_access_dag("GET") +@action_logging +@provide_session +def list_backfills(dag_id, session): + backfills = session.scalars(select(Backfill).where(Backfill.dag_id == dag_id)).all() + obj = BackfillCollection( + backfills=backfills, + total_entries=len(backfills), + ) + return backfill_collection_schema.dump(obj) + + +@provide_session +@backfill_to_dag +@security.requires_access_dag("PUT") +@action_logging +def pause_backfill(*, backfill_id, session, **kwargs): + br = session.get(Backfill, backfill_id) + if br.completed_at: + raise Conflict("Backfill is already completed.") + if br.is_paused is False: + br.is_paused = True + session.commit() + return backfill_schema.dump(br) + + +@provide_session +@backfill_to_dag +@security.requires_access_dag("PUT") +@action_logging +def unpause_backfill(*, backfill_id, session, **kwargs): + br = session.get(Backfill, backfill_id) + if br.completed_at: + raise Conflict("Backfill is already completed.") + if br.is_paused: + br.is_paused = False + session.commit() + return backfill_schema.dump(br) + + +@provide_session +@backfill_to_dag 
+@security.requires_access_dag("PUT") +@action_logging +def cancel_backfill(*, backfill_id, session, **kwargs): + br: Backfill = session.get(Backfill, backfill_id) + if br.completed_at is not None: + raise Conflict("Backfill is already completed.") + + br.completed_at = timezone.utcnow() + + # first, pause + if not br.is_paused: + br.is_paused = True + session.commit() + return backfill_schema.dump(br) + + +@provide_session +@backfill_to_dag +@security.requires_access_dag("GET") +@action_logging +def get_backfill(*, backfill_id: int, session: Session = NEW_SESSION, **kwargs): + backfill = session.get(Backfill, backfill_id) + if backfill: + return backfill_schema.dump(backfill) + raise NotFound("Backfill not found") + + +@security.requires_access_dag("PUT") +@action_logging +def create_backfill( + dag_id: str, + from_date: str, + to_date: str, + max_active_runs: int = 10, + reverse: bool = False, + dag_run_conf: dict | None = None, +) -> APIResponse: + backfill_obj = _create_backfill( + dag_id=dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=max_active_runs, + reverse=reverse, + dag_run_conf=dag_run_conf, + ) + return backfill_schema.dump(backfill_obj) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 0c4b0414775f1..15ad6fd8a4f63 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -245,6 +245,200 @@ servers: description: Apache Airflow Stable API. paths: + # Database entities + /backfills: + get: + summary: List backfills + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: list_backfills + tags: [Backfill] + parameters: + - name: dag_id + in: query + schema: + type: string + required: true + description: | + List backfills for this dag. + responses: + "200": + description: Success. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/BackfillCollection" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + + post: + summary: Create a backfill job. + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: create_backfill + tags: [Backfill] + parameters: + - name: dag_id + in: query + schema: + type: string + required: true + description: | + Create dag runs for this dag. + + - name: from_date + in: query + schema: + type: string + format: date-time + required: true + description: | + Create dag runs with logical dates from this date onward, including this date. + + - name: to_date + in: query + schema: + type: string + format: date-time + required: true + description: | + Create dag runs for logical dates up to but not including this date. + + - name: max_active_runs + in: query + schema: + type: integer + required: false + description: | + Maximum number of active DAG runs for the backfill. + + - name: reverse + in: query + schema: + type: boolean + required: false + description: | + If true, run the dag runs in descending order of logical date. + + - name: config + in: query + schema: + # todo: AIP-78 make this object + type: string + required: false + description: | + Dag run conf to be forwarded to the dag runs. + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "400": + $ref: "#/components/responses/BadRequest" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + + /backfills/{backfill_id}: + parameters: + - $ref: "#/components/parameters/BackfillIdPath" + get: + summary: Get a backfill + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: get_backfill + tags: [Backfill] + responses: + "200": + description: Success. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + + /backfills/{backfill_id}/pause: + parameters: + - $ref: "#/components/parameters/BackfillIdPath" + post: + summary: Pause a backfill + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: pause_backfill + tags: [Backfill] + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + "409": + $ref: "#/components/responses/Conflict" + + /backfills/{backfill_id}/unpause: + parameters: + - $ref: "#/components/parameters/BackfillIdPath" + post: + summary: Unpause a backfill + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: unpause_backfill + tags: [Backfill] + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + "409": + $ref: "#/components/responses/Conflict" + + /backfills/{backfill_id}/cancel: + parameters: + - $ref: "#/components/parameters/BackfillIdPath" + post: + summary: Cancel a backfill + description: | + When a backfill is cancelled, all queued dag runs will be marked as failed. + Running dag runs will be allowed to continue. + x-openapi-router-controller: airflow.api_connexion.endpoints.backfill_endpoint + operationId: cancel_backfill + tags: [Backfill] + responses: + "200": + description: Success. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/Backfill" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + "409": + $ref: "#/components/responses/Conflict" + # Database entities /connections: get: @@ -2704,6 +2898,66 @@ components: $ref: "#/components/schemas/UserCollectionItem" - $ref: "#/components/schemas/CollectionInfo" + Backfill: + description: > + Backfill entity object. + + Represents one backfill run / request. + type: object + properties: + id: + type: integer + description: id + dag_id: + type: string + description: The dag_id for the backfill. + from_date: + type: string + nullable: true + description: From date of the backfill (inclusive). + to_date: + type: string + nullable: true + description: To date of the backfill (exclusive). + dag_run_conf: + type: string + nullable: true + description: Dag run conf to be forwarded to the dag runs. + is_paused: + type: boolean + nullable: true + description: is_paused + max_active_runs: + type: integer + nullable: true + description: max_active_runs + created_at: + type: string + nullable: true + description: created_at + completed_at: + type: string + nullable: true + description: completed_at + updated_at: + type: string + nullable: true + description: updated_at + + + BackfillCollection: + type: object + description: | + Collection of backfill entities. + allOf: + - type: object + properties: + backfills: + type: array + items: + $ref: "#/components/schemas/Backfill" + - $ref: "#/components/schemas/CollectionInfo" + ConnectionCollectionItem: description: > Connection collection item. @@ -5125,6 +5379,36 @@ components: # Reusable path, query, header and cookie parameters parameters: + + BackfillIdPath: + in: path + name: backfill_id + schema: + type: integer + required: true + description: | + The integer id identifying the backfill entity. 
+ + FromDate: + in: query + name: from_date + schema: + type: string + format: date-time + required: false + description: | + From date. + + ToDate: + in: query + name: to_date + schema: + type: string + format: date-time + required: false + description: | + To date. + # Pagination parameters PageOffset: in: query @@ -5691,6 +5975,13 @@ components: schema: $ref: "#/components/schemas/Error" # 409 + "Conflict": + description: There is some kind of conflict with the request. + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + # 409 "AlreadyExists": description: An existing resource conflicts with the request. content: diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index cefe16d863f52..8ff2541353688 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -41,7 +41,6 @@ class Backfill(Base): Controls whether new dag runs will be created for this backfill. Does not pause existing dag runs. - todo: AIP-78 Add test """ max_active_runs = Column(Integer, default=10, nullable=False) created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) @@ -49,12 +48,6 @@ class Backfill(Base): updated_at = Column(UtcDateTime, default=timezone.utcnow, onupdate=timezone.utcnow, nullable=False) -# todo: AIP-78 implement clear_failed_tasks?` -# todo: AIP-78 implement clear_dag_run? - -# todo: (AIP-78) should backfill be supported for things with no schedule, or statically partitioned assets? 
- - class BackfillDagRun(Base): """Mapping table between backfill run and dag run.""" diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 60fd384df00a7..3616be30a1fac 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -6,6 +6,50 @@ import type { CamelCasedPropertiesDeep } from "type-fest"; */ export interface paths { + "/backfills": { + get: operations["list_backfills"]; + post: operations["create_backfill"]; + }; + "/backfills/{backfill_id}": { + get: operations["get_backfill"]; + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + }; + "/backfills/{backfill_id}/pause": { + post: operations["pause_backfill"]; + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + }; + "/backfills/{backfill_id}/unpause": { + post: operations["unpause_backfill"]; + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + }; + "/backfills/{backfill_id}/cancel": { + /** + * When a backfill is cancelled, all queued dag runs will be marked as failed. + * Running dag runs will be allowed to continue. + */ + post: operations["cancel_backfill"]; + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + }; "/connections": { get: operations["get_connections"]; post: operations["post_connection"]; @@ -861,6 +905,36 @@ export interface components { UserCollection: { users?: components["schemas"]["UserCollectionItem"][]; } & components["schemas"]["CollectionInfo"]; + /** + * @description Backfill entity object. + * Represents one backfill run / request. 
+ */ + Backfill: { + /** @description id */ + id?: number; + /** @description The dag_id for the backfill. */ + dag_id?: string; + /** @description From date of the backfill (inclusive). */ + from_date?: string | null; + /** @description To date of the backfill (exclusive). */ + to_date?: string | null; + /** @description Dag run conf to be forwarded to the dag runs. */ + dag_run_conf?: string | null; + /** @description is_paused */ + is_paused?: boolean | null; + /** @description max_active_runs */ + max_active_runs?: number | null; + /** @description created_at */ + created_at?: string | null; + /** @description completed_at */ + completed_at?: string | null; + /** @description updated_at */ + updated_at?: string | null; + }; + /** @description Collection of backfill entities. */ + BackfillCollection: { + backfills?: components["schemas"]["Backfill"][]; + } & components["schemas"]["CollectionInfo"]; /** * @description Connection collection item. * The password and extra fields are only available when retrieving a single object due to the sensitivity of this data. @@ -2405,6 +2479,12 @@ export interface components { "application/json": components["schemas"]["Error"]; }; }; + /** There is some kind of conflict with the request. */ + Conflict: { + content: { + "application/json": components["schemas"]["Error"]; + }; + }; /** An existing resource conflicts with the request. */ AlreadyExists: { content: { @@ -2419,6 +2499,12 @@ export interface components { }; }; parameters: { + /** @description The integer id identifying the backfill entity. */ + BackfillIdPath: number; + /** @description From date. */ + FromDate: string; + /** @description To date. */ + ToDate: string; /** @description The number of items to skip before starting to collect the result set. */ PageOffset: number; /** @description The numbers of items to return. 
*/ @@ -2621,6 +2707,136 @@ export interface components { } export interface operations { + list_backfills: { + parameters: { + query: { + /** List backfills for this dag. */ + dag_id: string; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["BackfillCollection"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + }; + }; + create_backfill: { + parameters: { + query: { + /** Create dag runs for this dag. */ + dag_id: string; + /** Create dag runs with logical dates from this date onward, including this date. */ + from_date: string; + /** Create dag runs for logical dates up to but not including this date. */ + to_date: string; + /** Maximum number of active DAG runs for the the backfill. */ + max_active_runs?: number; + /** If true, run the dag runs in descending order of logical date. */ + reverse?: boolean; + /** If true, run the dag runs in descending order of logical date. */ + config?: string; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 400: components["responses"]["BadRequest"]; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + }; + }; + get_backfill: { + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + }; + }; + pause_backfill: { + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + responses: { + /** Success. 
*/ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + 409: components["responses"]["Conflict"]; + }; + }; + unpause_backfill: { + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + 409: components["responses"]["Conflict"]; + }; + }; + /** + * When a backfill is cancelled, all queued dag runs will be marked as failed. + * Running dag runs will be allowed to continue. + */ + cancel_backfill: { + parameters: { + path: { + /** The integer id identifying the backfill entity. */ + backfill_id: components["parameters"]["BackfillIdPath"]; + }; + }; + responses: { + /** Success. 
*/ + 200: { + content: { + "application/json": components["schemas"]["Backfill"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + 409: components["responses"]["Conflict"]; + }; + }; get_connections: { parameters: { query: { @@ -5048,6 +5264,12 @@ export type User = CamelCasedPropertiesDeep; export type UserCollection = CamelCasedPropertiesDeep< components["schemas"]["UserCollection"] >; +export type Backfill = CamelCasedPropertiesDeep< + components["schemas"]["Backfill"] +>; +export type BackfillCollection = CamelCasedPropertiesDeep< + components["schemas"]["BackfillCollection"] +>; export type ConnectionCollectionItem = CamelCasedPropertiesDeep< components["schemas"]["ConnectionCollectionItem"] >; @@ -5305,6 +5527,24 @@ export type HealthStatus = CamelCasedPropertiesDeep< export type Operations = operations; /* Types for operation variables */ +export type ListBackfillsVariables = CamelCasedPropertiesDeep< + operations["list_backfills"]["parameters"]["query"] +>; +export type CreateBackfillVariables = CamelCasedPropertiesDeep< + operations["create_backfill"]["parameters"]["query"] +>; +export type GetBackfillVariables = CamelCasedPropertiesDeep< + operations["get_backfill"]["parameters"]["path"] +>; +export type PauseBackfillVariables = CamelCasedPropertiesDeep< + operations["pause_backfill"]["parameters"]["path"] +>; +export type UnpauseBackfillVariables = CamelCasedPropertiesDeep< + operations["unpause_backfill"]["parameters"]["path"] +>; +export type CancelBackfillVariables = CamelCasedPropertiesDeep< + operations["cancel_backfill"]["parameters"]["path"] +>; export type GetConnectionsVariables = CamelCasedPropertiesDeep< operations["get_connections"]["parameters"]["query"] >; diff --git a/tests/api_connexion/endpoints/test_backfill_endpoint.py b/tests/api_connexion/endpoints/test_backfill_endpoint.py new file mode 100644 index 
0000000000000..51a4faf40055c --- /dev/null +++ b/tests/api_connexion/endpoints/test_backfill_endpoint.py @@ -0,0 +1,440 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +from datetime import datetime +from unittest import mock +from urllib.parse import urlencode + +import pendulum +import pytest + +from airflow.models import DagBag, DagModel +from airflow.models.backfill import Backfill +from airflow.models.dag import DAG +from airflow.models.serialized_dag import SerializedDagModel +from airflow.operators.empty import EmptyOperator +from airflow.security import permissions +from airflow.utils import timezone +from airflow.utils.session import provide_session +from tests.test_utils.api_connexion_utils import create_user, delete_user +from tests.test_utils.db import clear_db_backfills, clear_db_dags, clear_db_runs, clear_db_serialized_dags + +pytestmark = [pytest.mark.db_test] + + +DAG_ID = "test_dag" +TASK_ID = "op1" +DAG2_ID = "test_dag2" +DAG3_ID = "test_dag3" +UTC_JSON_REPR = "UTC" if pendulum.__version__.startswith("3") else "Timezone('UTC')" + + +@pytest.fixture(scope="module") +def configured_app(minimal_app_for_api): + app = minimal_app_for_api + + create_user( + app, # type: 
ignore + username="test", + role_name="Test", + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG), + ], + ) + create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore + create_user(app, username="test_granular_permissions", role_name="TestGranularDag") # type: ignore + app.appbuilder.sm.sync_perm_for_dag( # type: ignore + "TEST_DAG_1", + access_control={ + "TestGranularDag": { + permissions.RESOURCE_DAG: {permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ} + }, + }, + ) + + with DAG( + DAG_ID, + schedule=None, + start_date=datetime(2020, 6, 15), + doc_md="details", + params={"foo": 1}, + tags=["example"], + ) as dag: + EmptyOperator(task_id=TASK_ID) + + with DAG(DAG2_ID, schedule=None, start_date=datetime(2020, 6, 15)) as dag2: # no doc_md + EmptyOperator(task_id=TASK_ID) + + with DAG(DAG3_ID, schedule=None) as dag3: # DAG start_date set to None + EmptyOperator(task_id=TASK_ID, start_date=datetime(2019, 6, 12)) + + dag_bag = DagBag(os.devnull, include_examples=False) + dag_bag.dags = {dag.dag_id: dag, dag2.dag_id: dag2, dag3.dag_id: dag3} + + app.dag_bag = dag_bag + + yield app + + delete_user(app, username="test") # type: ignore + delete_user(app, username="test_no_permissions") # type: ignore + delete_user(app, username="test_granular_permissions") # type: ignore + + +class TestBackfillEndpoint: + @staticmethod + def clean_db(): + clear_db_backfills() + clear_db_runs() + clear_db_dags() + clear_db_serialized_dags() + + @pytest.fixture(autouse=True) + def setup_attrs(self, configured_app) -> None: + self.clean_db() + self.app = configured_app + self.client = self.app.test_client() # type:ignore + self.dag_id = DAG_ID + self.dag2_id = DAG2_ID + self.dag3_id = DAG3_ID + + def teardown_method(self) -> None: + self.clean_db() + + @provide_session + def _create_dag_models(self, *, 
count=1, dag_id_prefix="TEST_DAG", is_paused=False, session=None): + dags = [] + for num in range(1, count + 1): + dag_model = DagModel( + dag_id=f"{dag_id_prefix}_{num}", + fileloc=f"/tmp/dag_{num}.py", + is_active=True, + timetable_summary="0 0 * * *", + is_paused=is_paused, + ) + session.add(dag_model) + dags.append(dag_model) + return dags + + @provide_session + def _create_deactivated_dag(self, session=None): + dag_model = DagModel( + dag_id="TEST_DAG_DELETED_1", + fileloc="/tmp/dag_del_1.py", + schedule_interval="2 2 * * *", + is_active=False, + ) + session.add(dag_model) + + +class TestListBackfills(TestBackfillEndpoint): + def test_should_respond_200(self, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + b = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date) + session.add(b) + session.commit() + response = self.client.get( + f"/api/v1/backfills?dag_id={dag.dag_id}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "backfills": [ + { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date.isoformat(), + "id": b.id, + "is_paused": False, + "max_active_runs": 10, + "to_date": to_date.isoformat(), + "updated_at": mock.ANY, + } + ], + "total_entries": 1, + } + + @pytest.mark.parametrize( + "user, expected", + [ + ("test_granular_permissions", 200), + ("test_no_permissions", 403), + ("test", 200), + (None, 401), + ], + ) + def test_should_respond_200_with_granular_dag_access(self, user, expected, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + b = Backfill( + dag_id=dag.dag_id, + from_date=from_date, + to_date=to_date, + ) + + session.add(b) + session.commit() + kwargs = {} + if user: + kwargs.update(environ_overrides={"REMOTE_USER": user}) + response = 
self.client.get("/api/v1/backfills?dag_id=TEST_DAG_1", **kwargs) + assert response.status_code == expected + + +class TestGetBackfill(TestBackfillEndpoint): + def test_should_respond_200(self, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date) + session.add(backfill) + session.commit() + response = self.client.get( + f"/api/v1/backfills/{backfill.id}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date.isoformat(), + "id": backfill.id, + "is_paused": False, + "max_active_runs": 10, + "to_date": to_date.isoformat(), + "updated_at": mock.ANY, + } + + def test_no_exist(self, session): + response = self.client.get( + f"/api/v1/backfills/{23198409834208}", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 404 + assert response.json.get("title") == "Backfill not found" + + @pytest.mark.parametrize( + "user, expected", + [ + ("test_granular_permissions", 200), + ("test_no_permissions", 403), + ("test", 200), + (None, 401), + ], + ) + def test_should_respond_200_with_granular_dag_access(self, user, expected, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill( + dag_id=dag.dag_id, + from_date=from_date, + to_date=to_date, + ) + session.add(backfill) + session.commit() + kwargs = {} + if user: + kwargs.update(environ_overrides={"REMOTE_USER": user}) + response = self.client.get(f"/api/v1/backfills/{backfill.id}", **kwargs) + assert response.status_code == expected + + +class TestCreateBackfill(TestBackfillEndpoint): + @pytest.mark.parametrize( + "user, expected", + [ + ("test_granular_permissions", 200), + ("test_no_permissions", 403), + 
("test", 200), + (None, 401), + ], + ) + def test_create_backfill(self, user, expected, session, dag_maker): + with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 * * * *") as dag: + EmptyOperator(task_id="mytask") + session.add(SerializedDagModel(dag)) + session.commit() + session.query(DagModel).all() + from_date = pendulum.parse("2024-01-01") + from_date_iso = from_date.isoformat() + to_date = pendulum.parse("2024-02-01") + to_date_iso = to_date.isoformat() + max_active_runs = 5 + query = urlencode( + query={ + "dag_id": dag.dag_id, + "from_date": f"{from_date_iso}", + "to_date": f"{to_date_iso}", + "max_active_runs": max_active_runs, + "reverse": False, + } + ) + kwargs = {} + if user: + kwargs.update(environ_overrides={"REMOTE_USER": user}) + + response = self.client.post( + f"/api/v1/backfills?{query}", + **kwargs, + ) + assert response.status_code == expected + if expected < 300: + assert response.json == { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date_iso, + "id": mock.ANY, + "is_paused": False, + "max_active_runs": 5, + "to_date": to_date_iso, + "updated_at": mock.ANY, + } + + +class TestPauseBackfill(TestBackfillEndpoint): + def test_should_respond_200(self, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date) + session.add(backfill) + session.commit() + response = self.client.post( + f"/api/v1/backfills/{backfill.id}/pause", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date.isoformat(), + "id": backfill.id, + "is_paused": True, + "max_active_runs": 10, + "to_date": to_date.isoformat(), + "updated_at": mock.ANY, + } + + 
@pytest.mark.parametrize( + "user, expected", + [ + ("test_granular_permissions", 200), + ("test_no_permissions", 403), + ("test", 200), + (None, 401), + ], + ) + def test_should_respond_200_with_granular_dag_access(self, user, expected, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill( + dag_id=dag.dag_id, + from_date=from_date, + to_date=to_date, + ) + session.add(backfill) + session.commit() + kwargs = {} + if user: + kwargs.update(environ_overrides={"REMOTE_USER": user}) + response = self.client.post(f"/api/v1/backfills/{backfill.id}/pause", **kwargs) + assert response.status_code == expected + + +class TestCancelBackfill(TestBackfillEndpoint): + def test_should_respond_200(self, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill(dag_id=dag.dag_id, from_date=from_date, to_date=to_date) + session.add(backfill) + session.commit() + response = self.client.post( + f"/api/v1/backfills/{backfill.id}/cancel", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json == { + "completed_at": mock.ANY, + "created_at": mock.ANY, + "dag_id": "TEST_DAG_1", + "dag_run_conf": None, + "from_date": from_date.isoformat(), + "id": backfill.id, + "is_paused": True, + "max_active_runs": 10, + "to_date": to_date.isoformat(), + "updated_at": mock.ANY, + } + assert pendulum.parse(response.json["completed_at"]) + # now it is marked as completed + assert pendulum.parse(response.json["completed_at"]) + + # get conflict when canceling already-canceled backfill + response = self.client.post( + f"/api/v1/backfills/{backfill.id}/cancel", environ_overrides={"REMOTE_USER": "test"} + ) + assert response.status_code == 409 + + @pytest.mark.parametrize( + "user, expected", + [ + ("test_granular_permissions", 200), + ("test_no_permissions", 403), + ("test", 200), + (None, 401), + ], + ) + 
def test_should_respond_200_with_granular_dag_access(self, user, expected, session): + (dag,) = self._create_dag_models() + from_date = timezone.utcnow() + to_date = timezone.utcnow() + backfill = Backfill( + dag_id=dag.dag_id, + from_date=from_date, + to_date=to_date, + ) + session.add(backfill) + session.commit() + kwargs = {} + if user: + kwargs.update(environ_overrides={"REMOTE_USER": user}) + response = self.client.post(f"/api/v1/backfills/{backfill.id}/cancel", **kwargs) + assert response.status_code == expected + if response.status_code < 300: + # now it is marked as completed + assert pendulum.parse(response.json["completed_at"]) + + # get conflict when canceling already-canceled backfill + response = self.client.post(f"/api/v1/backfills/{backfill.id}/cancel", **kwargs) + assert response.status_code == 409 diff --git a/tests/test_utils/db.py b/tests/test_utils/db.py index ceb6bc94b8dce..77875bb03ec51 100644 --- a/tests/test_utils/db.py +++ b/tests/test_utils/db.py @@ -35,6 +35,7 @@ Variable, XCom, ) +from airflow.models.backfill import Backfill, BackfillDagRun from airflow.models.dag import DagOwnerAttributes from airflow.models.dagcode import DagCode from airflow.models.dagwarning import DagWarning @@ -66,6 +67,12 @@ def clear_db_runs(): pass +def clear_db_backfills(): + with create_session() as session: + session.query(BackfillDagRun).delete() + session.query(Backfill).delete() + + def clear_db_datasets(): with create_session() as session: session.query(DatasetEvent).delete()