[IBCDPE-946] Adds Optional Uploading #133

Merged
merged 6 commits on May 30, 2024
2 changes: 1 addition & 1 deletion .github/workflows/dev.yml
@@ -68,7 +68,7 @@ jobs:
python-version: "3.9"
- run: pip install -U setuptools
- run: pip install .
- run: adt test_config.yaml -t ${{secrets.SYNAPSE_PAT}}
- run: adt test_config.yaml --upload --platform GITHUB --token ${{secrets.SYNAPSE_PAT}}

ghcr-publish:
needs: [build, test]
93 changes: 53 additions & 40 deletions CONTRIBUTING.md
@@ -1,7 +1,6 @@

## Contributing

We welcome all contributions! That said, this is a Sage Bionetworks-owned project, and we use JIRA ([AG](https://sagebionetworks.jira.com/jira/software/c/projects/AG/boards/91)/[IBCDPE](https://sagebionetworks.jira.com/jira/software/c/projects/IBCDPE/boards/189)) to track bug and feature requests. This guide is focused on a Sage Bio employee's development workflow. If you are a Sage Bio employee, make sure to assign yourself the JIRA ticket if you decide to work on it.

## Coding Style

@@ -24,7 +23,7 @@ The code in this package is also automatically formatted by `black` for consiste

### Install development dependencies

Please follow the [README.md](README.md) to install the package for development purposes. Be sure to run this:

```
pipenv install --dev
@@ -33,53 +32,66 @@ pipenv install --dev
### Developing at Sage Bio

The agora-data-tools project follows the standard [trunk based development](https://trunkbaseddevelopment.com/) strategy.

> To keep development flowing smoothly, do not push directly to `dev`!

1. Please ask for write permissions to contribute directly to this repository.
1. Make sure you always create feature branches from the `dev` branch. We use branches instead of forks because CI/CD cannot access secrets across GitHub forks.

```shell
git checkout dev
git pull
```

1. Create a feature branch from the `dev` branch. Use the ID of the JIRA issue that you are addressing and name the branch after the issue, adding some detail, e.g. `{user}/{JIRA}-123/add-some-new-feature`.

```shell
git checkout dev
git checkout -b tyu/JIRA-123/some-new-feature
```

1. At this point, you have only created the branch locally; you need to push it to the remote repository on GitHub.

```shell
git push --set-upstream origin tyu/JIRA-123/some-new-feature
```

You should now be able to see the branch on GitHub. Make commits as you deem necessary. It helps to provide useful commit messages - a commit message saying 'Update' is a lot less helpful than one saying 'Remove X parameter because it was unused'.

```shell
git commit changed_file.txt -m "Remove X parameter because it was unused"
git push
```

1. Once you have made your additions or changes, make sure you write tests and run the test suite. More information on testing below.

```shell
pytest -vs tests/
```

1. Make sure to run `black`, the automatic Python code formatter.

```shell
black ./
```

1. Test your changes by running `agora-data-tools` locally.

```
adt test_config.yaml
```

If your changes affect the way that files are uploaded to Synapse, create a new configuration file by copying `test_config.yaml` and changing the `destination` and `gx_folder` fields to testing locations that you own. The command then becomes:

```
adt my_dev_config.yaml --upload
```

1. Once you have completed all the steps above, create a pull request from the feature branch to the `dev` branch of the Sage-Bionetworks/agora-data-tools repo.

> _A code maintainer must review and accept your pull request._ Most code reviews can be done asynchronously. For more complex code updates, an "in-person" or Zoom code review can happen between the reviewer(s) and contributor.

This package uses [semantic versioning](https://semver.org/) for releasing new versions. A GitHub release should occur at least once a quarter to capture the changes between releases. Currently, releases are minted by admins of this repo; there is no formal process for when releases are minted, other than that more frequent releases lead to smaller changelogs.

<!-- This package uses [semantic versioning](https://semver.org/) for releasing new versions. The version should be updated on the `dev` branch as changes are reviewed and merged in by a code maintainer. The version for the package is maintained in the [agoradatatools/__init__.py](agoradatatools/__init__.py) file. A github release should also occur every time `dev` is pushed into `main` and it should match the version for the package. -->

@@ -134,28 +146,29 @@ Follow gitflow best practices as linked above.

### Transforms

This package has a `src/agoradatatools/etl/transform` submodule. This folder houses all the individual transform modules required for the package. Here are the steps to add more transforms:

1. Create a new script in the transform submodule that matches the dataset name, and name the function `transform_...`. For example, if you have a dataset named `genome_variants`, your new script would be `src/agoradatatools/etl/transform/transform_genome_variants.py` (a hedged sketch of such a module appears after this list).
1. Register the new transform function in `src/agoradatatools/etl/transform/__init__.py`. Look in that file for examples.
1. Modify the `apply_custom_transformations` in `src/agoradatatools/process.py` to include your new transform.
1. Write a test for the transform:
- For transform tests, we are using a [Data-Driven Testing](https://www.develer.com/en/blog/data-driven-testing-with-python/) strategy.
- To contribute new tests, assets in the form of input and output data files are needed.
- The input file is loaded to serve as the data fed into the transform function, while the output file is loaded in to check the function output against.
- These tests should include multiple ways of evaluating the transform function, including one test that should pass (good input data) and at least one that should fail (bad input data).
- For some functions, it may be appropriate to include multiple passing datasets (e.g. for functions that are written to handle imperfect data) and/or multiple failing datasets (e.g. for transforms operating on datasets that can be unclean in multiple distinct ways).
- Each transform function should have its own folder in `test_assets` to hold its input and output data files. Inputs should be in CSV form and outputs in JSON form.
- Use `pytest.mark.parametrize` to loop through multiple datasets in a single test.
- The class `TestTransformGenesBiodomains` can be used as an example for future contributed tests; a hedged sketch modeled on it also appears after this list.
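As a rough illustration of steps 1-3, a new transform module might look like the sketch below. The dataset name `genome_variants`, the column names, and the exact function signature are placeholders rather than the project's actual conventions, so mirror an existing module in `src/agoradatatools/etl/transform` when writing a real one.

```python
# Hypothetical src/agoradatatools/etl/transform/transform_genome_variants.py
# (dataset name, columns, and signature are illustrative only)
import pandas as pd


def transform_genome_variants(datasets: dict) -> pd.DataFrame:
    """Drop rows without a gene identifier and remove duplicate records."""
    df = datasets["genome_variants"].copy()
    df = df.dropna(subset=["ensembl_gene_id"]).drop_duplicates()
    return df.reset_index(drop=True)
```

The function would then be registered in `src/agoradatatools/etl/transform/__init__.py` and wired into `apply_custom_transformations` in `src/agoradatatools/process.py`, following the existing entries in those files.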

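A data-driven test for such a transform might be sketched as follows. The asset file names, the expected columns, and the `check_dtype` relaxation are assumptions for illustration; `TestTransformGenesBiodomains` remains the authoritative pattern.

```python
# Hypothetical tests/transform/test_genome_variants.py
# (asset paths, file names, and columns are placeholders)
import pandas as pd
import pytest

from agoradatatools.etl import transform


class TestTransformGenomeVariants:
    data_files_path = "tests/test_assets/genome_variants"

    @pytest.mark.parametrize(
        "input_file,expected_file",
        [("input_good.csv", "output_good.json")],  # add failing-data cases the same way
    )
    def test_transform_genome_variants(self, input_file, expected_file):
        # Load the input asset, run the transform, and compare against the output asset.
        datasets = {"genome_variants": pd.read_csv(f"{self.data_files_path}/{input_file}")}
        result = transform.transform_genome_variants(datasets=datasets)
        expected = pd.read_json(f"{self.data_files_path}/{expected_file}")
        pd.testing.assert_frame_equal(
            result.reset_index(drop=True),
            expected.reset_index(drop=True),
            check_dtype=False,  # CSV and JSON round-trips can differ in dtypes
        )
```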
### Great Expectations

This package uses [Great Expectations](https://greatexpectations.io/) to validate output data. The `src/agoradatatools/great_expectations` folder houses our file system data context and Great Expectations-specific configuration files. Eventually, our goal is for each `agora-data-tools` dataset to be covered by an expectation suite. To add data validation for more datasets, follow these steps:

1. Create a new expectation suite by defining the expectations for the dataset in a Jupyter Notebook inside the `gx_suite_definitions` folder. Use `metabolomics.ipynb` as an example; a hedged sketch of such a cell also appears after this list. You can find a catalog of existing expectations [here](https://greatexpectations.io/expectations/).
1. Run the notebook to generate the new expectation suite. It should populate as a JSON file in the `/great_expectations/expectations` folder.
1. Add support for running Great Expectations on a dataset by adding `gx_enabled: true` to the configuration for the dataset in both `test_config.yaml` and `config.yaml`. After updating the config files, reports should be uploaded to the proper locations ([Prod](https://www.synapse.org/#!Synapse:syn52948668), [Testing](https://www.synapse.org/#!Synapse:syn52948670)) when data processing is complete.
- You can prevent Great Expectations from running for a dataset by removing `gx_enabled: true` from the configuration for the dataset.
1. Test data processing by running `adt test_config.yaml` and ensure that HTML reports with all expectations are generated and uploaded to the proper folder in Synapse.
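To make step 1 concrete, a suite-definition cell might look roughly like the sketch below. It assumes the fluent pandas API of recent Great Expectations releases and invents the column names and file path, so follow `metabolomics.ipynb` and the GX version pinned for this project rather than treating this as the exact API.

```python
# Hypothetical notebook cell; column names, path, and exact GX calls are assumptions.
import great_expectations as gx

context = gx.get_context(context_root_dir="src/agoradatatools/great_expectations")

# Build a validator over a locally staged copy of the dataset.
validator = context.sources.pandas_default.read_json("staging/metabolomics.json")

# Each expect_* call adds an expectation to the suite under construction.
validator.expect_column_values_to_not_be_null("ensembl_gene_id")
validator.expect_column_values_to_be_between("ad_diagnosis_p_value", min_value=0, max_value=1)

# Persist the suite; name and save it following the pattern in metabolomics.ipynb so the
# JSON file lands in /great_expectations/expectations/.
validator.save_expectation_suite(discard_failed_expectations=False)
```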

#### Custom Expectations
6 changes: 4 additions & 2 deletions src/agoradatatools/gx.py
@@ -25,7 +25,7 @@ def __init__(
syn: Synapse,
dataset_path: str,
dataset_name: str,
upload_folder: str,
upload_folder: str = None,
nested_columns: typing.List[str] = None,
):
"""Initialize the class"""
@@ -180,7 +180,9 @@ def run(self) -> None:
f"Data validation complete for {self.expectation_suite_name}. Uploading results to Synapse."
)
latest_reults_path = self._get_results_path(checkpoint_result)
self._upload_results_file_to_synapse(latest_reults_path)

if self.upload_folder:
self._upload_results_file_to_synapse(latest_reults_path)

if not checkpoint_result.success:
fail_message = self.get_failed_expectations(checkpoint_result)
87 changes: 61 additions & 26 deletions src/agoradatatools/process.py
@@ -1,4 +1,6 @@
import logging
import typing
from enum import Enum

import synapseclient
from pandas import DataFrame
@@ -12,6 +14,12 @@
logger = logging.getLogger(__name__)


class Platform(Enum):
LOCAL = "LOCAL"
GITHUB = "GITHUB"
NEXTFLOW = "NEXTFLOW"


# TODO refactor to avoid so many if's - maybe some sort of mapping to callables
def apply_custom_transformations(datasets: dict, dataset_name: str, dataset_obj: dict):
if not isinstance(datasets, dict) or not isinstance(dataset_name, str):
@@ -62,17 +70,16 @@ def process_dataset(
staging_path: str,
gx_folder: str,
syn: synapseclient.Synapse,
) -> tuple:
upload: bool = True,
) -> None:
"""Takes in a dataset from the configuration file and passes it through the ETL process

Args:
dataset_obj (dict): A dataset defined in the configuration file
staging_path (str): Staging path
gx_folder (str): Synapse ID of the folder where Great Expectations reports should be uploaded
syn (synapseclient.Synapse): synapseclient.Synapse session.

Returns:
syn_obj (tuple): Tuple containing the id and version number of the uploaded file.
upload (bool, optional): Whether or not to upload the data to Synapse. Defaults to True.
"""

dataset_name = list(dataset_obj.keys())[0]
@@ -128,7 +135,7 @@ def process_dataset(
syn=syn,
dataset_path=json_path,
dataset_name=dataset_name,
upload_folder=gx_folder,
upload_folder=gx_folder if upload else None,
nested_columns=(
dataset_obj[dataset_name]["gx_nested_columns"]
if "gx_nested_columns" in dataset_obj[dataset_name].keys()
@@ -137,34 +144,32 @@
)
gx_runner.run()

syn_obj = load.load(
file_path=json_path,
provenance=dataset_obj[dataset_name]["provenance"],
destination=dataset_obj[dataset_name]["destination"],
syn=syn,
)

return syn_obj
if upload:
load.load(
file_path=json_path,
provenance=dataset_obj[dataset_name]["provenance"],
destination=dataset_obj[dataset_name]["destination"],
syn=syn,
)


def create_data_manifest(
syn: synapseclient.Synapse, parent: synapseclient.Folder = None
) -> DataFrame:
) -> typing.Union[DataFrame, None]:
"""Creates data manifest (dataframe) that has the IDs and version numbers of child synapse folders

Args:
syn (synapseclient.Synapse): Synapse client session.
parent (synapseclient.Folder/str, optional): synapse folder or synapse id pointing to parent synapse folder. Defaults to None.

Returns:
DataFrame: Dataframe containing IDs and version numbers of folders within the parent directory
Dataframe containing IDs and version numbers of folders within the parent directory, or None if parent is None
"""

if not parent:
return None

folders = syn.getChildren(parent)
folder = [folders]
folder = [
{"id": folder["id"], "version": folder["versionNumber"]} for folder in folders
]
@@ -176,17 +181,28 @@ def process_all_files(
def process_all_files(
syn: synapseclient.Synapse,
config_path: str = None,
platform: Platform = Platform.LOCAL,
upload: bool = True,
):
"""This function will read through the entire configuration and process each file listed.

Args:
syn (synapseclient.Session): Synapse client session
config_path (str, optional): path to configuration file. Defaults to None.
platform (Platform, optional): Platform where the process is being run. One of LOCAL, GITHUB, NEXTFLOW. Defaults to LOCAL.
upload (bool, optional): Whether or not to upload the data to Synapse. Defaults to True.
"""
if platform == Platform.LOCAL and upload is True:
logger.warning(
"""Data will be uploaded to Synapse despite the platform being set to `LOCAL`.
Make sure you have provided a configuration file with alternative upload `destination` and `gx_folder`.
See the contributing guide for more information."""
)

config = utils._get_config(config_path=config_path)

datasets = config["datasets"]
destination = config["destination"]

# create staging location
staging_path = config.get("staging_path", None)
@@ -204,14 +220,13 @@
staging_path=staging_path,
gx_folder=config["gx_folder"],
syn=syn,
upload=upload,
)
except Exception as e:
error_list.append(
f"{list(dataset.keys())[0]}: " + str(e).replace("\n", "")
)

destination = config["destination"]

if error_list:
raise ADTDataProcessingError(
"\nData Processing has failed for one or more data sources. Refer to the list of errors below to address issues:\n"
@@ -223,18 +238,33 @@ def process_all_files(
df=manifest_df, staging_path=staging_path, filename="data_manifest.csv"
)

load.load(
file_path=manifest_path,
provenance=manifest_df["id"].to_list(),
destination=destination,
syn=syn,
)
if upload:
load.load(
file_path=manifest_path,
provenance=manifest_df["id"].to_list(),
destination=destination,
syn=syn,
)


app = Typer()


input_path_arg = Argument(..., help="Path to configuration file for processing run")

platform_opt = Option(
"LOCAL",
"--platform",
"-p",
help="Platform that is running the process. Must be one of LOCAL, GITHUB, or NEXTFLOW.",
show_default=True,
)
upload_opt = Option(
False,
"--upload",
"-u",
help="Toggles whether or not files will be uploaded to Synapse.",
show_default=True,
)
synapse_auth_opt = Option(
None,
"--token",
@@ -247,10 +277,15 @@ def process_all_files(
@app.command()
def process(
config_path: str = input_path_arg,
platform: str = platform_opt,
upload: bool = upload_opt,
auth_token: str = synapse_auth_opt,
):
syn = utils._login_to_synapse(token=auth_token)
process_all_files(syn=syn, config_path=config_path)
platform_enum = Platform(platform)
process_all_files(
syn=syn, config_path=config_path, platform=platform_enum, upload=upload
)


if __name__ == "__main__":
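
To see how the new options fit together, here is a hedged sketch of driving the pipeline programmatically with uploading disabled. It relies only on names visible in this diff (`Platform`, `process_all_files`, `utils._login_to_synapse`); the import path for `utils` and the token environment variable are assumptions.

```python
# Hypothetical local dry run that skips all Synapse uploads.
import os

from agoradatatools import process
from agoradatatools.etl import utils  # path assumed from how process.py references `utils`

syn = utils._login_to_synapse(token=os.environ.get("SYNAPSE_AUTH_TOKEN"))

# With upload=False, process_dataset() skips load.load() and the Great Expectations
# runner receives upload_folder=None, so nothing is written to Synapse; staged files
# and validation results stay local.
process.process_all_files(
    syn=syn,
    config_path="test_config.yaml",
    platform=process.Platform.LOCAL,
    upload=False,
)
```

The equivalent CLI invocation is simply `adt test_config.yaml`, since `--upload` now defaults to off and `--platform` defaults to `LOCAL`.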