diff --git a/404.html b/404.html index ff8ddb5..d7c3d38 100644 --- a/404.html +++ b/404.html @@ -10,8 +10,9 @@ + - + @@ -19,14 +20,17 @@ - + - + + + + @@ -51,9 +55,7 @@ - - - + @@ -86,7 +88,9 @@ -
+ + +
+ + +
  • + + Tutorial + +
  • @@ -434,7 +493,6 @@ -

    prefect-great-expectations

    @@ -453,12 +511,13 @@

    prefect-great-expectations

    Welcome!

    -

    Prefect Collection containing integrations for interacting with Great Expectations

    +

    Prefect integration for interacting with Great Expectations.

    +

    Great Expectations is a Python library for data quality. It provides a framework for validating the state of your data.

    Getting Started

    Python setup

    Requires an installation of Python 3.7+.

    -

    We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

    -

    These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

    +

    We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.

    +

    These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

    Installation

    Install prefect-great-expectations with pip:

    pip install prefect-great-expectations
    @@ -467,13 +526,14 @@ 

    Write and run a flow

    from prefect import flow
     from prefect_great_expectations import run_checkpoint_validation
     
    -
     @flow
     def example_flow():
        run_checkpoint_validation(checkpoint_name="my_checkpoint")
     
     example_flow()
     
    +

    Tutorial

    +

    For a larger example, check out the tutorial.

    Resources

    If you encounter any bugs while using prefect-great-expectations, feel free to open an issue in the prefect-great-expectations repository.

    If you have any questions or issues while using prefect-great-expectations, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

    @@ -529,7 +589,7 @@

    Development

    - + @@ -540,7 +600,7 @@

    Development

    - + @@ -551,7 +611,7 @@

    Development

    - + @@ -562,7 +622,7 @@

    Development

    - + @@ -573,7 +633,7 @@

    Development

    - + @@ -584,7 +644,7 @@

    Development

    - + @@ -595,7 +655,7 @@

    Development

    - + @@ -606,7 +666,7 @@

    Development

    - + @@ -617,7 +677,7 @@

    Development

    - +
    @@ -631,10 +691,11 @@

    Development

    - + + - + diff --git a/search/search_index.json b/search/search_index.json index 15eb905..ab61788 100644 --- a/search/search_index.json +++ b/search/search_index.json @@ -1 +1 @@ -{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"prefect-great-expectations","text":""},{"location":"#welcome","title":"Welcome!","text":"

    Prefect Collection containing integrations for interacting with Great Expectations

    "},{"location":"#getting-started","title":"Getting Started","text":""},{"location":"#python-setup","title":"Python setup","text":"

    Requires an installation of Python 3.7+.

    We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

    These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

    "},{"location":"#installation","title":"Installation","text":"

    Install prefect-great-expectations with pip:

    pip install prefect-great-expectations\n
    "},{"location":"#write-and-run-a-flow","title":"Write and run a flow","text":"
    from prefect import flow\nfrom prefect_great_expectations import run_checkpoint_validation\n\n\n@flow\ndef example_flow():\n   run_checkpoint_validation(checkpoint_name=\"my_checkpoint\")\n\nexample_flow()\n
    "},{"location":"#resources","title":"Resources","text":"

    If you encounter any bugs while using prefect-great-expectations, feel free to open an issue in the prefect-great-expectations repository.

    If you have any questions or issues while using prefect-great-expectations, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

    Feel free to \u2b50\ufe0f or watch prefect-great-expectations for updates too!

    "},{"location":"#development","title":"Development","text":"

    If you'd like to install a version of prefect-great-expectations for development, clone the repository and perform an editable install with pip:

    git clone https://github.com/PrefectHQ/prefect-great-expectations.git\n\ncd prefect-great-expectations/\n\npip install -e \".[dev]\"\n\n# Install linting pre-commit hooks\npre-commit install\n
    "},{"location":"validation/","title":"Validation","text":""},{"location":"validation/#prefect_great_expectations.validation","title":"prefect_great_expectations.validation","text":"

    Tasks for performing Great Expectations validations

    "},{"location":"validation/#prefect_great_expectations.validation.GreatExpectationValidationError","title":"GreatExpectationValidationError","text":"

    Signals that a task failed due to a failed Great Expectations validation.

    Parameters:

    Name Type Description Default result CheckpointResult

    A CheckpointResult containing details of the failed validation.

    required Source code in prefect_great_expectations/validation.py
    class GreatExpectationValidationError(Exception):\n\"\"\"\n    Signals that a task failed due to a failed Great\n    Expectations validation.\n\n    Args:\n        result: A CheckpointResult containing details\n            of the failed validation.\n    \"\"\"\n\n    def __init__(self, result: CheckpointResult):\n        self.result = result\n        super().__init__(\n            \"Great Expectations Validation failed. \"\n            \"Check result on this exception for more details.\"\n        )\n
    "},{"location":"validation/#prefect_great_expectations.validation.run_checkpoint_validation","title":"run_checkpoint_validation","text":"

    Task that performs a Great Expectations validation based on the provided checkpoint and data context.

    Parameters:

    Name Type Description Default run_name Optional[str]

    The name of the Great Expectations validation run. Defaults to timestamp if not provided.

    None checkpoint_name Optional[str]

    The name of the Checkpoint to use for validation.

    None checkpoint Optional[Checkpoint]

    A Checkpoint object to use for validation. Overrides checkpoint_name if both are provided.

    None checkpoint_kwargs Optional[Dict]

    A dictionary with values used to provide configuration to the task's Checkpoint at runtime. Keys should match the parameters of CheckpointConfig.

    None data_context_root_dir Optional[Union[str, Path]]

    Path to the great_expectations directory.

    None data_context Optional[DataContext]

    A DataContext object to use during validation. Overrides data_context_root_dir if both are provided.

    None runtime_environment Optional[Dict]

    A dictionary with values to overwrite config in great_expectations.yml at run time.

    None raise_on_validation_failure bool

    If True, the task will raise a GreatExpectationValidationError when validation fails. If False, the task will return the result of the validation.

    True

    Raises:

    Type Description GreatExpectationValidationError

    Signals that a GE validation failed. Details of the failure can be found by inspecting the result attribute of the exception.

    Returns:

    Name Type Description CheckpointResult

    Detailed result of the validation run in the task.

    Examples:

    Run a validation with a checkpoint named 'my_checkpoint':

    from prefect import flow\nfrom prefect_great_expectations import run_checkpoint_validation\n\n\n@flow\ndef example_flow():\n    run_checkpoint_validation(checkpoint_name=\"my_checkpoint\")\n\nexample_flow()\n

    Run a validation with a custom path to the data context:

    from prefect import flow\nfrom prefect_great_expectations import run_checkpoint_validation\n\n\n@flow\ndef example_flow():\n    run_checkpoint_validation(\n        checkpoint_name=\"my_checkpoint\",\n        data_context_root_dir=\"my_data_context/\"\n    )\n\nexample_flow()\n
    Source code in prefect_great_expectations/validation.py
    @task\ndef run_checkpoint_validation(\n    run_name: Optional[str] = None,\n    checkpoint_name: Optional[str] = None,\n    checkpoint: Optional[Checkpoint] = None,\n    checkpoint_kwargs: Optional[Dict] = None,\n    data_context_root_dir: Optional[Union[str, Path]] = None,\n    data_context: Optional[DataContext] = None,\n    runtime_environment: Optional[Dict] = None,\n    raise_on_validation_failure: bool = True,\n):\n\"\"\"\n    Task that performs a Great Expectations validation based on the provided checkpoint\n        and data context.\n\n    Args:\n        run_name: The name of the Great Expectations validation run. Defaults to\n            timestamp if not provided.\n        checkpoint_name: The name of the Checkpoint to use for validation.\n        checkpoint: A Checkpoint object to use for validation. Overrides\n            `checkpoint_name` if both are provided.\n        checkpoint_kwargs: A dictionary with values used to provide configuration to\n            the task's Checkpint at runtime. Keys should match the parameters of\n            `CheckpointConfig`.\n        data_context_root_dir: Path to the great_expectations directory.\n        data_context: A DataContext object to use during validation. Overrides\n            `data_context_root_dir` if both are provided.\n        runtime_environment: A dictionary with values to overwrite config in\n            `great_expectations.yml` at run time.\n        raise_on_validation_failure: If `True`, the task will raise a\n            GreatExpectationValidationError when validation fails. 
If `False`,\n            the task will return the result of the validation.\n\n    Raises:\n        GreatExpectationValidationError: Signals that a GE validation failed.\n            Details of the failure can be found by inspecting the `result`\n            attribute of the exception.\n\n    Returns:\n        CheckpointResult: Detailed result of the validation run in the task.\n\n    Examples:\n        Run a validation with a checkpoint named 'my_checkpoint':\n\n        ```python\n        from prefect import flow\n        from prefect_great_expectations import run_checkpoint_validation\n\n\n        @flow\n        def example_flow():\n            run_checkpoint_validation(checkpoint_name=\"my_checkpoint\")\n\n        example_flow()\n        ```\n\n        Run a validation with a custom path to the data context:\n\n        ```python\n        from prefect import flow\n        from prefect_great_expectations import run_checkpoint_validation\n\n\n        @flow\n        def example_flow():\n            run_checkpoint_validation(\n                checkpoint_name=\"my_checkpoint\",\n                data_context_root_dir=\"my_data_context/\"\n            )\n\n        example_flow()\n        ```\n    \"\"\"\n    logger = get_run_logger()\n\n    logger.info(\"Running Great Expectations validation...\")\n\n    runtime_environment = runtime_environment or {}\n    checkpoint_kwargs = checkpoint_kwargs or {}\n\n    data_context_root_dir = (\n        str(data_context_root_dir) if data_context_root_dir else None\n    )\n\n    if data_context:\n        logger.debug(\"Using provided GE Data Context\")\n    else:\n        logger.debug(\"Loading GE Data Context from %s\", data_context_root_dir)\n        data_context = DataContext(\n            context_root_dir=data_context_root_dir,\n            runtime_environment=runtime_environment,\n        )\n\n    if checkpoint:\n        logger.debug(\"Using provided GE Checkpoint\")\n    else:\n        logger.debug(\"Loading GE Checkpoint with 
name %s\", checkpoint_name)\n        checkpoint = data_context.get_checkpoint(checkpoint_name)\n\n    result = checkpoint.run(run_name=run_name, **checkpoint_kwargs)\n\n    if not result.success:\n        logger.warn(\n            \"Great Expectations validation run %s failed\", result.run_id.run_name\n        )\n        if raise_on_validation_failure:\n            raise GreatExpectationValidationError(result)\n    else:\n        logger.info(\n            \"Great Expectations validation run %s succeeded\", result.run_id.run_name\n        )\n\n    return result\n
    "}]} \ No newline at end of file +{"config":{"lang":["en"],"separator":"[\\s\\-]+","pipeline":["stopWordFilter"]},"docs":[{"location":"","title":"prefect-great-expectations","text":""},{"location":"#welcome","title":"Welcome!","text":"

    Prefect integration for interacting with Great Expectations.

    Great Expectations is a Python library for data quality. It provides a framework for validating the state of your data.

    "},{"location":"#getting-started","title":"Getting Started","text":""},{"location":"#python-setup","title":"Python setup","text":"

    Requires an installation of Python 3.7+.

    We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.

    These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.

    "},{"location":"#installation","title":"Installation","text":"

    Install prefect-great-expectations with pip:

    pip install prefect-great-expectations\n
    "},{"location":"#write-and-run-a-flow","title":"Write and run a flow","text":"
    from prefect import flow\nfrom prefect_great_expectations import run_checkpoint_validation\n\n@flow\ndef example_flow():\n   run_checkpoint_validation(checkpoint_name=\"my_checkpoint\")\n\nexample_flow()\n
    "},{"location":"#tutorial","title":"Tutorial","text":"

    For a larger example, check out the tutorial.

    "},{"location":"#resources","title":"Resources","text":"

    If you encounter any bugs while using prefect-great-expectations, feel free to open an issue in the prefect-great-expectations repository.

    If you have any questions or issues while using prefect-great-expectations, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

    Feel free to \u2b50\ufe0f or watch prefect-great-expectations for updates too!

    "},{"location":"#development","title":"Development","text":"

    If you'd like to install a version of prefect-great-expectations for development, clone the repository and perform an editable install with pip:

    git clone https://github.com/PrefectHQ/prefect-great-expectations.git\n\ncd prefect-great-expectations/\n\npip install -e \".[dev]\"\n\n# Install linting pre-commit hooks\npre-commit install\n
    "},{"location":"tutorial/","title":"How to Use Great Expectations with Prefect","text":"

    This guide will help you use Great Expectations with Prefect.

    Prefect is a workflow orchestration and observation platform that enables data engineers, ML engineers, and data scientists to stop wondering about their workflows. The Prefect open source library allows users to create workflows using Python and add retries, logging, caching, scheduling, failure notifications, and much more. Prefect Cloud offers all that goodness plus a hosted platform, automations, and enterprise features for users who need them. Prefect Cloud provides free and paid tiers.

    Prefect can be used with Great Expectations validations so that you can be confident about the state of your data. With a Prefect deployment, you can productionize your workflow and run data quality checks in reaction to the arrival of new data or on a schedule.

    "},{"location":"tutorial/#doing-it","title":"Doing it","text":""},{"location":"tutorial/#install","title":"Install","text":"

    Install the Great Expectations, Prefect, and prefect-great-expectations libraries into the same Python virtual environment.

    pip install great_expectations prefect prefect_great_expectations\n

    If you have any issues installing Prefect, check out the Prefect installation docs.

    "},{"location":"tutorial/#create-an-expectation-suite-and-checkpoint","title":"Create an Expectation Suite and Checkpoint","text":"

    Here's an example of a script to create an Expectation Suite and Checkpoint. This script is based on the Great Expectations Quickstart.

    import great_expectations as gx\n\ndef create_expectation_suite_and_checkpoint():\n    \"\"\"Create a DataContext, connect to data, create Expectations, create and return a checkpoint.\"\"\"\n\n    context = gx.get_context()\n\n    validator = context.sources.pandas_default.read_csv(\n        \"https://raw.githubusercontent.com/great-expectations/gx_tutorials/main/data/yellow_tripdata_sample_2019-01.csv\"\n    )\n    validator.expect_column_values_to_not_be_null(\"pickup_datetime\")\n\n    # this expectation will fail\n    validator.expect_column_values_to_be_between(\n        \"passenger_count\", min_value=1, max_value=5\n    )\n\n    # checkpoints are reusable and only need to be created once\n    checkpoint = gx.checkpoint.SimpleCheckpoint(\n        name=\"taxi_check\",\n        data_context=context,\n        validator=validator,\n    )\n\n    return checkpoint\n
    "},{"location":"tutorial/#create-a-prefect-flow","title":"Create a Prefect flow","text":"

    Like Great Expectations, Prefect is a Pythonic framework. In Prefect, you bring your Python code and sprinkle in task and flow decorators to gain observation and orchestration capabilities.

    Let's add a second function that we'll decorate with a Prefect flow decorator. Our flow function uses the run_checkpoint_validation task from the prefect_great_expectations library. This prebuilt function is a Prefect task that runs a Great Expectations validation. The run_checkpoint_validation task can take a Great Expectations Checkpoint object as an argument.

    from prefect import flow\nfrom prefect_great_expectations import run_checkpoint_validation\n\n@flow\ndef validation_flow(checkpoint):\n    \"\"\"Creates a task that validates a run of a Great Expectations checkpoint\"\"\"\n    res = run_checkpoint_validation(checkpoint=checkpoint)\n    return \n

    Finally, let's call both functions from our script.

    if __name__ == \"__main__\":\n    checkpoint = create_expectation_suite_and_checkpoint()\n    validation_flow(checkpoint=checkpoint)\n

    Note that the second expectation fails because some rows in the passenger_count column have a value of 6, which is outside the allowed range of 1 to 5. This is intentional, so that we can see what a failure looks like. Here's the output in our terminal window.

    18:00:41.816 | INFO    | prefect.engine - Created flow run 'unyielding-husky' for flow 'validation-flow'\n18:00:43.847 | INFO    | Flow run 'unyielding-husky' - Created task run 'run_checkpoint_validation-0' for task 'run_checkpoint_validation'\n18:00:43.849 | INFO    | Flow run 'unyielding-husky' - Executing 'run_checkpoint_validation-0' immediately...\n18:00:44.786 | INFO    | Task run 'run_checkpoint_validation-0' - Running Great Expectations validation...\n...\n18:00:45.057 | ERROR   | Task run 'run_checkpoint_validation-0' - Encountered exception during execution:\n...\n    raise GreatExpectationValidationError(result)\nprefect_great_expectations.validation.GreatExpectationValidationError: Great Expectations Validation failed. Check result on this exception for more details.\n18:00:46.423 | ERROR   | Task run 'run_checkpoint_validation-0' - Finished in state Failed('Task run encountered an exception: prefect_great_expectations.validation.GreatExpectationValidationError: Great Expectations Validation failed. Check result on this exception for more details.\\n')\n18:00:46.424 | ERROR   | Flow run 'unyielding-husky' - Encountered exception during execution:\n18:00:46.916 | ERROR   | Flow run 'unyielding-husky' - Finished in state Failed('Flow run encountered an exception. prefect_great_expectations.validation.GreatExpectationValidationError: Great Expectations Validation failed...\n
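    To make the failure concrete without Great Expectations installed, here is a plain-Python sketch of what a range check like expect_column_values_to_be_between("passenger_count", min_value=1, max_value=5) flags. It is an illustration only, not how Great Expectations implements the expectation. The helper name values_outside_range is hypothetical, and the passenger_count values are made up rather than read from the sample CSV. The sketch simply skips nulls, because the script checks nulls with a separate expectation.

```python
from collections import Counter
from typing import Iterable, Optional


def values_outside_range(
    values: Iterable[Optional[int]], min_value: int, max_value: int
) -> Counter:
    """Count non-null values outside the inclusive [min_value, max_value] range."""
    # Nulls are skipped here; the tutorial script checks nulls separately.
    return Counter(
        v for v in values if v is not None and not min_value <= v <= max_value
    )


# Made-up passenger_count values, not read from the sample CSV.
passenger_count = [1, 2, 6, 1, 5, 6, None, 3]
unexpected = values_outside_range(passenger_count, min_value=1, max_value=5)
success = not unexpected

print(dict(unexpected))  # {6: 2}
print(success)  # False
```

    Any value outside the range makes the check unsuccessful. That is why a handful of 6s is enough to fail the whole validation run.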
    "},{"location":"tutorial/#avoid-raising-an-exception-on-validation-failure","title":"Avoid raising an exception on validation failure","text":"

    If we want to avoid raising an exception when the validation fails, we can set the raise_on_validation_failure argument to False in the run_checkpoint_validation task.

    @flow\ndef validation_flow(checkpoint):\n    \"\"\"Creates a task that validates a run of a Great Expectations checkpoint\"\"\"\n    res = run_checkpoint_validation(\n        checkpoint=checkpoint, raise_on_validation_failure=False\n    )\n    return\n

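    Now when we run our script, the failed validation is logged as a warning instead of raising an exception, and the flow run finishes in a Completed state.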

    18:06:03.007 | INFO    | prefect.engine - Created flow run 'affable-malamute' for flow 'validation-flow'\n18:06:03.624 | INFO    | Flow run 'affable-malamute' - Created task run 'run_checkpoint_validation-0' for task 'run_checkpoint_validation'\n18:06:03.626 | INFO    | Flow run 'affable-malamute' - Executing 'run_checkpoint_validation-0' immediately...\n18:06:03.880 | INFO    | Task run 'run_checkpoint_validation-0' - Running Great Expectations validation...\n...\n18:06:04.138 | WARNING | Task run 'run_checkpoint_validation-0' - Great Expectations validation run  failed\n18:06:04.298 | INFO    | Task run 'run_checkpoint_validation-0' - Finished in state Completed()\n18:06:04.401 | INFO    | Flow run 'affable-malamute' - Finished in state Completed('All states completed.')\n
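    The difference between the two runs comes down to the raise_on_validation_failure flag. The sketch below mirrors that control flow. It uses hypothetical stand-ins (StubCheckpointResult, StubValidationError, finish_validation) so that it runs without Prefect or Great Expectations. In a real flow, you should be able to catch GreatExpectationValidationError from prefect_great_expectations.validation and read its result attribute instead.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class StubCheckpointResult:
    """Hypothetical stand-in for a Great Expectations CheckpointResult."""

    success: bool
    details: Dict[str, Any] = field(default_factory=dict)


class StubValidationError(Exception):
    """Mirrors GreatExpectationValidationError: the failed result is kept on .result."""

    def __init__(self, result: StubCheckpointResult):
        self.result = result
        super().__init__(
            "Great Expectations Validation failed. "
            "Check result on this exception for more details."
        )


def finish_validation(
    result: StubCheckpointResult, raise_on_validation_failure: bool = True
) -> StubCheckpointResult:
    """Raise on a failed result when asked to; otherwise return the result."""
    if not result.success and raise_on_validation_failure:
        raise StubValidationError(result)
    return result


failed = StubCheckpointResult(success=False, details={"unexpected_value": 6})

# Default: the failure surfaces as an exception that carries the result.
try:
    finish_validation(failed)
except StubValidationError as exc:
    print("caught, details:", exc.result.details)

# Flag off: nothing is raised, so the caller must check result.success itself.
outcome = finish_validation(failed, raise_on_validation_failure=False)
print("success:", outcome.success)
```

    With the flag off, nothing downstream learns about the failure unless your own code inspects the returned result.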

    For more information about the run_checkpoint_validation task, refer to the prefect-great-expectations documentation.

    "},{"location":"tutorial/#log-prints-for-more-information","title":"Log prints for more information","text":"

    In the example above, the output doesn't show the details of the validation failure. Let's print the validation result and send it to the Prefect logs by passing log_prints=True to the flow decorator.

    @flow(log_prints=True)\ndef validation_flow(checkpoint):\n    \"\"\"Creates a task that validates a run of a Great Expectations checkpoint\"\"\"\n    res = run_checkpoint_validation(\n        checkpoint=checkpoint, raise_on_validation_failure=False\n    )\n    print(res)\n    return\n

    Now we can see lots of relevant information in our terminal window, including the following.

    ...\n \"partial_unexpected_counts\": [\n    {\n        \"value\": 6,\n        \"count\": 20\n    } \n...\n

    Looks like we have 20 rows with a 6 in the passenger_count column.
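    Scanning printed output by eye gets tedious as results grow. If the printed result is valid JSON, as the excerpt above suggests, you can extract every partial_unexpected_counts entry programmatically. The sketch below walks any nested dict/list structure, so it does not depend on the exact layout of a CheckpointResult. The sample JSON is a hypothetical, heavily trimmed stand-in, and find_key is an illustrative helper, not part of either library.

```python
import json
from typing import Any, Iterator


def find_key(obj: Any, key: str) -> Iterator[Any]:
    """Yield every value stored under `key` anywhere in nested dicts and lists."""
    if isinstance(obj, dict):
        for k, v in obj.items():
            if k == key:
                yield v
            yield from find_key(v, key)
    elif isinstance(obj, list):
        for item in obj:
            yield from find_key(item, key)


# Hypothetical, heavily trimmed stand-in for the printed result; the real
# output has many more fields and its nesting may differ between versions.
printed = """
{"run_results": {"some-validation-id": {"validation_result": {"results": [
  {"success": false,
   "result": {"partial_unexpected_counts": [{"value": 6, "count": 20}]}}
]}}}}
"""

counts = [
    entry
    for entries in find_key(json.loads(printed), "partial_unexpected_counts")
    for entry in entries
]
print(counts)  # [{'value': 6, 'count': 20}]
```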

    "},{"location":"tutorial/#add-artifacts","title":"Add artifacts","text":"

    If we fire up a locally hosted Prefect server or log in to our Prefect Cloud account, we can see the same information in the Prefect UI. In addition, if we log in to Prefect Cloud, we can create an artifact to share with our Prefect workspace collaborators. Let's do that now.

    1. Head over to https://app.prefect.cloud/ and sign up for a free account or log in to your existing account.
    2. Authenticate your command line client with prefect cloud login.
    3. Create an artifact to share your Great Expectations validation results with your collaborators.

    Prefect artifacts will persist the validation results from a flow run and display them in the UI. Let's create a Markdown artifact with the validation results.

    from prefect.artifacts import create_markdown_artifact\n\n@flow(log_prints=True)\ndef validation_flow(checkpoint):\n    \"\"\"Creates a task that validates a run of a Great Expectations checkpoint\"\"\"\n    res = run_checkpoint_validation(\n        checkpoint=checkpoint, raise_on_validation_failure=False\n    )\n\n    create_markdown_artifact(\n        f\"\"\"# Result of Great Expectations validation run\n\n        {res}\n        \"\"\",\n        description=\"GX validation for Taxi Data\",\n        key=\"green-taxi-data\",\n    )\n\n    return\n
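    One detail to check in the flow above: {res} sits inside an indented triple-quoted f-string, so the first line of the result is prefixed with eight spaces. Markdown may treat that as an indented code block, while the remaining lines of the result are not indented the same way. An alternative is to build the Markdown explicitly and wrap the JSON in a fenced block. build_validation_markdown is a hypothetical helper, and the result dict below is a trimmed stand-in.

```python
import json
from typing import Any, Dict


def build_validation_markdown(title: str, result: Dict[str, Any]) -> str:
    """Return a Markdown heading followed by the result as a fenced JSON block.

    The lines are joined explicitly rather than written inside an indented
    triple-quoted string, so no stray leading spaces reach the Markdown.
    """
    body = json.dumps(result, indent=2, sort_keys=True)
    return "\n".join([f"# {title}", "", "```json", body, "```", ""])


# Hypothetical, trimmed stand-in for a validation result.
markdown = build_validation_markdown(
    "Result of Great Expectations validation run", {"success": False}
)
print(markdown)
```

    Inside the flow, you could pass the returned string as the first argument to create_markdown_artifact, for example with res.to_json_dict() as the result. This assumes your Great Expectations version provides to_json_dict() on CheckpointResult.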

    The UI gives you lots of visibility into the state of your flow runs.

    Your artifact displays validation results for human consumption.

    Alternatively, you could share a link to your Great Expectations Data Docs in an artifact.

    "},{"location":"tutorial/#wrap","title":"Wrap","text":"

    You've seen how to use Prefect with Great Expectations.

    "},{"location":"tutorial/#where-to-go-from-here","title":"Where to go from here","text":"

    Prefect deployments allow you to run your flow in response to events such as the arrival of new data. You can also run on many types of schedules and on the infrastructure of your choice.

    There's lots more to explore for additional observability and orchestration with Prefect.

    Happy engineering!

    "},{"location":"validation/","title":"Validation","text":"

    Tasks for performing Great Expectations validations

    "},{"location":"validation/#prefect_great_expectations.validation.GreatExpectationValidationError","title":"GreatExpectationValidationError","text":"

    Bases: Exception

    Signals that a task failed due to a failed Great Expectations validation.

    Parameters:

    Name Type Description Default result CheckpointResult

    A CheckpointResult containing details of the failed validation.

    required Source code in prefect_great_expectations/validation.py
    class GreatExpectationValidationError(Exception):\n    \"\"\"\n    Signals that a task failed due to a failed Great\n    Expectations validation.\n\n    Args:\n        result: A CheckpointResult containing details\n            of the failed validation.\n    \"\"\"\n\n    def __init__(self, result: CheckpointResult):\n        self.result = result\n        super().__init__(\n            \"Great Expectations Validation failed. \"\n            \"Check result on this exception for more details.\"\n        )\n
    "},{"location":"validation/#prefect_great_expectations.validation.run_checkpoint_validation","title":"run_checkpoint_validation(run_name=None, checkpoint_name=None, checkpoint=None, checkpoint_kwargs=None, data_context_root_dir=None, data_context=None, runtime_environment=None, raise_on_validation_failure=True)","text":"

    Task that performs a Great Expectations validation based on the provided checkpoint and data context.

    Parameters:

    Name Type Description Default run_name Optional[str]

    The name of the Great Expectations validation run. Defaults to a timestamp if not provided.

    None checkpoint_name Optional[str]

    The name of the Checkpoint to use for validation.

    None checkpoint Optional[Checkpoint]

    A Checkpoint object to use for validation. Overrides checkpoint_name if both are provided.

    None checkpoint_kwargs Optional[Dict]

    A dictionary with values used to provide configuration to the task's Checkpoint at runtime. Keys should match the parameters of CheckpointConfig.

    None data_context_root_dir Optional[Union[str, Path]]

    Path to the great_expectations directory.

    None data_context Optional[DataContext]

    A DataContext object to use during validation. Overrides data_context_root_dir if both are provided.

    None runtime_environment Optional[Dict]

    A dictionary with values to overwrite config in great_expectations.yml at run time.

    None raise_on_validation_failure bool

    If True, the task will raise a GreatExpectationValidationError when validation fails. If False, the task will return the result of the validation.

    True
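    The precedence rules in this table, and the way checkpoint_kwargs reaches the checkpoint, can be sketched in plain Python. StubCheckpoint, StubContext, resolve_and_run, and the load_context callable are hypothetical stand-ins for Checkpoint, DataContext, and the task itself. The ordering follows the task's source code, where a data context is obtained before the checkpoint is resolved. One consequence visible in that source: passing a checkpoint without a data_context still makes the task construct a DataContext from data_context_root_dir.

```python
from typing import Any, Callable, Dict, Optional


class StubCheckpoint:
    """Hypothetical stand-in for a Great Expectations Checkpoint."""

    def __init__(self, name: str):
        self.name = name

    def run(self, run_name: Optional[str] = None, **kwargs: Any) -> Dict[str, Any]:
        # Echo the arguments so callers can see what was forwarded.
        return {"checkpoint": self.name, "run_name": run_name, **kwargs}


class StubContext:
    """Hypothetical stand-in for a Great Expectations DataContext."""

    def __init__(self, checkpoints: Dict[str, StubCheckpoint]):
        self._checkpoints = checkpoints

    def get_checkpoint(self, name: str) -> StubCheckpoint:
        return self._checkpoints[name]


def resolve_and_run(
    load_context: Callable[[], StubContext],
    checkpoint_name: Optional[str] = None,
    checkpoint: Optional[StubCheckpoint] = None,
    data_context: Optional[StubContext] = None,
    run_name: Optional[str] = None,
    checkpoint_kwargs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    # A provided data_context wins; otherwise one is loaded. The real task
    # builds it from data_context_root_dir and runtime_environment.
    if not data_context:
        data_context = load_context()
    # A provided checkpoint object wins over looking one up by name.
    if not checkpoint:
        checkpoint = data_context.get_checkpoint(checkpoint_name)
    # checkpoint_kwargs are unpacked into the run call unchanged.
    return checkpoint.run(run_name=run_name, **(checkpoint_kwargs or {}))


context = StubContext({"my_checkpoint": StubCheckpoint("my_checkpoint")})
print(
    resolve_and_run(lambda: context, checkpoint_name="my_checkpoint", run_name="nightly")
)
# {'checkpoint': 'my_checkpoint', 'run_name': 'nightly'}
```

    Passing both checkpoint_name and checkpoint runs the explicit checkpoint, which matches the "Overrides checkpoint_name" note in the table.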

    Raises:

    Type Description GreatExpectationValidationError

    Signals that a GE validation failed. Details of the failure can be found by inspecting the result attribute of the exception.

    Returns:

    Name Type Description CheckpointResult

    Detailed result of the validation run in the task.

    Examples:

    Run a validation with a checkpoint named 'my_checkpoint':

    from prefect import flow\nfrom prefect_great_expectations import run_checkpoint_validation\n\n\n@flow\ndef example_flow():\n    run_checkpoint_validation(checkpoint_name=\"my_checkpoint\")\n\nexample_flow()\n

    Run a validation with a custom path to the data context:

    from prefect import flow\nfrom prefect_great_expectations import run_checkpoint_validation\n\n\n@flow\ndef example_flow():\n    run_checkpoint_validation(\n        checkpoint_name=\"my_checkpoint\",\n        data_context_root_dir=\"my_data_context/\"\n    )\n\nexample_flow()\n
    Source code in prefect_great_expectations/validation.py
    @task\ndef run_checkpoint_validation(\n    run_name: Optional[str] = None,\n    checkpoint_name: Optional[str] = None,\n    checkpoint: Optional[Checkpoint] = None,\n    checkpoint_kwargs: Optional[Dict] = None,\n    data_context_root_dir: Optional[Union[str, Path]] = None,\n    data_context: Optional[DataContext] = None,\n    runtime_environment: Optional[Dict] = None,\n    raise_on_validation_failure: bool = True,\n):\n    \"\"\"\n    Task that performs a Great Expectations validation based on the provided checkpoint\n        and data context.\n\n    Args:\n        run_name: The name of the Great Expectations validation run. Defaults to\n            timestamp if not provided.\n        checkpoint_name: The name of the Checkpoint to use for validation.\n        checkpoint: A Checkpoint object to use for validation. Overrides\n            `checkpoint_name` if both are provided.\n        checkpoint_kwargs: A dictionary with values used to provide configuration to\n            the task's Checkpint at runtime. Keys should match the parameters of\n            `CheckpointConfig`.\n        data_context_root_dir: Path to the great_expectations directory.\n        data_context: A DataContext object to use during validation. Overrides\n            `data_context_root_dir` if both are provided.\n        runtime_environment: A dictionary with values to overwrite config in\n            `great_expectations.yml` at run time.\n        raise_on_validation_failure: If `True`, the task will raise a\n            GreatExpectationValidationError when validation fails. 
If `False`,\n            the task will return the result of the validation.\n\n    Raises:\n        GreatExpectationValidationError: Signals that a GE validation failed.\n            Details of the failure can be found by inspecting the `result`\n            attribute of the exception.\n\n    Returns:\n        CheckpointResult: Detailed result of the validation run in the task.\n\n    Examples:\n        Run a validation with a checkpoint named 'my_checkpoint':\n\n        ```python\n        from prefect import flow\n        from prefect_great_expectations import run_checkpoint_validation\n\n\n        @flow\n        def example_flow():\n            run_checkpoint_validation(checkpoint_name=\"my_checkpoint\")\n\n        example_flow()\n        ```\n\n        Run a validation with a custom path to the data context:\n\n        ```python\n        from prefect import flow\n        from prefect_great_expectations import run_checkpoint_validation\n\n\n        @flow\n        def example_flow():\n            run_checkpoint_validation(\n                checkpoint_name=\"my_checkpoint\",\n                data_context_root_dir=\"my_data_context/\"\n            )\n\n        example_flow()\n        ```\n    \"\"\"\n    logger = get_run_logger()\n\n    logger.info(\"Running Great Expectations validation...\")\n\n    runtime_environment = runtime_environment or {}\n    checkpoint_kwargs = checkpoint_kwargs or {}\n\n    data_context_root_dir = (\n        str(data_context_root_dir) if data_context_root_dir else None\n    )\n\n    if data_context:\n        logger.debug(\"Using provided GE Data Context\")\n    else:\n        logger.debug(\"Loading GE Data Context from %s\", data_context_root_dir)\n        data_context = DataContext(\n            context_root_dir=data_context_root_dir,\n            runtime_environment=runtime_environment,\n        )\n\n    if checkpoint:\n        logger.debug(\"Using provided GE Checkpoint\")\n    else:\n        logger.debug(\"Loading GE Checkpoint with 
name %s\", checkpoint_name)\n        checkpoint = data_context.get_checkpoint(checkpoint_name)\n\n    result = checkpoint.run(run_name=run_name, **checkpoint_kwargs)\n\n    if not result.success:\n        logger.warning(\n            \"Great Expectations validation run %s failed\",\n            result.run_id.run_name if result.run_id.run_name else \"\",\n        )\n        if raise_on_validation_failure:\n            raise GreatExpectationValidationError(result)\n    else:\n        logger.info(\n            \"Great Expectations validation run %s succeeded\",\n            result.run_id.run_name if result.run_id.run_name else \"\",\n        )\n\n    return result\n
    "}]} \ No newline at end of file diff --git a/sitemap.xml b/sitemap.xml index 7856695..eac3848 100644 --- a/sitemap.xml +++ b/sitemap.xml @@ -2,12 +2,17 @@ https://prefecthq.github.io/prefect-great-expectations/ - 2023-02-07 + 2023-10-05 + daily + + + https://prefecthq.github.io/prefect-great-expectations/tutorial/ + 2023-10-05 daily https://prefecthq.github.io/prefect-great-expectations/validation/ - 2023-02-07 + 2023-10-05 daily \ No newline at end of file diff --git a/sitemap.xml.gz b/sitemap.xml.gz index bceb58c..6d204e4 100644 Binary files a/sitemap.xml.gz and b/sitemap.xml.gz differ diff --git a/tutorial/index.html b/tutorial/index.html new file mode 100644 index 0000000..943ab9a --- /dev/null +++ b/tutorial/index.html @@ -0,0 +1,824 @@ + + + + + + + + + + + + + + + + + + + + + + + + + Tutorial - prefect-great-expectations + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +

    How to Use Great Expectations with Prefect

    This guide will help you use Great Expectations with Prefect.

    Prefect is a workflow orchestration and observation platform that enables data engineers, ML engineers, and data scientists to stop wondering about their workflows. The Prefect open source library allows users to create workflows using Python and add retries, logging, caching, scheduling, failure notifications, and much more. Prefect Cloud offers all that goodness plus a hosted platform, automations, and enterprise features for users who need them. Prefect Cloud provides free and paid tiers.

    Prefect can be used with Great Expectations validations so that you can be confident about the state of your data. With a Prefect deployment, you can productionize your workflow and run data quality checks in reaction to the arrival of new data or on a schedule.

    Doing it

    Install

    Install the Great Expectations, Prefect, and prefect-great-expectations libraries into the same Python virtual environment.

    pip install great_expectations prefect prefect_great_expectations

    If you have any issues installing Prefect, check out the Prefect installation docs.
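The tutorial does not include an installation check. If you want to confirm that all three libraries landed in the same virtual environment, a small standard-library sketch like the one below can help. It uses `importlib.metadata`, which needs Python 3.8 or newer. The helper name `installed_versions` is hypothetical, and the distribution names are assumed to match the ones passed to `pip` above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional

# Distribution names as passed to pip in the install step (assumed).
DEFAULT_DISTRIBUTIONS = ("great_expectations", "prefect", "prefect_great_expectations")


def installed_versions(
    names: Iterable[str] = DEFAULT_DISTRIBUTIONS,
) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if it is missing."""
    found: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    # Run this with the same interpreter you will use for the flow.
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed in this environment'}")
```

A `None` entry means that package is not visible to the interpreter running the check. That usually points to a different virtual environment.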

    Create an Expectation Suite and Checkpoint

    Here's an example of a script to create an Expectation Suite and Checkpoint. This script is based on the Great Expectations Quickstart.

    import great_expectations as gx


    def create_expectation_suite_and_checkpoint():
        """Create a DataContext, connect to data, create Expectations, create and return a checkpoint."""

        # Get a Data Context, the entry point to the Great Expectations API
        context = gx.get_context()

        # Read the sample CSV with the default pandas Data Source; this returns a Validator
        validator = context.sources.pandas_default.read_csv(
            "https://raw.githubusercontent.com/great-expectations/gx_tutorials/main/data/yellow_tripdata_sample_2019-01.csv"
        )
        validator.expect_column_values_to_not_be_null("pickup_datetime")

        # this expectation will fail
        validator.expect_column_values_to_be_between(
            "passenger_count", min_value=1, max_value=5
        )

        # checkpoints are reusable and only need to be created once
        checkpoint = gx.checkpoint.SimpleCheckpoint(
            name="taxi_check",
            data_context=context,
            validator=validator,
        )

        return checkpoint
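The plain-Python model below shows what the two expectations check, with no Great Expectations import. It is a simplified sketch, not how GX evaluates expectations internally. It rests on two assumptions:

- Bounds are inclusive, which is GX's default unless `strict_min` or `strict_max` is set.
- Nulls are skipped by the range check, on the assumption that they count as missing rather than unexpected.

The function names and sample values are hypothetical.

```python
from typing import Iterable, List, Optional


def not_null_success(values: Iterable[Optional[object]]) -> bool:
    """Simplified model of expect_column_values_to_not_be_null: every value is present."""
    return all(value is not None for value in values)


def between_unexpected(
    values: Iterable[Optional[float]], min_value: float, max_value: float
) -> List[float]:
    """Simplified model of expect_column_values_to_be_between.

    Bounds are inclusive. Nulls are skipped (assumption: they are counted as
    missing rather than unexpected, so they do not fail this check).
    """
    return [
        value
        for value in values
        if value is not None and not (min_value <= value <= max_value)
    ]


# Hypothetical passenger_count values, not taken from the real taxi data.
passenger_counts = [1, 2, 5, 6, None, 3, 6]
unexpected = between_unexpected(passenger_counts, min_value=1, max_value=5)
print(unexpected)            # [6, 6]
print(len(unexpected) == 0)  # False, so the expectation fails
```

With `max_value=5`, any row with 6 passengers is unexpected. That is why the second expectation in the script is set up to fail.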

    Create a Prefect flow

    Like Great Expectations, Prefect is a Pythonic framework. In Prefect, you bring your Python code and sprinkle in task and flow decorators to gain observation and orchestration capabilities.

    Let's add a second function that we'll decorate with a Prefect flow decorator. Our flow function uses the run_checkpoint_validation task from the prefect_great_expectations library. This prebuilt Prefect task runs a Great Expectations validation. You can pass it either a Great Expectations checkpoint object, as we do here, or the name of a checkpoint stored in your Data Context via checkpoint_name.

    from prefect import flow
    from prefect_great_expectations import run_checkpoint_validation


    @flow
    def validation_flow(checkpoint):
        """Run the run_checkpoint_validation task against a Great Expectations checkpoint."""
        res = run_checkpoint_validation(checkpoint=checkpoint)
        return res

    Finally, let's call both functions at the bottom of the script.

    if __name__ == "__main__":
        checkpoint = create_expectation_suite_and_checkpoint()
        validation_flow(checkpoint=checkpoint)

    Note that the second expectation will fail because the passenger_count column contains some values of 6, which exceed max_value=5. That's intentional so that we can see a failure example. Here's the output in our terminal window.

    18:00:41.816 | INFO    | prefect.engine - Created flow run 'unyielding-husky' for flow 'validation-flow'
    18:00:43.847 | INFO    | Flow run 'unyielding-husky' - Created task run 'run_checkpoint_validation-0' for task 'run_checkpoint_validation'
    18:00:43.849 | INFO    | Flow run 'unyielding-husky' - Executing 'run_checkpoint_validation-0' immediately...
    18:00:44.786 | INFO    | Task run 'run_checkpoint_validation-0' - Running Great Expectations validation...
    ...
    18:00:45.057 | ERROR   | Task run 'run_checkpoint_validation-0' - Encountered exception during execution:
    ...
        raise GreatExpectationValidationError(result)
    prefect_great_expectations.validation.GreatExpectationValidationError: Great Expectations Validation failed. Check result on this exception for more details.
    18:00:46.423 | ERROR   | Task run 'run_checkpoint_validation-0' - Finished in state Failed('Task run encountered an exception: prefect_great_expectations.validation.GreatExpectationValidationError: Great Expectations Validation failed. Check result on this exception for more details.\n')
    18:00:46.424 | ERROR   | Flow run 'unyielding-husky' - Encountered exception during execution:
    18:00:46.916 | ERROR   | Flow run 'unyielding-husky' - Finished in state Failed('Flow run encountered an exception. prefect_great_expectations.validation.GreatExpectationValidationError: Great Expectations Validation failed...

    Avoid raising an exception on validation failure

    If we want to avoid raising an exception when the validation fails, we can set the raise_on_validation_failure argument to False in the run_checkpoint_validation task.

    @flow
    def validation_flow(checkpoint):
        """Run the run_checkpoint_validation task without raising on validation failure."""
        res = run_checkpoint_validation(
            checkpoint=checkpoint, raise_on_validation_failure=False
        )
        return res

    Now when we run our script, we don't get an exception. Instead, the task logs a warning, and both the task run and the flow run finish in a Completed state.

    18:06:03.007 | INFO    | prefect.engine - Created flow run 'affable-malamute' for flow 'validation-flow'
    18:06:03.624 | INFO    | Flow run 'affable-malamute' - Created task run 'run_checkpoint_validation-0' for task 'run_checkpoint_validation'
    18:06:03.626 | INFO    | Flow run 'affable-malamute' - Executing 'run_checkpoint_validation-0' immediately...
    18:06:03.880 | INFO    | Task run 'run_checkpoint_validation-0' - Running Great Expectations validation...
    ...
    18:06:04.138 | WARNING | Task run 'run_checkpoint_validation-0' - Great Expectations validation run  failed
    18:06:04.298 | INFO    | Task run 'run_checkpoint_validation-0' - Finished in state Completed()
    18:06:04.401 | INFO    | Flow run 'affable-malamute' - Finished in state Completed('All states completed.')

    For more information about the run_checkpoint_validation task, refer to the prefect-great-expectations documentation.
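The two outcomes shown in the sections above come down to a small branch at the end of run_checkpoint_validation. When a result is unsuccessful and raise_on_validation_failure is true, the task raises GreatExpectationValidationError, which carries the result on its `result` attribute. Otherwise it returns the result.

The stdlib-only sketch below mirrors just that branch so it can run without Great Expectations or Prefect. `FakeCheckpointResult`, `FakeValidationError`, and `run_validation` are hypothetical stand-ins. The real task also loads the Data Context, runs the checkpoint, and logs.

```python
from dataclasses import dataclass


@dataclass
class FakeCheckpointResult:
    """Hypothetical stand-in for a GX CheckpointResult; only `success` is modeled."""
    success: bool


class FakeValidationError(Exception):
    """Hypothetical stand-in for GreatExpectationValidationError; carries the result."""

    def __init__(self, result: FakeCheckpointResult) -> None:
        super().__init__("Validation failed. Check `result` on this exception for details.")
        self.result = result


def run_validation(
    result: FakeCheckpointResult, raise_on_validation_failure: bool = True
) -> FakeCheckpointResult:
    """Mirror the failure branching of run_checkpoint_validation, minus GX and Prefect."""
    if not result.success and raise_on_validation_failure:
        raise FakeValidationError(result)
    return result


failed = FakeCheckpointResult(success=False)

# Default: a failed validation surfaces as an exception that carries the result.
try:
    run_validation(failed)
except FakeValidationError as exc:
    print("raised, result.success =", exc.result.success)

# raise_on_validation_failure=False: no exception; inspect the returned result instead.
res = run_validation(failed, raise_on_validation_failure=False)
print("returned, success =", res.success)
```

With the default setting, a caller that still wants the details can catch the exception and read its `result` attribute. That attribute is what the log message "Check result on this exception for more details" refers to.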

    Log prints for more information

    In the example above, the logs tell us that the validation failed but not why. Let's print the validation result and have Prefect log that output by passing log_prints=True to the flow decorator.

    @flow(log_prints=True)
    def validation_flow(checkpoint):
        """Run the validation without raising, then print the result so Prefect logs it."""
        res = run_checkpoint_validation(
            checkpoint=checkpoint, raise_on_validation_failure=False
        )
        print(res)
        return res

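    Now we can see lots of relevant information in our terminal window, including the following.

    ...
    "partial_unexpected_counts": [
        {
            "value": 6,
            "count": 20
        }
    ...

    Looks like the passenger_count column contains values of 6. Keep in mind that partial_unexpected_counts is tallied from a sample of the unexpected values, which is capped at 20 by default. A count of 20 therefore means at least 20 such rows; the unexpected_count field of the same result reports the total.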
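This assumes the result uses Great Expectations' default SUMMARY result format, where the sample of unexpected values is capped by the partial_unexpected_count setting (20 by default). Under that assumption, a tally of 20 can reflect the cap rather than the true number of failing rows.

The plain-Python model below shows how the total and the capped tallies can differ. It is not GX's code: `summarize_unexpected` is a hypothetical helper, and the input data is made up.

```python
from collections import Counter
from typing import Any, Dict, List


def summarize_unexpected(
    unexpected_values: List[Any], partial_unexpected_count: int = 20
) -> Dict[str, Any]:
    """Simplified model of SUMMARY-style fields for a failed column expectation.

    The total count covers every unexpected value, but the per-value tallies are
    computed only over the first `partial_unexpected_count` of them.
    """
    partial_list = unexpected_values[:partial_unexpected_count]
    return {
        "unexpected_count": len(unexpected_values),
        "partial_unexpected_list": partial_list,
        "partial_unexpected_counts": [
            {"value": value, "count": count}
            for value, count in Counter(partial_list).most_common()
        ],
    }


# Hypothetical input: 35 rows whose passenger_count is 6.
summary = summarize_unexpected([6] * 35)
print(summary["unexpected_count"])           # 35
print(summary["partial_unexpected_counts"])  # [{'value': 6, 'count': 20}]
```

In a real result, compare unexpected_count with the per-value counts before drawing conclusions about how many rows failed.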

    Add artifacts

    If we fire up a locally hosted Prefect server or log in to our Prefect Cloud account, we can see the same information in the Prefect UI. In addition, if we log in to Prefect Cloud, we can create an artifact to share with our Prefect workspace collaborators. Let's do that now.

    1. Head over to https://app.prefect.cloud/ and sign up for a free account or log in to your existing account.
    2. Authenticate your command line client with prefect cloud login.
    3. Create an artifact to share your Great Expectations validation results with your collaborators.

    Prefect artifacts will persist the validation results from a flow run and display them in the UI. Let's create a Markdown artifact with the validation results.

    from prefect.artifacts import create_markdown_artifact


    @flow(log_prints=True)
    def validation_flow(checkpoint):
        """Run the validation without raising, then publish the result as a Markdown artifact."""
        res = run_checkpoint_validation(
            checkpoint=checkpoint, raise_on_validation_failure=False
        )

        create_markdown_artifact(
            f"""# Result of Great Expectations validation run

            {res}
            """,
            description="GX validation for Taxi Data",
            key="green-taxi-data",
        )

        return res

    The UI gives you lots of visibility into the state of your flow runs.

    Screenshot of flow run with logs in Prefect UI

    Your artifact displays validation results for human consumption.

    Screenshot of artifact in Prefect UI

    Alternatively, you could share a link to your Great Expectations Data Docs in an artifact, for example with the create_link_artifact function from prefect.artifacts.
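The artifact flow in this tutorial interpolates the entire validation result into the Markdown. If that is too much for collaborators to scan, one option is to render a compact table first.

`validation_table_markdown` below is a hypothetical helper. The row shape (expectation, column, success) is an assumption chosen for illustration, and pulling those rows out of a real CheckpointResult is left out because it depends on that object's structure. The sample rows mirror the tutorial's two expectations, with the second one failing.

```python
from typing import Iterable, Mapping


def validation_table_markdown(title: str, rows: Iterable[Mapping[str, object]]) -> str:
    """Render per-expectation rows as a small Markdown table.

    Each row is assumed to have `expectation`, `column`, and `success` keys;
    this shape is hypothetical and chosen only for illustration.
    """
    lines = [
        f"# {title}",
        "",
        "| Expectation | Column | Success |",
        "| --- | --- | --- |",
    ]
    for row in rows:
        lines.append(f"| {row['expectation']} | {row['column']} | {row['success']} |")
    # Joining explicit lines keeps the caller's source indentation out of the Markdown.
    return "\n".join(lines)


markdown = validation_table_markdown(
    "Result of Great Expectations validation run",
    [
        {
            "expectation": "expect_column_values_to_not_be_null",
            "column": "pickup_datetime",
            "success": True,
        },
        {
            "expectation": "expect_column_values_to_be_between",
            "column": "passenger_count",
            "success": False,
        },
    ],
)
print(markdown)
```

You could then pass the resulting string as the markdown argument of create_markdown_artifact in place of the f-string. Building the string line by line also avoids carrying the flow function's indentation into the Markdown, where four or more leading spaces render as a code block.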

    Wrap

    You've seen how to use Prefect with Great Expectations.

    Where to go from here

    Prefect deployments allow you to run your flow in response to events such as the arrival of new data. You can also run on many types of schedules and on the infrastructure of your choice.

    There's lots more to explore for additional observability and orchestration with Prefect.

    Happy engineering!

    + + + + + + + + + + \ No newline at end of file diff --git a/validation/index.html b/validation/index.html index 0055fb6..f1d7cf7 100644 --- a/validation/index.html +++ b/validation/index.html @@ -11,11 +11,12 @@ - + + - + @@ -23,15 +24,18 @@ - + - + + + + @@ -55,9 +59,7 @@ - - - + @@ -95,7 +97,9 @@ -
    + + +