How to deal with 'side input' recipe - Example static ocean grid #663

Open
jbusecke opened this issue Dec 14, 2023 · 3 comments

Comments

@jbusecke
Contributor

I just ran into this case over at leap-stc/data-management#75 and I believe it is actually a super relevant use case for pangeo-forge:

In principle, the situation is the following:
The user provides

  • a) A set of 'data' netCDF files that are supposed to be concatenated along time (so far a vanilla PGF case)
  • b) A single file or multiple files which represent coordinates only and which, importantly, are invariant along the concatenation dimension. These should be combinable into a single xarray Dataset with potentially many coordinates. It is very common for numerical ocean model output to be saved out this way.

We usually want two things from b):

  • Use some of the coordinates contained in b) to apply some sort of preprocessing at the input file/dataset level. A common example is applying land/ocean masks (e.g. replacing some fill value with NaNs). This operation might require us to carefully map different masks (velocity, tracer) to different data variables, so we need the full information in b) available.
  • Add the coordinates contained in b) to the final output

Both of these aspects are very important for the analysis-ready (AR) quality of the resulting dataset, and present a huge advantage to the user of the resulting ARCO data.
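To make the masking step concrete, here is a minimal stdlib-only sketch of the logic that xarray's `ds.where(mask)` performs array-wise; the fill value and the mask layout are hypothetical, just for illustration:

```python
import math

FILL_VALUE = 1.0e20  # hypothetical fill value written by the model

def apply_ocean_mask(values, mask):
    """Replace land points (mask == 0) with NaN, mimicking ds.where(mask)."""
    return [v if m == 1 else math.nan for v, m in zip(values, mask)]

# one row of a tracer field, with a land/ocean mask from the static grid file
row = [12.3, FILL_VALUE, 8.1]
mask = [1, 0, 1]
masked = apply_ocean_mask(row, mask)  # the land point becomes NaN
```

In the real recipe this happens per data variable, with the mask chosen from the static dataset according to the variable's grid staggering (tracer vs. velocity points).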

I would be curious what folks here think is a good workflow to achieve this. Some thoughts from my side:

  • 🥱 Tired: Process and save both datasets separately and do the masking/combining in post. This would be super simple to do I think, but would fall short of our goals here since it would shift expensive compute (masking) to the user, and make it more complicated to use the data
  • ⚡️Wired: If we can somehow combine two pipelines with a custom preprocessor. Thinking out loud, in pseudocode:
import apache_beam as beam
import xarray as xr
from dataclasses import dataclass
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr

input_urls = [...]
input_urls_static = [...]
pattern = pattern_from_file_sequence(input_urls, concat_dim='time')
pattern_static = get_pattern_from_maybe_merging_coordinates(input_urls_static)  # hypothetical helper

# does this succeed with all coords stripped?
@dataclass
class Preprocess(beam.PTransform):
    ds_static: xr.Dataset

    @staticmethod
    def _merge_and_mask(item: Indexed[T], ds_static: xr.Dataset) -> Indexed[T]:
        index, ds = item
        ds = xr.merge([ds_static, ds], ...)
        # now ds contains all the necessary info
        ds = ds.where(...)
        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | 'Merge and mask' >> beam.Map(self._merge_and_mask, ds_static=self.ds_static)

static_rec = (
    beam.Create(pattern_static.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray()
)
rec = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray()
    | Preprocess(ds_static=static_rec)
    | StoreToZarr()
)

Making this work would really enable PGF to just fully ingest output from ocean models fresh off the HPC 🍞

  • 🧘 Inspired: Not sure if there is an even sleeker option haha. Excited to hear what others think
@jbusecke jbusecke changed the title How to deal with 'side input' data - Example static ocean grid How to deal with 'side input' recipe - Example static ocean grid Dec 14, 2023
@cisaacstern
Member

Thanks for raising this, Julius.

I think something like your pseudocode example is entirely possible in Beam.

The challenge I see is that to deploy this, the pipeline object needs to get involved: IIUC, a composite transform that is not attached to a pipeline object cannot serve as a side input, so it would need to be something like:

with beam.Pipeline() as p:
    ds_static_side_input = p | static_rec
    recipe = (
        p
        | beam.Create(pattern.items())
        | OpenURLWithFSSpec()
        | OpenWithXarray()
        | Preprocess(ds_static=ds_static_side_input)
        | StoreToZarr()
    )

which means that pangeo-forge-runner will need to be made aware of this as a possible pattern, since that is where the pipelines are instantiated.

Other than that, I'm not aware of any fundamental technical blocker, will just require someone spending some time on it to explore further.

@jbusecke
Contributor Author

That's great to hear @cisaacstern. What do you suggest as the next action item? xref/move this issue to -runner? I would love to work on this soon, but am not confident I can make much of a dent by myself.

@keewis
Contributor

keewis commented Jan 3, 2024

Not sure if this is helpful, but from the little I know of beam it sounds like it might be possible to use branching for this. In other words, you might be able to join two different PCollections together, if you can figure out how to do "broadcasting" in beam.
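In Beam terms that "broadcasting" is exactly what side inputs provide: every element of one PCollection gets paired with a single value derived from another. A stdlib-only analogue of the semantics (not the Beam API itself) would be:

```python
def broadcast_map(fn, collection, side):
    """Apply fn(item, side) to every item, pairing each element with the
    single shared side value -- mimicking what
    beam.Map(fn, side=beam.pvalue.AsSingleton(side_pcoll)) does in Beam."""
    return [fn(item, side) for item in collection]

# each 'dataset' (a dict here, for illustration) gets the shared
# static coordinates merged in
datasets = [{'time': 0}, {'time': 1}]
static = {'lon': [0, 90], 'lat': [-45, 45]}
combined = broadcast_map(lambda ds, s: {**s, **ds}, datasets, static)
```

The difference in Beam is that the side value is itself computed by a (branched) part of the same pipeline, so the runner handles materializing it before the main branch consumes it.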
