Add Config to Pass Around Injected Args #652

Open · wants to merge 18 commits into base: main · Changes from 14 commits
74 changes: 74 additions & 0 deletions examples/feedstock/gpcp_from_gcs_w_custom_transform.py
@@ -0,0 +1,74 @@
from dataclasses import dataclass, field
from typing import Union

import apache_beam as beam
import pandas as pd
import zarr

from pangeo_forge_recipes.config import Config
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
OpenWithXarray,
RequiredAtRuntimeDefault,
StoreToZarr,
)

dates = [
d.to_pydatetime().strftime("%Y%m%d")
for d in pd.date_range("1996-10-01", "1999-02-01", freq="D")
]


def make_url(time):
url_base = "https://storage.googleapis.com/pforge-test-data"
return f"{url_base}/gpcp/v01r03_daily_d{time}.nc"


concat_dim = ConcatDim("time", dates, nitems_per_file=1)
pattern = FilePattern(make_url, concat_dim)


def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
# This fails integration test if not imported here
# TODO: see if --setup-file option for runner fixes this
import xarray as xr

ds = xr.open_dataset(store, engine="zarr", chunks={})
assert ds.title == (
"Global Precipitation Climatatology Project (GPCP) " "Climate Data Record (CDR), Daily V1.3"
)
# Making sure that the native chunking is different from the dynamic chunking
assert ds.chunks["time"][0] == 1

return store


@dataclass
class MyCustomTransform(beam.PTransform):

target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
default_factory=RequiredAtRuntimeDefault
)

def expand(
self,
pcoll: beam.PCollection,
) -> beam.PCollection:
return pcoll


config = Config()

recipe = (
beam.Create(pattern.items())
| OpenURLWithFSSpec()
| OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
| MyCustomTransform(target_root=config.target_storage)
| StoreToZarr(
store_name="gpcp.zarr",
combine_dims=pattern.combine_dim_keys,
)
| "Test dataset" >> beam.Map(test_ds)
)
2 changes: 2 additions & 0 deletions examples/feedstock/meta.yaml
@@ -1,6 +1,8 @@
recipes:
- id: "gpcp-from-gcs"
object: "gpcp_from_gcs:recipe"
- id: "gpcp-from-gcs-w-custom-transform"
object: "gpcp_from_gcs_w_custom_transform:recipe"
- id: "gpcp-from-gcs-dynamic-chunks"
object: "gpcp_from_gcs_dynamic_chunks:recipe"
- id: "noaa-oisst"
41 changes: 41 additions & 0 deletions pangeo_forge_recipes/config.py
@@ -0,0 +1,41 @@
from dataclasses import dataclass, field
from typing import Optional, Union

from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget
from pangeo_forge_recipes.transforms import RequiredAtRuntimeDefault


@dataclass
class Config:
Comment on lines +8 to +9
Member
Let's add this to the API Reference section of the docs.

And possibly link to it in either/or:

  • The discussion of "Deploy-time configurable keyword arguments" under **Recipe composition > Transforms**
  • a new Advanced Topics section, e.g. "Custom Transforms"

Contributor Author
thanks 💯 I'll get around to this after hearing back from Yuvi and other folks

"""a simple class that contains all possible injection spec values

folks can import it into their recipes to pass injection spec values
around to custom beam routines as the example below shows
Member
Conventionally, our docstrings tend to be a bit less colloquial, could we formalize the language a bit here? Thanks!


Examples:
```python
from dataclasses import dataclass, field
from typing import Tuple, Union

import apache_beam as beam
import xarray as xr

from pangeo_forge_recipes.config import Config
from pangeo_forge_recipes.patterns import Index
from pangeo_forge_recipes.storage import FSSpecTarget
from pangeo_forge_recipes.transforms import RequiredAtRuntimeDefault

@dataclass
class MyCustomTransform(beam.PTransform):

    # This custom transform class is not listed in
    # `pangeo_forge_recipes/injections.py:get_injection_specs()`,
    # so this instance attr will not be dependency injected;
    # instead, `config.Config` passes the injected value around.
    target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
        default_factory=RequiredAtRuntimeDefault
    )

    def expand(
        self,
        pcoll: beam.PCollection[Tuple[Index, xr.Dataset]],
    ) -> beam.PCollection[Tuple[Index, xr.Dataset]]:
        return pcoll

config = Config()
recipe = beam.Create([...]) | MyCustomTransform(target_root=config.target_storage)
```
"""

target_storage: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
default_factory=RequiredAtRuntimeDefault
)
input_cache_storage: Optional[Union[str, CacheFSSpecTarget]] = ""
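To make the mechanism concrete, here is a stdlib-only sketch of how deploy-time values could be copied onto such a `Config` object. The `inject` helper and `INJECTION_SPEC` mapping are hypothetical, simplified stand-ins for the real injection machinery, and plain strings stand in for the `FSSpecTarget` / `RequiredAtRuntimeDefault` types:

```python
from dataclasses import dataclass, fields
from typing import Dict, Optional


@dataclass
class Config:
    # Mirrors the shape of the PR's Config, with simplified types.
    target_storage: Optional[str] = None
    input_cache_storage: Optional[str] = ""


# Hypothetical spec: dataclass field name -> deploy-time value key.
INJECTION_SPEC = {
    "target_storage": "TARGET_STORAGE",
    "input_cache_storage": "INPUT_CACHE_STORAGE",
}


def inject(config: Config, values: Dict[str, str]) -> Config:
    """Copy deploy-time values onto the config per the spec."""
    for f in fields(config):
        key = INJECTION_SPEC.get(f.name)
        if key in values:
            setattr(config, f.name, values[key])
    return config


cfg = inject(Config(), {"TARGET_STORAGE": "gs://my-bucket/target"})
print(cfg.target_storage)  # gs://my-bucket/target
```

Fields with no matching deploy-time value keep their defaults, so a recipe can still run locally with explicitly assigned storage.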
4 changes: 4 additions & 0 deletions pangeo_forge_recipes/injections.py
@@ -1,5 +1,9 @@
def get_injection_specs():
return {
"Config": {
"target_storage": "TARGET_STORAGE",
"input_cache_storage": "INPUT_CACHE_STORAGE",
},
"StoreToZarr": {
"target_root": "TARGET_STORAGE",
},
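As an aside, a stdlib-only sketch of how a spec shaped like the one above could be resolved against deploy-time values. This is illustrative only: `resolve_kwargs` and `deploy_values` are hypothetical names, and the real runner applies injections differently:

```python
from typing import Any, Dict


def get_injection_specs() -> Dict[str, Dict[str, str]]:
    # Shape mirrors this PR's injections.py: class name -> {attr: value key}.
    return {
        "Config": {
            "target_storage": "TARGET_STORAGE",
            "input_cache_storage": "INPUT_CACHE_STORAGE",
        },
        "StoreToZarr": {
            "target_root": "TARGET_STORAGE",
        },
    }


def resolve_kwargs(class_name: str, deploy_values: Dict[str, Any]) -> Dict[str, Any]:
    """Map deploy-time values onto a class's injectable keyword args."""
    spec = get_injection_specs().get(class_name, {})
    return {attr: deploy_values[key] for attr, key in spec.items() if key in deploy_values}


kwargs = resolve_kwargs("StoreToZarr", {"TARGET_STORAGE": "gs://bucket/target"})
print(kwargs)  # {'target_root': 'gs://bucket/target'}
```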
Comment on lines 7 to 9
Member
Suggested change
"StoreToZarr": {
"target_root": "TARGET_STORAGE",
},

Should we take this a step further, and actually use this config object internally?

Meaning delete the StoreToZarr, WriteCombinedReference, and OpenURLWithFSSpec injections specs here, and instead doing something like:

# transforms.py

from .config import Config

c = Config()
...

class StoreToZarr(PTransform):
    ...
-   target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field(
-       default_factory=RequiredAtRuntimeDefault
+   target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = c.target_storage

Without fiddling with it a bit I'm not sure exactly how this would work, but conceptually, what do we think about making Config the interface to all injections, including for internal transforms?

Contributor Author
@ranchodeluxe ranchodeluxe Nov 25, 2023
I do enjoy where you're going with this ✨ Given that dependency injection is targeting the recipe this won't work AFAICT without more complicated acrobatics in the AST rewriter which I think we should avoid. So my feedback here is that we just do what you are proposing in the recipes explicitly:

benefits

  • recipes < 0.10.x (some version with this change) can easily be backward compatible and still leverage the existing dep injection

  • recipes >= 0.10.x (some version with this change) are now explicit and folks need less explanation about how this all works (except the explanation about dep injection). For example, with this change you probably don't need to even write documentation about how to write custom transforms b/c the StoreToZarr, WriteCombinedReference and OpenURLWithFSSpec recipe examples already show how it works

  • it's only a couple/few call sites we'd be passing conf.target_storage and conf.input_cache_storage to per recipe

drawbacks

  • if the number of transforms we write requiring conf.target_storage and conf.input_cache_storage dramatically increases then, yes, it'd seem we might be doing an unnecessary amount of input passing
