[IBCDPE-947] GX Validation Record Keeping #135

Merged
merged 23 commits into from Jun 10, 2024
Changes from 22 commits
2 changes: 1 addition & 1 deletion .github/workflows/dev.yml
@@ -68,7 +68,7 @@ jobs:
python-version: "3.9"
- run: pip install -U setuptools
- run: pip install .
- run: adt test_config.yaml --upload --platform GITHUB --token ${{secrets.SYNAPSE_PAT}}
- run: adt test_config.yaml --upload --platform GITHUB --run_id ${{ github.run_id }} --token ${{secrets.SYNAPSE_PAT}}

ghcr-publish:
needs: [build, test]
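For context on the new flag, below is a minimal sketch of how a `--run_id` option like the one added above could be declared in a Typer-style CLI. The command, the option names other than `--run_id`, and the wiring are illustrative assumptions, not the project's actual `adt` entry point.

```python
# Illustrative sketch only -- assumes a Typer-style CLI; this is not the
# project's actual `adt` entry point.
from typing import Optional

import typer

app = typer.Typer()


@app.command()
def process(
    config_path: str = typer.Argument(..., help="Path to the configuration YAML file"),
    upload: bool = typer.Option(False, "--upload", help="Upload results to Synapse"),
    platform: str = typer.Option("LOCAL", "--platform", help="Platform the run executes on"),
    run_id: Optional[str] = typer.Option(
        None, "--run_id", help="CI run identifier, e.g. the GitHub Actions run id"
    ),
    token: Optional[str] = typer.Option(None, "--token", help="Synapse personal access token"),
) -> None:
    """Hypothetical command showing how the CI run id could reach the processing code."""
    typer.echo(f"Processing {config_path} (platform={platform}, run_id={run_id})")


if __name__ == "__main__":
    app()
```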
3 changes: 3 additions & 0 deletions .gitignore
@@ -136,3 +136,6 @@ staging/*

#test staging location
test_staging_dir/

# dev config file
dev_config.yaml
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -81,7 +81,7 @@ The agora-data-tools project follows the standard [trunk based development](http
adt test_config.yaml
```

If your changes have to do with the way that files are uploaded to Synapse, create a new configuration file by copying `test_config.yaml` and changing the `destination` and `gx_folder` fields to testing locations that you own. The command will change to be:
If your changes affect how files are uploaded to Synapse and/or how new records are uploaded to the ADT GX Synapse table, create a new configuration file by copying `test_config.yaml` and changing the `destination`, `gx_folder`, and `gx_table` fields to testing locations that you own. The command then becomes:

```
adt my_dev_config.yaml --upload
```
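
For instance, a dev config might look like the sketch below. The Synapse IDs are placeholders for entities you own, not real project locations, and the `sources` section is carried over unchanged from `test_config.yaml`:

```yaml
# my_dev_config.yaml -- illustrative placeholder values only
destination: &dest syn00000001   # your own test folder for processed files
staging_path: ./staging
gx_folder: syn00000002           # your own test folder for GX HTML reports
gx_table: syn00000003            # your own test table for GX validation records
sources:
  # copy the sources section from test_config.yaml unchanged
```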
2,378 changes: 1,196 additions & 1,182 deletions Pipfile.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions config.yaml
@@ -1,6 +1,7 @@
destination: &dest syn12177492
staging_path: ./staging
gx_folder: syn52948668
gx_table: syn60527066
sources:
- genes_biodomains:
genes_biodomains_files: &genes_biodomains_files
2 changes: 1 addition & 1 deletion setup.cfg
@@ -35,7 +35,7 @@ packages = find:
install_requires =
pandas~=2.0.0
numpy~=1.21
setuptools~=67.0.0
setuptools~=70.0.0
synapseclient~=4.0.0
PyYAML~=6.0
pyarrow~=14.0.1
64 changes: 47 additions & 17 deletions src/agoradatatools/gx.py
@@ -1,25 +1,30 @@
import json
import logging
import os
import shutil
import json
import typing

import pandas as pd

from agoradatatools.errors import ADTDataValidationError
from typing import Optional

import great_expectations as gx
import pandas as pd
from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult
from synapseclient import Activity, File, Synapse

from agoradatatools.reporter import DatasetReport

logger = logging.getLogger(__name__)
# Disable GX INFO logging
logging.getLogger("great_expectations").setLevel(logging.WARNING)


class GreatExpectationsRunner:
"""Class to run great expectations on a dataset and upload the HTML report to Synapse"""

failures: bool = False
failure_message: Optional[str] = None
report_file: Optional[str] = None
report_version: Optional[int] = None
report_link: Optional[str] = None

def __init__(
self,
syn: Synapse,
@@ -43,12 +48,12 @@ def __init__(
from expectations.expect_column_values_to_have_list_length import (
ExpectColumnValuesToHaveListLength,
)
from expectations.expect_column_values_to_have_list_members import (
ExpectColumnValuesToHaveListMembers,
)
from expectations.expect_column_values_to_have_list_length_in_range import (
ExpectColumnValuesToHaveListLengthInRange,
)
from expectations.expect_column_values_to_have_list_members import (
ExpectColumnValuesToHaveListMembers,
)
from expectations.expect_column_values_to_have_list_members_of_type import (
ExpectColumnValuesToHaveListMembersOfType,
)
@@ -74,7 +79,12 @@ def _check_if_expectation_suite_exists(self) -> bool:
return exists

def _get_results_path(self, checkpoint_result: CheckpointResult) -> str:
"""Gets the path to the most recent HTML report for a checkpoint, copies it to a Synapse-API friendly name, and returns the new path"""
"""Gets the path to the most recent HTML report for a checkpoint,
copies it to a Synapse-API friendly name, and returns the new path

Args:
checkpoint_result (CheckpointResult): CheckpointResult object from GX validation run.
"""
validation_results = checkpoint_result.list_validation_result_identifiers()
latest_validation_result = validation_results[0]

@@ -97,8 +107,13 @@ def _get_results_path(self, checkpoint_result: CheckpointResult) -> str:
return new_results_path

def _upload_results_file_to_synapse(self, results_path: str) -> None:
"""Uploads a results file to Synapse"""
self.syn.store(
"""Uploads a results file to Synapse. Assigns class attributes associated
with the report file.

Args:
results_path (str): Path to the GX report file.
"""
file = self.syn.store(
File(
results_path,
parentId=self.upload_folder,
@@ -109,12 +124,26 @@ def _upload_results_file_to_synapse(self, results_path: str) -> None:
),
forceVersion=True,
)
self.report_file = file.id
self.report_version = file.versionNumber
self.report_link = DatasetReport.format_link(
syn_id=file.id, version=file.versionNumber
)

@staticmethod
def convert_nested_columns_to_json(
df: pd.DataFrame, nested_columns: typing.List[str]
) -> pd.DataFrame:
"""Converts nested columns in a DataFrame to JSON-parseable strings"""
"""Converts nested columns in a DataFrame to JSON-parseable strings

Args:
df (pd.DataFrame): DataFrame
nested_columns (typing.List[str]): List of nested columns

Returns:
df (pd.DataFrame): DataFrame with nested columns converted to JSON-parseable strings
"""
df = df.copy()
for column in nested_columns:
df[column] = df[column].apply(json.dumps)
return df
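
A quick illustrative use of this helper (the DataFrame and column names below are made up for the example):

```python
# Illustrative usage of convert_nested_columns_to_json; the data is hypothetical.
import pandas as pd

from agoradatatools.gx import GreatExpectationsRunner

df = pd.DataFrame({"gene": ["A", "B"], "scores": [[1, 2], [3]]})
converted = GreatExpectationsRunner.convert_nested_columns_to_json(df, nested_columns=["scores"])
print(converted["scores"].tolist())  # ['[1, 2]', '[3]']
```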
@@ -126,7 +155,7 @@ def get_failed_expectations(self, checkpoint_result: CheckpointResult) -> str:
checkpoint_result (CheckpointResult): CheckpointResult object

Returns:
fail_message: String with information on which fields and expectations failed
fail_message (str): String with information on which fields and expectations failed
"""
fail_dict = {self.expectation_suite_name: {}}
expectation_results = checkpoint_result.list_validation_results()[0]["results"]
Expand All @@ -153,7 +182,8 @@ def get_failed_expectations(self, checkpoint_result: CheckpointResult) -> str:
return fail_message

def run(self) -> None:
"""Run great expectations on a dataset and upload the results to Synapse"""
"""Run great expectations on a dataset and upload the results to Synapse."""

if not self._check_if_expectation_suite_exists():
return

@@ -185,5 +215,5 @@ def run(self) -> None:
self._upload_results_file_to_synapse(latest_reults_path)

if not checkpoint_result.success:
fail_message = self.get_failed_expectations(checkpoint_result)
raise ADTDataValidationError(fail_message)
self.failures = True
self.failure_message = self.get_failed_expectations(checkpoint_result)
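
Because `run()` now records validation failures on the instance instead of raising `ADTDataValidationError`, downstream processing code can collect the outcome and report link for the GX records table. A minimal sketch of that pattern follows; the constructor arguments and the reporting step are assumptions for illustration, not the documented API:

```python
# Illustrative sketch -- consuming the new runner attributes after validation.
# The constructor arguments below are assumed for the example.
from synapseclient import Synapse

from agoradatatools.gx import GreatExpectationsRunner

syn = Synapse()
syn.login()  # assumes cached credentials or a SYNAPSE_AUTH_TOKEN environment variable

runner = GreatExpectationsRunner(
    syn=syn,
    dataset_path="./staging/genes_biodomains.json",  # hypothetical staged dataset
    dataset_name="genes_biodomains",
    upload_folder="syn52948668",  # gx_folder from config.yaml
    nested_columns=None,
)
runner.run()

if runner.failures:
    # These values can be written to the GX Synapse table instead of aborting the run.
    print(runner.failure_message)
    print(runner.report_link)
else:
    print(f"Validation passed; report stored as {runner.report_file} (v{runner.report_version})")
```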
9 changes: 5 additions & 4 deletions src/agoradatatools/logs.py
@@ -12,11 +12,11 @@ def format_seconds(seconds):
def time_function(func, *args, **kwargs):
"""Returns the elapsed time for a function to run."""
start_time = time.monotonic()
func(*args, **kwargs)
result = func(*args, **kwargs)
end_time = time.monotonic()
elapsed_time = end_time - start_time
elapsed_time_formatted = format_seconds(elapsed_time)
return elapsed_time_formatted
return elapsed_time_formatted, result


def log_time(func_name: str, logger: logging.Logger):
@@ -43,19 +43,20 @@ def wrapped(*args, **kwargs):
if func_name == "process_dataset":
dataset = next(iter(kwargs["dataset_obj"]))
logger.info("Now processing %s dataset", dataset)
elapsed_time_formatted = time_function(func, *args, **kwargs)
elapsed_time_formatted, result = time_function(func, *args, **kwargs)
logger.info("Processing complete for %s dataset", dataset)
string_list = [elapsed_time_formatted, dataset]
message = "Elapsed time: %s for %s dataset"

if func_name == "process_all_files":
logger.info("Agora Data Tools processing has started")
elapsed_time_formatted = time_function(func, *args, **kwargs)
elapsed_time_formatted, result = time_function(func, *args, **kwargs)
logger.info("Agora Data Tools processing has completed")
string_list = [elapsed_time_formatted]
message = "Elapsed time: %s for all data processing"

logger.info(message, *string_list)
return result

return wrapped

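To illustrate the change to the timing helpers: `time_function` now returns the wrapped call's result alongside the formatted elapsed time, so the `log_time` decorator can propagate return values to its caller. A small example with a made-up function:

```python
# Illustrative example; add() is made up to show the new (elapsed, result) return shape.
from agoradatatools.logs import time_function


def add(a: int, b: int) -> int:
    return a + b


elapsed, result = time_function(add, 2, 3)
print(f"add() returned {result} after {elapsed}")  # result == 5
```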