implements gx error catching
BWMac committed Mar 5, 2024
1 parent b130250 commit 355ad6b
Showing 3 changed files with 224 additions and 244 deletions.
29 changes: 19 additions & 10 deletions src/agoradatatools/gx.py
@@ -6,6 +6,8 @@
 
 import pandas as pd
 
+from agoradatatools.errors import ADTDataValidationError
+
 import great_expectations as gx
 from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult
 from synapseclient import Activity, File, Synapse
@@ -107,16 +109,14 @@ def _upload_results_file_to_synapse(self, results_path: str) -> None:
             ),
         )
 
-    def get_failed_expectations(
-        self, checkpoint_result: CheckpointResult
-    ) -> typing.Dict[str, list]:
-        """Gets the failed expectations from a CheckpointResult
+    def get_failed_expectations(self, checkpoint_result: CheckpointResult) -> str:
+        """Gets the failed expectations from a CheckpointResult and returns them as a formatted string
 
         Args:
             checkpoint_result (CheckpointResult): CheckpointResult object
 
         Returns:
-            fail_dict: Dictionary with information on which fields and expectations failed
+            fail_message: String with information on which fields and expectations failed
         """
         fail_dict = {self.expectation_suite_name: {}}
         run_name = list(checkpoint_result["run_results"].keys())[0]
@@ -131,7 +131,18 @@ def get_failed_expectations(
                     fail_dict[self.expectation_suite_name][column].append(
                         failed_expectation
                     )
-        return fail_dict
+        messages = []
+        for _, fields_dict in fail_dict.items():
+            for field, failed_expectations in fields_dict.items():
+                messages.append(
+                    f"{field} has failed expectations {', '.join(failed_expectations)}"
+                )
+
+        fail_message = ("Great Expectations data validation has failed: ") + "; ".join(
+            messages
+        )
+
+        return fail_message
 
     @staticmethod
     def convert_nested_columns_to_json(
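
Reviewer note: a minimal, self-contained sketch of what the new formatting logic above produces for a hypothetical fail_dict (the suite, column, and expectation names below are invented for illustration):

# Hypothetical fail_dict, mirroring the {suite: {column: [expectation, ...]}}
# structure built by get_failed_expectations above:
fail_dict = {
    "metabolomics_suite": {
        "ensembl_gene_id": ["expect_column_values_to_not_be_null"],
        "hgnc_symbol": ["expect_column_values_to_be_unique"],
    }
}

messages = []
for _, fields_dict in fail_dict.items():
    for field, failed_expectations in fields_dict.items():
        messages.append(
            f"{field} has failed expectations {', '.join(failed_expectations)}"
        )

fail_message = "Great Expectations data validation has failed: " + "; ".join(messages)
print(fail_message)
# Great Expectations data validation has failed:
# ensembl_gene_id has failed expectations expect_column_values_to_not_be_null;
# hgnc_symbol has failed expectations expect_column_values_to_be_unique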
@@ -173,7 +184,5 @@ def run(self) -> typing.Union[typing.Dict[str, list], None]:
             self._upload_results_file_to_synapse(latest_reults_path)
 
         if not checkpoint_result.success:
-            fail_dict = self.get_failed_expectations(checkpoint_result)
-            return fail_dict
-
-        return None
+            fail_message = self.get_failed_expectations(checkpoint_result)
+            raise ADTDataValidationError(fail_message)
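
Because run() now raises rather than returning a dict, callers that used to inspect the return value must catch the exception instead. A minimal sketch of the new calling pattern, assuming gx_runner is an already-configured GreatExpectationsRunner:

from agoradatatools.errors import ADTDataValidationError

def run_validation(gx_runner) -> None:
    # gx_runner is assumed to be a configured GreatExpectationsRunner;
    # run() now raises ADTDataValidationError on failure instead of
    # returning a fail_dict.
    try:
        gx_runner.run()
    except ADTDataValidationError as err:
        print(f"Validation failed: {err}")
        raise  # let the caller's error handling aggregate the failure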
52 changes: 13 additions & 39 deletions src/agoradatatools/process.py
@@ -4,7 +4,7 @@
 from pandas import DataFrame
 from typer import Argument, Option, Typer
 
-from agoradatatools.errors import ADTDataProcessingError, ADTDataValidationError
+from agoradatatools.errors import ADTDataProcessingError
 from agoradatatools.etl import extract, load, transform, utils
 from agoradatatools.gx import GreatExpectationsRunner
 from agoradatatools.logs import log_time
@@ -61,7 +61,6 @@ def process_dataset(
     dataset_obj: dict,
     staging_path: str,
     syn: synapseclient.Synapse,
-    data_validation_error_list: list,
 ) -> tuple:
     """Takes in a dataset from the configuration file and passes it through the ETL process
 
@@ -134,19 +133,16 @@ def process_dataset(
                 else None
             ),
         )
-        result = gx_runner.run()
+        gx_runner.run()
 
-        if result:
-            data_validation_error_list.append(result)
-
-    # syn_obj = load.load(
-    #     file_path=json_path,
-    #     provenance=dataset_obj[dataset_name]["provenance"],
-    #     destination=dataset_obj[dataset_name]["destination"],
-    #     syn=syn,
-    # )
+    syn_obj = load.load(
+        file_path=json_path,
+        provenance=dataset_obj[dataset_name]["provenance"],
+        destination=dataset_obj[dataset_name]["destination"],
+        syn=syn,
+    )
 
-    # return syn_obj
+    return syn_obj
 
 
 def create_data_manifest(
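
Note the control-flow consequence of this hunk: a validation failure raised inside gx_runner.run() now aborts process_dataset before load.load() runs, so invalid data never reaches Synapse, and the exception is caught by the generic except in process_all_files below. A toy sketch of that propagation (all names here are stand-ins, not the real API):

class ValidationError(Exception):
    """Stand-in for ADTDataValidationError."""

def validate(name: str) -> None:
    if name == "bad_dataset":
        raise ValidationError(f"{name} failed validation")

def process_dataset(name: str) -> str:
    validate(name)              # raises on bad data...
    return f"loaded {name}"     # ...so the load step is never reached

errors = []
for name in ["good_dataset", "bad_dataset"]:
    try:
        process_dataset(name)
    except Exception as e:      # mirrors the except in process_all_files
        errors.append(f"{name}: {e}")

print(errors)  # ['bad_dataset: bad_dataset failed validation']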
@@ -197,48 +193,26 @@ def process_all_files(
 
     load.create_temp_location(staging_path)
 
-    code_error_list = []
-    data_validation_error_list = []
+    error_list = []
     if datasets:
         for dataset in datasets:
             try:
                 process_dataset(
                     dataset_obj=dataset,
                     staging_path=staging_path,
                     syn=syn,
-                    data_validation_error_list=data_validation_error_list,
                 )
             except Exception as e:
-                code_error_list.append(
+                error_list.append(
                     f"{list(dataset.keys())[0]}: " + str(e).replace("\n", "")
                 )
 
     destination = config["destination"]
 
-    if code_error_list:
+    if error_list:
         raise ADTDataProcessingError(
             "\nData Processing has failed for one or more data sources. Refer to the list of errors below to address issues:\n"
-            + "\n".join(code_error_list)
-        )
-
-    if data_validation_error_list:
-        print(data_validation_error_list)
-        breakpoint()
-        string_error_list = []
-        for error_dict in data_validation_error_list:
-            dataset_error_list = []
-            for dataset, errors in error_dict.items():
-                column = list(errors.keys())[0]
-                expectations = ", ".join(list(errors.values()))
-                dataset_error_string = f"Dataset {dataset} failed validation on column {column} with expectations {expectations}"
-                dataset_error_list.append(dataset_error_string)
-            string_error_list.extend(dataset_error_list)
-        print(string_error_list)
-        breakpoint()
-
-        raise ADTDataValidationError(
-            "\nData Validation has failed for one or more data sources. Refer to the list of errors below to address issues:\n"
-            + "\n".join(data_validation_error_list)
+            + "\n".join(error_list)
         )
 
     manifest_df = create_data_manifest(syn=syn, parent=destination)
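
With the two lists collapsed into one, code errors and validation errors now surface through a single ADTDataProcessingError whose message concatenates every per-dataset failure. A sketch of the resulting behavior, with invented dataset names and error strings:

from agoradatatools.errors import ADTDataProcessingError

# Hypothetical accumulated failures -- one code error, one validation error:
error_list = [
    "genes_biodomains: KeyError('hgnc_symbol')",
    "metabolomics: Great Expectations data validation has failed: "
    "ensembl_gene_id has failed expectations expect_column_values_to_not_be_null",
]

if error_list:
    # Raises before the data manifest is created, same as process_all_files.
    raise ADTDataProcessingError(
        "\nData Processing has failed for one or more data sources. "
        "Refer to the list of errors below to address issues:\n"
        + "\n".join(error_list)
    )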