Use apache-airflow-providers-openlineage, bump min Airflow version to 2.7 and Python version to 3.8 (#2103)

Fixes a reported issue where deployments do not have the `openlineage-airflow`
library installed but instead ship `apache-airflow-providers-openlineage`.
To fix this, we migrate the usages of `openlineage-airflow` to the
corresponding source code in the `apache-airflow-providers-openlineage`
Airflow OSS provider.

Since `apache-airflow-providers-openlineage` requires Airflow >= 2.7,
we bump the minimum Airflow version accordingly.
Airflow 2.7.0 is also the first release to drop support for end-of-life
Python 3.7, so we also bump the minimum Python version to 3.8.

related: astronomer/issues-airflow#503
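At a glance, the migration amounts to swapping the standalone `openlineage-airflow` imports for their provider-package equivalents. A minimal sketch of the import change (paths taken from the diffs below; the OpenLineage client facets are untouched):

```python
# Before: extractor classes came from the standalone openlineage-airflow package.
# from openlineage.airflow.extractors import Extractors
# from openlineage.airflow.extractors.base import DefaultExtractor, OperatorLineage

# After: the same functionality comes from the Airflow OSS provider (Airflow >= 2.7).
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.providers.openlineage.extractors.base import DefaultExtractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager

# The OpenLineage client facets are unaffected; they still come from the client library.
from openlineage.client.facet import BaseFacet, DataQualityMetricsInputDatasetFacet
```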

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
3 people authored Jan 24, 2024
1 parent 6007144 commit 5c011cb
Showing 5 changed files with 22 additions and 135 deletions.
87 changes: 2 additions & 85 deletions .github/workflows/ci-python-sdk.yaml
@@ -538,96 +538,14 @@ jobs:
AZURE_WASB_CONN_STRING: ${{ secrets.AZURE_WASB_CONN_STRING }}
AZURE_WASB_ACCESS_KEY: ${{ secrets.AZURE_WASB_ACCESS_KEY }}

Run-example-dag-tests-Airflow-2-2-5:
if: >-
github.event_name == 'push' ||
(
github.event_name == 'pull_request' &&
github.event.pull_request.head.repo.fork == false
) ||
(
github.event_name == 'pull_request_target' &&
contains(github.event.pull_request.labels.*.name, 'safe to test')
)||
(
github.event_name == 'release'
)
runs-on: ubuntu-latest
services:
postgres:
# Docker Hub image
image: dimberman/pagila-test
env:
POSTGRES_PASSWORD: postgres
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 5432:5432
sftp:
image: ghcr.io/astronomer/astro-sdk/sftp_docker
ports:
- 2222:22
ftp:
image: ghcr.io/astronomer/astro-sdk/ftp_docker
ports:
- 21:21
- 30000-30009:30000-30009
env:
FTP_USER_NAME: ${{ secrets.SFTP_USERNAME }}
FTP_USER_PASS: ${{ secrets.SFTP_PASSWORD }}
FTP_USER_HOME: /home/foo
PUBLICHOST: "localhost"
steps:
- uses: actions/checkout@v3
if: github.event_name != 'pull_request_target'

- name: Checkout pull/${{ github.event.number }}
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.head.sha }}
if: github.event_name == 'pull_request_target'

- uses: actions/setup-python@v3
with:
python-version: '3.8'
architecture: 'x64'
- uses: actions/cache@v3
with:
path: |
~/.cache/pip
.nox
key: ${{ runner.os }}-2.2.5-${{ hashFiles('python-sdk/pyproject.toml') }}-${{ hashFiles('python-sdk/src/astro/__init__.py') }}
- run: cat ../.github/ci-test-connections.yaml > test-connections.yaml
- run: python -c 'import os; print(os.getenv("GOOGLE_APPLICATION_CREDENTIALS_JSON", "").strip())' > ${{ env.GOOGLE_APPLICATION_CREDENTIALS }}
- run: sqlite3 /tmp/sqlite_default.db "VACUUM;"
- run: pip3 install nox
- run: nox -s "test-3.8(airflow='2.2.5')" -- "tests_integration/test_example_dags.py" "tests_integration/integration_test_dag.py" -k "not redshift"
env:
GOOGLE_APPLICATION_CREDENTIALS_JSON: ${{ secrets.GOOGLE_APPLICATION_CREDENTIALS_JSON }}
GOOGLE_APPLICATION_CREDENTIALS: /tmp/google_credentials.json
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
REDSHIFT_NATIVE_LOAD_IAM_ROLE_ARN: ${{ secrets.REDSHIFT_NATIVE_LOAD_IAM_ROLE_ARN }}
REDSHIFT_USERNAME: ${{ secrets.REDSHIFT_USERNAME }}
REDSHIFT_PASSWORD: ${{ secrets.REDSHIFT_PASSWORD }}
SNOWFLAKE_ACCOUNT_NAME: ${{ secrets.SNOWFLAKE_UNAME }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
AIRFLOW__ASTRO_SDK__DATABRICKS_CLUSTER_ID: ${{ secrets.DATABRICKS_CLUSTER_ID }}
AZURE_WASB_CONN_STRING: ${{ secrets.AZURE_WASB_CONN_STRING }}
AZURE_WASB_ACCESS_KEY: ${{ secrets.AZURE_WASB_ACCESS_KEY }}

Generate-Constraints:
if: (github.event_name == 'release' || github.event_name == 'push')
strategy:
fail-fast: false
matrix:
python: [ '3.7', '3.8', '3.9', '3.10' ]
airflow: [ '2.2.5', '2.3', '2.4', '2.5', '2.6', '2.7', '2.8']
python: [ '3.8', '3.9', '3.10', '3.11' ]
airflow: [ '2.7', '2.8']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
@@ -724,7 +642,6 @@ jobs:
name: Build and publish Python 🐍 distributions 📦 to PyPI
needs:
- Run-Unit-tests-Airflow-2-8
- Run-example-dag-tests-Airflow-2-2-5
- Run-Integration-tests-Airflow-2-8
- Run-load-file-Integration-Airflow-2-8
- Run-example-dag-Integration-Airflow-2-8
31 changes: 7 additions & 24 deletions python-sdk/noxfile.py
@@ -19,34 +19,17 @@ def dev(session: nox.Session) -> None:
session.install("-e", ".[all,tests]")


@nox.session(python=["3.7", "3.8", "3.9", "3.10", "3.11"])
@nox.parametrize("airflow", ["2.2.5", "2.4", "2.5", "2.6", "2.7", "2.8"])
@nox.session(python=["3.8", "3.9", "3.10", "3.11"])
@nox.parametrize("airflow", ["2.7", "2.8"])
def test(session: nox.Session, airflow) -> None:
"""Run both unit and integration tests."""
env = {
"AIRFLOW_HOME": f"~/airflow-{airflow}-python-{session.python}",
"AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES": "airflow\\.* astro\\.*",
}

if airflow == "2.2.5":
env[
"AIRFLOW__CORE__XCOM_BACKEND"
] = "astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend"
env["AIRFLOW__ASTRO_SDK__STORE_DATA_LOCAL_DEV"] = "True"

# If you need a pinned version of a provider to be present in a nox session then
# update the constraints file used below with that version of provider
# For example as part of MSSQL support we need apache-airflow-providers-microsoft-mssql>=3.2 and this
# has been updated in the below constraint file.
session.install(f"apache-airflow=={airflow}", "-c", "tests/modified_constraint_file.txt")
session.install("-e", ".[all,tests]", "-c", "tests/modified_constraint_file.txt")
session.install("apache-airflow-providers-common-sql==1.2.0")
# install smart-open 6.3.0 since it has FTP implementation
session.install("smart-open>=6.3.0")
else:
env["AIRFLOW__CORE__ALLOWED_DESERIALIZATION_CLASSES"] = "airflow\\.* astro\\.*"

session.install(f"apache-airflow~={airflow}")
session.install("-e", ".[all,tests]")
session.install(f"apache-airflow~={airflow}")
session.install("-e", ".[all,tests]")

# Log all the installed dependencies
session.log("Installed Dependencies:")
@@ -150,8 +133,8 @@ def build_docs(session: nox.Session) -> None:
session.run("make", "html")


@nox.session(python=["3.7", "3.8", "3.9", "3.10", "3.11"])
@nox.parametrize("airflow", ["2.2.5", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8"])
@nox.session(python=["3.8", "3.9", "3.10", "3.11"])
@nox.parametrize("airflow", ["2.7", "2.8"])
def generate_constraints(session: nox.Session, airflow) -> None:
"""Generate constraints file"""
session.install("wheel")
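With the narrowed matrix, the remaining sessions are invoked exactly as before, e.g. `nox -s "test-3.9(airflow='2.8')"` or `nox -s "generate_constraints-3.11(airflow='2.8')"`; the session names here are inferred from the parametrization above, and combinations outside the new Python/Airflow matrix are simply no longer generated.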
10 changes: 4 additions & 6 deletions python-sdk/pyproject.toml
@@ -15,16 +15,15 @@ authors = [
readme = "README.md"
license = { file = "LICENSE" }

requires-python = ">=3.7"
requires-python = ">=3.8"
dependencies = [
"apache-airflow>=2.0",
"apache-airflow>=2.7",
"attrs>=20.3.0",
"pandas",
"pyarrow",
"python-frontmatter",
"smart-open",
"SQLAlchemy>=1.3.18",
"cached_property>=1.5.0;python_version<='3.7'",
"Flask-Session<0.6.0" # This release breaking our tests, let's pin it as a temporary workaround
]

@@ -36,7 +35,6 @@ classifiers = [
"Framework :: Apache Airflow",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
@@ -87,7 +85,7 @@ ftp = [
"apache-airflow-providers-ftp>=3.0.0",
"smart-open>=5.2.1",
]
openlineage = ["openlineage-airflow>=0.17.0"]
openlineage = ["apache-airflow-providers-openlineage>=1.4.0"]

databricks = [
"databricks-cli",
@@ -125,7 +123,7 @@ all = [
"databricks-sql-connector<2.9.0",
"s3fs",
"protobuf",
"openlineage-airflow>=0.17.0",
"apache-airflow-providers-openlineage>=1.4.0",
"apache-airflow-providers-microsoft-azure",
"azure-storage-blob",
"apache-airflow-providers-microsoft-mssql>=3.2",
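In practice, installing the SDK's `openlineage` extra now pulls in the Airflow provider instead of the standalone integration, e.g. `pip install "astro-sdk-python[openlineage]"` (the distribution name is assumed here; check the `[project]` table in `pyproject.toml`).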
4 changes: 1 addition & 3 deletions python-sdk/src/astro/lineage/__init__.py
@@ -3,9 +3,7 @@
log = logging.getLogger(__name__)

try:
from openlineage.airflow.extractors import TaskMetadata
from openlineage.airflow.extractors.base import BaseExtractor, OperatorLineage
from openlineage.airflow.utils import get_job_name
from airflow.providers.openlineage.extractors import OperatorLineage
from openlineage.client.facet import (
BaseFacet,
DataQualityMetricsInputDatasetFacet,
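For context, a minimal sketch of the guarded-import pattern this module relies on: the provider classes are imported inside the `try` block shown above so the SDK can still be imported when the OpenLineage provider is absent. The `except` branch and the availability flag below are assumptions for illustration, not copied from the repository:

```python
import logging

log = logging.getLogger(__name__)

try:
    # Provider-package imports (require apache-airflow-providers-openlineage).
    from airflow.providers.openlineage.extractors import OperatorLineage  # noqa: F401
    from openlineage.client.facet import BaseFacet  # noqa: F401

    OPENLINEAGE_AVAILABLE = True  # hypothetical flag, for illustration only
except ImportError:
    # Assumed fallback: log and continue without lineage support.
    log.debug("apache-airflow-providers-openlineage is not installed; lineage features are disabled.")
    OPENLINEAGE_AVAILABLE = False
```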
25 changes: 8 additions & 17 deletions python-sdk/tests_integration/extractors/test_extractor.py
@@ -4,9 +4,9 @@
import pendulum
import pytest
from airflow.models.taskinstance import TaskInstance
from airflow.providers.openlineage.extractors.base import DefaultExtractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager
from airflow.utils import timezone
from openlineage.airflow.extractors import Extractors
from openlineage.airflow.extractors.base import DefaultExtractor
from openlineage.client.facet import DataQualityMetricsInputDatasetFacet, OutputStatisticsOutputDatasetFacet
from openlineage.client.run import Dataset as OpenlineageDataset

@@ -114,14 +114,13 @@ def test_python_sdk_load_file_extract_on_complete(mock_xcom_pull):
tzinfo = pendulum.timezone("UTC")
execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
task_instance = TaskInstance(task=load_file_operator, run_id=execution_date)
python_sdk_extractor = Extractors().get_extractor_class(LoadFileOperator)
python_sdk_extractor = ExtractorManager().get_extractor_class(LoadFileOperator)
assert python_sdk_extractor is DefaultExtractor

task_meta_extract = python_sdk_extractor(load_file_operator).extract()
assert task_meta_extract is None

task_meta = python_sdk_extractor(load_file_operator).extract_on_complete(task_instance=task_instance)
assert task_meta.name == f"adhoc_airflow.{task_id}"
assert task_meta.inputs[0].facets["input_file_facet"] == INPUT_STATS[0].facets["input_file_facet"]
assert task_meta.job_facets == {}
assert task_meta.run_facets == {}
@@ -156,12 +155,11 @@ def test_python_sdk_export_file_extract_on_complete():
)

task_instance = TaskInstance(task=export_file_operator)
python_sdk_extractor = Extractors().get_extractor_class(ExportToFileOperator)
python_sdk_extractor = ExtractorManager().get_extractor_class(ExportToFileOperator)
assert python_sdk_extractor is DefaultExtractor
task_meta_extract = python_sdk_extractor(export_file_operator).extract()
assert task_meta_extract is None
task_meta = python_sdk_extractor(export_file_operator).extract_on_complete(task_instance=task_instance)
assert task_meta.name == f"adhoc_airflow.{task_id}"
assert (
task_meta.inputs[0].facets["dataQualityMetrics"]
== INPUT_STATS_FOR_EXPORT_FILE[0].facets["dataQualityMetrics"]
@@ -179,7 +177,6 @@ def test_append_op_extract_on_complete():
"""
Test extractor ``extract_on_complete`` get called and collect lineage for append operator
"""
task_id = "append_table"

src_table_operator = LoadFileOperator(
task_id="load_file",
@@ -203,12 +200,11 @@ def test_append_op_extract_on_complete():
tzinfo = pendulum.timezone("UTC")
execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
task_instance = TaskInstance(task=op, run_id=execution_date)
python_sdk_extractor = Extractors().get_extractor_class(AppendOperator)
python_sdk_extractor = ExtractorManager().get_extractor_class(AppendOperator)
assert python_sdk_extractor is DefaultExtractor
task_meta_extract = python_sdk_extractor(op).extract()
assert task_meta_extract is None
task_meta = python_sdk_extractor(op).extract_on_complete(task_instance=task_instance)
assert task_meta.name == f"adhoc_airflow.{task_id}"
assert task_meta.inputs[0].name == f"astronomer-dag-authoring.astronomer-dag-authoring.{src_table.name}"
assert task_meta.inputs[0].namespace == "bigquery"
assert task_meta.inputs[0].facets is not None
@@ -221,7 +217,6 @@ def test_merge_op_extract_on_complete():
"""
Test extractor ``extract_on_complete`` get called and collect lineage for merge operator
"""
task_id = "merge"
src_table_operator = LoadFileOperator(
task_id="load_file",
input_file=File(path="gs://astro-sdk/workspace/sample_pattern.csv", filetype=FileType.CSV),
@@ -246,13 +241,12 @@ def test_merge_op_extract_on_complete():
execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
task_instance = TaskInstance(task=op, run_id=execution_date)

python_sdk_extractor = Extractors().get_extractor_class(MergeOperator)
python_sdk_extractor = ExtractorManager().get_extractor_class(MergeOperator)
assert python_sdk_extractor is DefaultExtractor
task_meta_extract = python_sdk_extractor(op).extract()
assert task_meta_extract is None
task_meta = python_sdk_extractor(op).extract_on_complete(task_instance=task_instance)

assert task_meta.name == f"adhoc_airflow.{task_id}"
assert task_meta.inputs[0].name == f"astronomer-dag-authoring.astro.{src_table.name}"
assert task_meta.inputs[0].namespace == "bigquery"
assert task_meta.inputs[0].facets is not None
@@ -277,7 +271,6 @@ def test_python_sdk_transform_extract_on_complete():
imdb_table = load_file.execute(context=create_context(load_file))

output_table = Table(name="test_name", conn_id="gcp_conn", metadata=Metadata(schema="astro"))
task_id = "top_five_animations"

@aql.transform
def top_five_animations(input_table: Table) -> str:
@@ -290,12 +283,11 @@ def top_five_animations(input_table: Table) -> str:
execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
task_instance = TaskInstance(task=task.operator, run_id=execution_date)

python_sdk_extractor = Extractors().get_extractor_class(TransformOperator)
python_sdk_extractor = ExtractorManager().get_extractor_class(TransformOperator)
assert python_sdk_extractor is DefaultExtractor
task_meta_extract = python_sdk_extractor(task.operator).extract()
assert task_meta_extract is None
task_meta = python_sdk_extractor(task.operator).extract_on_complete(task_instance=task_instance)
assert task_meta.name == f"adhoc_airflow.{task_id}"
source_code = task_meta.job_facets.get("sourceCode")
# check for transform code return is present in source code facet.
validate_string = """return "SELECT title, rating FROM {{ input_table }} LIMIT 5;"""
@@ -343,12 +335,11 @@ def aggregate_data(df: pd.DataFrame):
tzinfo = pendulum.timezone("UTC")
execution_date = timezone.datetime(2022, 1, 1, 1, 0, 0, tzinfo=tzinfo)
task_instance = TaskInstance(task=task[0].operator, run_id=execution_date)
python_sdk_extractor = Extractors().get_extractor_class(DataframeOperator)
python_sdk_extractor = ExtractorManager().get_extractor_class(DataframeOperator)
assert python_sdk_extractor is DefaultExtractor
task_meta_extract = python_sdk_extractor(task[0].operator).extract()
assert task_meta_extract is None
task_meta = python_sdk_extractor(task[0].operator).extract_on_complete(task_instance=task_instance)
assert task_meta.name == "adhoc_airflow.aggregate_data"
assert task_meta.outputs[0].facets["schema"].fields[0].name == test_schema_name
assert task_meta.outputs[0].facets["schema"].fields[0].type == test_db_name
assert task_meta.outputs[0].facets["dataSource"].name == test_tbl_name
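The test updates above come down to one API swap: the provider's `ExtractorManager` replaces the old `openlineage.airflow` `Extractors` registry when resolving an operator's extractor class. A minimal sketch of that lookup, assuming the SDK's `LoadFileOperator` import path:

```python
from airflow.providers.openlineage.extractors.base import DefaultExtractor
from airflow.providers.openlineage.extractors.manager import ExtractorManager

# Import path assumed from the astro-sdk source tree.
from astro.sql.operators.load_file import LoadFileOperator

# SDK operators don't ship a dedicated extractor, so the manager falls back to
# the provider's DefaultExtractor, which drives extract()/extract_on_complete().
extractor_cls = ExtractorManager().get_extractor_class(LoadFileOperator)
assert extractor_cls is DefaultExtractor
```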
