Should we just adopt xarray-beam as our internal data model? #256
Comments
My two main questions would be:
I will say that having the option to run a recipe using Dask has been helpful, since I've been able to run recipes manually wherever I happen to have a Dask cluster. But as bakeries get more attention, that will become less important.
I'll write a more detailed response soon, but I will answer this question now:
Yes, keys can accommodate MergeDims. An in-progress solution for this can be found here: google/xarray-beam#39
Probably not, also because Beam pipelines can be run locally with the direct runner (or in Jupyter notebooks) and can be run in unit tests. Beam's Python SDK uses the Python logging module. I use PyCharm as my IDE, and I've used its flavor of the Python debugger to step into Beam steps (transforms).
I don't have direct experience with this, and will have to investigate further. However, Beam pipelines can run on Apache Spark via Beam's Spark runner, so anywhere Spark can run (AWS, Azure, GCP, Databricks' offering, HPC), Beam pipelines can run too. From what I've read, Beam is described as a high-level API over interchangeable MapReduce-style execution environments. Again, I will do some research to find out how easy the DevOps side is in practice.
The model I picked for distributed data in xarray-beam is similar to dask.array. You can have datasets of different sizes, but offsets are integer offsets from the origin, rather than index labels as in dask.dataframe. If you don't know dataset sizes ahead of time, you would need to calculate the information before creating

Is there a particular example you are worried about?
I think this would be straightforward to implement.
Hey @rabernat, @cisaacstern, @sharkinsspatial, and @TomAugspurger: how about we schedule a meeting to discuss execution engines and their tradeoffs?
@alxmrs - thanks for keeping this important issue alive. I didn't mean to shut down the discussion at today's meeting. I'm just hyper-focused on actually getting launched atm. But it would be great if you (Alex) could keep moving this design discussion and prototyping forward in parallel to the deployment work. Rather than scheduling a meeting right now, let's see if we can iterate a bit asynchronously, using this issue for discussion. Let me kick things off by suggesting that we are conflating two separate but related questions:
As a specific example of these tradeoffs, let's look at how Prefect does mapping. A Prefect task can be either a regular task or a mapped task. Many mapped tasks can be chained together. However, chained maps must be the same length. There is no mechanism for aggregation or grouping of tasks, as there is in Beam (and possibly also in Dask, via blockwise). If we wanted to move to a model where data is actually passed between tasks (rather than written to disk via side effects), each stage of a Pangeo Forge pipeline in Prefect would need to use the same number of tasks. For the typical NetCDF to Zarr workflow, assuming that the first layer contains one file per task, this would imply

A specific proposal for investigation: what if we adopt xarray-beam's general concept where every stage can be thought of as a generator which yields a
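To make the integer-offset model concrete, here is a minimal plain-Python sketch. `ChunkKey` and `keys_from_sizes` are hypothetical names for illustration only; xarray-beam's own key type differs in its details. The point it shows is the one above: offsets count positions from the origin, so every chunk length must be known before any key can be built.

```python
from dataclasses import dataclass
from itertools import accumulate
from typing import List, Tuple


@dataclass(frozen=True)
class ChunkKey:
    """Hypothetical xarray-beam-style key (not the library's own class).

    `offsets` pairs each dimension name with the integer index of the
    chunk's first element, counted from the origin of the full dataset,
    as in dask.array, rather than with a coordinate label. A tuple is used
    so keys stay hashable and can be grouped on.
    """
    offsets: Tuple[Tuple[str, int], ...]


def keys_from_sizes(dim: str, sizes: List[int]) -> List[ChunkKey]:
    """One key per chunk along `dim`, given every chunk's length.

    The offset of chunk i is the sum of the lengths of chunks 0..i-1,
    so all sizes must be known up front.
    """
    starts = list(accumulate(sizes, initial=0))[:-1]
    return [ChunkKey(offsets=((dim, start),)) for start in starts]


# Three monthly files with 31, 28 and 31 daily time steps.
keys = keys_from_sizes("time", [31, 28, 31])
print([dict(k.offsets)["time"] for k in keys])  # [0, 31, 59]
```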
Some questions that this raises:
These side inputs could possibly eliminate our reliance on a metadata cache. The ideal outcome of this brainstorming would be a well-crafted design document for a new internal data model. Then we could get to work on actually performing this refactoring while maintaining backwards compatibility in terms of all of the real-world use cases we already support.
Hey Ryan,

Thanks for reframing the problem in this way. This is a great write-up! It certainly gives me a lot to think about... I will start thinking about the internal PGF data model. I'll take my time, since the project is focused on the first launch anyway.
I recently learned about Lithops.
Based on reading through the documentation, it looks like pangeo-forge today is already highly compatible with the Lithops execution model (which does not support much inter-process communication). Lithops supports nearly every cloud's serverless offering. There would be some big advantages to moving to a serverless execution framework. We should weigh the potential pros and cons against the Apache Beam idea.
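As a rough illustration of how a side input could stand in for the metadata cache, the sketch below uses plain Python: a reduction over every input computes a global layout once, and a per-element stage then consumes it. The element shape and function names are hypothetical. In Beam, the layout would be a one-element PCollection passed to a downstream transform via something like `beam.pvalue.AsSingleton`.

```python
from itertools import accumulate
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical element: (input file name, number of time steps it holds).
Element = Tuple[str, int]


def compute_layout(elements: Iterable[Element]) -> Dict[str, int]:
    """Reduction over *all* elements: the time offset of each input file.

    This is the kind of global information a metadata cache holds today;
    here it is computed once and handed to later stages as a side input.
    """
    ordered = sorted(elements)  # deterministic order by file name
    starts = accumulate((n for _, n in ordered), initial=0)
    return {name: start for (name, _), start in zip(ordered, starts)}


def assign_regions(elements: Iterable[Element],
                   layout: Dict[str, int]) -> Iterator[Tuple[str, slice]]:
    """Per-element stage that reads the side input instead of a cache."""
    for name, n in elements:
        start = layout[name]
        yield name, slice(start, start + n)


inputs = [("b.nc", 28), ("a.nc", 31), ("c.nc", 31)]
layout = compute_layout(inputs)        # {'a.nc': 0, 'b.nc': 31, 'c.nc': 59}
regions = list(assign_regions(inputs, layout))
```

The per-element stage never needs to see the other elements; it only needs the one precomputed value, which is exactly what a side input provides.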
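The map-only execution model can be illustrated with the standard library's `concurrent.futures` as a stand-in for a serverless backend. This is not the Lithops API; `process_input` and `run_map_only` are hypothetical placeholders for opening and processing one input file.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def process_input(url: str) -> Dict[str, object]:
    """Hypothetical per-input task: it needs nothing from any other task."""
    return {"url": url, "name_length": len(url)}


def run_map_only(urls: List[str], max_workers: int = 4) -> List[Dict[str, object]]:
    """Fan out independent tasks, then gather the results.

    Executor.map returns results in input order, so the only coordination
    needed is the final gather; no task talks to another while running.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_input, urls))


results = run_map_only(["s3://bucket/a.nc", "s3://bucket/bb.nc"])
```

Any stage that needs data from other tasks (a grouping or a reduction) has to happen after the gather, which is where this model and Beam's start to diverge.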
I’m more actively working on this specific design doc. For now, I’d like to respond to open concerns listed in this thread so far.
In theory, we could implement Prefect and Dask as Beam Runners (e.g. have adapter layers between Beam and, say, Prefect). In fact, the Beam Runner docs suggest that Beam’s internal data model is similar to Pangeo’s, at least in goal. In essence, Beam includes a compiler from a dataflow graph description to execution primitives.
Switching to Beam could mean reusing their compiler architecture.
An early section of the Beam Runner docs mentions that the framework is flexible here: “You don’t have to do it all at once, and there may even be features that don’t make sense for your runner to ever support.” The standard practice is to throw an error if the runner receives a pipeline that it cannot execute. This suggests that one could build an MVP Beam runner and iterate. It would be interesting to investigate how feasible it would be to implement a Prefect runner for a Beam pipeline. The mapping docs you link lead me to believe that this would be totally tractable.

Responses concerning how this could be implemented with Prefect

> However, chained maps must be the same length.

> There is no mechanism for aggregation or grouping of tasks, as there is in Beam

Sorry, can you point out where in the docs it mentions this? Maybe I’m misunderstanding their conception of a “Task”. Prefect seems to offer

From reviewing the linked page, it sounds like Prefect includes all the same primitives that Beam offers; check out their guide on implementing primitives (it’s worth pointing out that we could implement this in Python only). I’m not necessarily advocating for Pangeo Forge to maintain a Beam adapter to Prefect; I want to establish that a mapping is possible. As mentioned in the PGF biweekly meetings, it should also be possible to create a runner from Beam to Dask, which I think would be far more fruitful.

A core benefit to adopting the Beam model is that PGF would gain a lot of modularity, towards this point:
One way this could be achieved is if Pangeo Forge Recipes became a library of Composite Transforms that interact with each other, likely via shared PCollection interfaces. XArray-Beam does provide one model for such an interface that’s oriented around XArray Datasets. However, Pangeo Forge could provide other models for other recipe types, like the HDFReference recipe. Or, for this case, the Beam primitives alone may well be sufficient.
Beam transforms include the capacity for adding side inputs: you can extract the value from a reduction and pass it into a downstream transform as a side input.

Lithops vs Beam

I agree with you that Pangeo Forge today is compatible with Lithops. Here are a few pros and cons of doubling down on the adaptor approach vs switching to Beam:
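In Beam itself, a composite transform is a `beam.PTransform` subclass whose `expand` method applies other transforms. The sketch below mimics that idea in plain Python so it runs without Beam. `composite`, `list_inputs`, `make_reference`, and `combine_references` are hypothetical, and the "reference" payload is a placeholder, not the real output of `HDFReferenceRecipe`.

```python
from typing import Callable, Iterable, Iterator, List, Tuple

Stage = Callable[[Iterable], Iterator]


def composite(*stages: Stage) -> Stage:
    """Bundle several stages into one reusable stage.

    Callers see a single stage with the same element-stream interface,
    so composites can themselves be chained or nested.
    """
    def run(elements: Iterable) -> Iterator:
        stream = elements
        for stage in stages:
            stream = stage(stream)
        yield from stream
    return run


# Hypothetical building blocks of a reference-style recipe.
def list_inputs(urls: Iterable[str]) -> Iterator[Tuple[int, str]]:
    yield from enumerate(urls)


def make_reference(items: Iterable[Tuple[int, str]]) -> Iterator[Tuple[int, dict]]:
    for i, url in items:
        yield i, {"source": url}


def combine_references(items: Iterable[Tuple[int, dict]]) -> Iterator[List[dict]]:
    """A reduction: gather every per-file reference, ordered by key."""
    yield [ref for _, ref in sorted(items, key=lambda kv: kv[0])]


per_file_refs = composite(list_inputs, make_reference)
reference_recipe = composite(per_file_refs, combine_references)
print(list(reference_recipe(["a.nc", "b.nc"])))
# [[{'source': 'a.nc'}, {'source': 'b.nc'}]]
```

Because `per_file_refs` has the same interface as a single stage, another recipe type could reuse it and swap in a different final reduction.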
This is a prototype for using Apache Beam for the internal (and external?) data model of Pangeo Forge Recipes. Here, I demo how HDFReferenceRecipe could be structured into modular components via composite Beam transforms. xref: pangeo-forge#256
FYI, Beam now supports Python 3.9 on both pip and conda. 🎉
Here is a little experiment I did tonight in preparation for tomorrow's sprint: https://gist.github.com/rabernat/15f77fb447e2cdbc73c4031c59768886
I've just successfully run a Beam Pipeline as a Dataflow job, deployed from a Jupyter Notebook. The pipeline is the official wordcount example. Notes:
```python
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""

    def process(self, element):
        """Returns an iterator over the words of this element.

        The element is a line of text. If the line is blank, note that, too.

        Args:
          element: the element being processed

        Returns:
          The processed element.
        """
        return re.findall(r'[\w\']+', element, re.UNICODE)


input_file = 'gs://dataflow-samples/shakespeare/kinglear.txt'
output_path = 'gs://beam-dataflow-test/counts.txt'

beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='pangeo-forge-4967',
    # Note: manually increment the `job_name` for each run.
    # In production, this will be handled programmatically.
    job_name='wordcount-example-0',
    temp_location='gs://beam-dataflow-test/temp',
    # Our institutional GCP policies forbid public IPs, xref:
    # - https://cloud.google.com/dataflow/docs/reference/pipeline-options#security_and_networking
    # - https://github.com/pangeo-forge/pangeo-forge-gcs-bakery/issues/29
    use_public_ips=False,
    # The private network we've already set up:
    # - https://github.com/pangeo-forge/pangeo-forge-gcs-bakery/pull/30
    # is in us-central1, so that's our only option unless/until we
    # establish private networks elsewhere. xref:
    # - https://console.cloud.google.com/networking/networks/details/default?project=pangeo-forge-4967&pageTab=SUBNETS
    region='us-central1',
)

with beam.Pipeline(options=beam_options) as p:
    # Read the text file[pattern] into a PCollection.
    lines = p | 'Read' >> ReadFromText(input_file)

    counts = (
        lines
        | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))

    # Format the counts into a PCollection of strings.
    def format_result(word, count):
        return '%s: %d' % (word, count)

    output = counts | 'Format' >> beam.MapTuple(format_result)

    # Write the output using a "Write" transform that has side effects.
    output | 'Write' >> WriteToText(output_path)
```
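Because each Dataflow run costs time and money, it can help to check the per-element logic locally first. The function below is a hypothetical plain-Python mirror of the pipeline's transforms, using the same regex as `WordExtractingDoFn`; alternatively, the Beam pipeline itself can be run locally by setting `runner='DirectRunner'` and pointing it at a small local text file.

```python
import re
from collections import Counter
from typing import Iterable, List


def count_words(lines: Iterable[str]) -> List[str]:
    """Plain-Python mirror of Split -> PairWithOne -> GroupAndSum -> Format.

    Results are sorted so they can be compared in a test; Beam makes no
    ordering guarantee for its output.
    """
    counts = Counter()
    for line in lines:
        counts.update(re.findall(r"[\w\']+", line, re.UNICODE))
    return ["%s: %d" % (word, n) for word, n in sorted(counts.items())]


print(count_words(["the king", "The king is dead"]))
# ['The: 1', 'dead: 1', 'is: 1', 'king: 2', 'the: 1']
```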
I'm in the process of preparing a separate issue discussing the multi-cloud bakery story for Beam pipeline runners, but I have one initial question here. Do we want to aim for consistency across all platforms for our pipeline runner (Flink on k8s), or will we want to leverage Dataflow on GCP? I would lean towards a consistent approach across cloud providers so we can leverage the Flink monitoring API for integration with
Very interesting question, @sharkinsspatial. Personally, I would favor an incremental approach: Dataflow on GCP to begin, which allows us to de-risk and debug the Beam Pipelines themselves, with presumably minimal concerns regarding infrastructure. And then a subsequent phase in which we migrate to the more generalized, platform-agnostic implementation. My concern with going all-in on Flink to start is that too many things will be changing all at once (data model and infrastructure, instead of data model then infrastructure). Though I can also see the counter-argument that this incremental approach means accruing technical debt by working towards an approach which we will likely abandon before too long.
In order to kickstart the discussion of implementing a Dask Beam runner, I propose we meet during the week of June 13-17. I have created a When2Meet poll here: Dask Beam Runner Discussion - When2meet. If you are interested in attending, please give your availability. Hope to see many people there! 🚀
Re: @cisaacstern's incremental approach: that LGTM. It would also be prudent to ensure the core recipe transforms span the runners we want to target in the long term, as described in Beam's capability matrix. Also: I have a working prototype of a Dask Runner for Beam here: alxmrs/beam#1. I would love to start stress-testing it with Pangeo Forge recipes, if at all possible. Ideally, PGF recipes should drive the capabilities of the Dask runner.
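A toy illustration of the "support a subset, fail fast" approach from the Beam runner guide, where the transforms a recipe actually needs decide what the runner implements next. `Transform` and `MinimalRunner` are hypothetical and unrelated to Beam's real runner API or to the Dask runner prototype.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Optional


@dataclass
class Transform:
    """One node of a toy linear pipeline: a primitive kind and its function."""
    kind: str                                # e.g. "Map", "FlatMap", "GroupByKey"
    fn: Optional[Callable[[Any], Any]] = None


class MinimalRunner:
    """Toy runner that supports a subset of primitives and fails fast.

    The whole pipeline is checked before anything runs, so an unsupported
    transform raises immediately instead of after partial execution.
    """
    SUPPORTED = {"Map", "FlatMap"}

    def run(self, data: Iterable[Any], transforms: List[Transform]) -> List[Any]:
        unsupported = [t.kind for t in transforms if t.kind not in self.SUPPORTED]
        if unsupported:
            raise NotImplementedError(f"Unsupported transforms: {unsupported}")
        items = list(data)
        for t in transforms:
            if t.kind == "Map":
                items = [t.fn(x) for x in items]
            else:  # "FlatMap": each input may yield zero or more outputs
                items = [y for x in items for y in t.fn(x)]
        return items


runner = MinimalRunner()
print(runner.run([1, 2], [Transform("Map", lambda x: x * 10),
                          Transform("FlatMap", lambda x: [x, x + 1])]))
# [10, 11, 20, 21]
```

Running each recipe against such a runner yields a concrete list of missing primitives (here, anything like `GroupByKey`), which is one way recipes could drive the runner's roadmap.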
Awesome, @alxmrs! Here's how you could start testing the Dask Runner via a PR to
Opening a PR to
Thanks for the detailed instructions! I'll set this up later this week :)
So excited about this! 🚀 I am making steady progress on my end...see #379.
@rabernat I wasn't sure where best to give an update. I'm about to go on vacation for a week and wanted to explain my status on Beam & Dask. From what I can tell, when apache/beam#23913 lands, we should have enough implemented in the Dask Runner to support executing Pangeo Forge recipes. I've tried testing things locally, and PGF pipelines seem to run correctly (all exit codes are zero, but I haven't generated ARCO data end-to-end quite yet). The outstanding work on this PR involves passing lint and formatting checks. @pabloem is a good point of contact for carrying this change through to completion in my absence.
TL;DR: what the title says
Background
This team should feel very proud of the work we've done on this package over the past few months. We have shipped dozens of real recipes (albeit without any permanent infrastructure to deploy them on) and learned lots of important lessons in the process.
At the end of a long haul of work on this package, I'd like to zoom out and offer some reflections on our direction in terms of software architecture. I feel like we (specifically, I) have flailed around a bit in terms of the core internals of the Recipe model. Examples of big refactorings:
- `fsspec_open_kwargs`, `query_string_secrets`, & `is_opendap` attributes of `FilePattern` (#167)

In the meantime, we have worked on many orthogonal but also important aspects, mostly around I/O. We have also waited months 😡 for Columbia to release our free credits from Prefect Cloud to actually deploy our primary bakery, which has given us plenty of time to contemplate our Prefect dependency.
In #245 (comment), based on my experience trying to generalize the input layer of pangeo-forge-recipes, I am basically suggesting we move to a much more general / flexible model for recipes, where we simply chain stages together. Something like
or in beam-style:
This, coupled with a one-way flow of data between these stages, would basically inevitably look a lot like xarray-beam.
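The idea of chaining stages with a one-way flow of data can be sketched with plain Python generators, assuming each stage consumes and yields `(key, payload)` pairs. All names here are hypothetical; this is neither pangeo-forge-recipes' API nor xarray-beam's, and the payload is a dict standing in for an `xarray.Dataset`.

```python
from typing import Any, Callable, Iterable, Iterator, List, Tuple

# A keyed element: (key, payload). In an xarray-beam-style model the payload
# would be an xarray.Dataset; here it is a plain dict.
Keyed = Tuple[Tuple[int, ...], Any]
Stage = Callable[[Iterable[Keyed]], Iterator[Keyed]]


def open_inputs(urls: Iterable[str]) -> Iterator[Keyed]:
    """Source stage: one element per input, keyed by its position."""
    for i, url in enumerate(urls):
        yield (i,), {"url": url}


def annotate(elements: Iterable[Keyed]) -> Iterator[Keyed]:
    """1-to-1 stage: enrich each payload, keep its key."""
    for key, payload in elements:
        yield key, {**payload, "name_length": len(payload["url"])}


def split_in_two(elements: Iterable[Keyed]) -> Iterator[Keyed]:
    """1-to-many stage: each element becomes two sub-chunks with longer keys.

    A generator stage may emit a different number of elements than it
    receives, unlike chained maps that must all have the same length.
    """
    for key, payload in elements:
        for part in range(2):
            yield key + (part,), {**payload, "part": part}


def run(source: Iterable[Keyed], *stages: Stage) -> List[Keyed]:
    """Chain stages so data flows one way, from source to sink."""
    stream: Iterable[Keyed] = source
    for stage in stages:
        stream = stage(stream)
    return list(stream)


result = run(open_inputs(["a.nc", "b.nc"]), annotate, split_in_two)
print([key for key, _ in result])  # [(0, 0), (0, 1), (1, 0), (1, 1)]
```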
We have already danced around various integrations with Beam, for example:
I'd like to discuss the pros and cons of incorporating xarray-beam more tightly into pangeo-forge-recipes.
What is xarray-beam?
Xarray-beam's data model defines a convention for "keys", which are more-or-less equivalent to our Indexes. These keys can be used in the context of general Beam Pipelines which pass around xarray datasets. In contrast to pangeo-forge-recipes, there are not a lot of other custom objects. The API has just a few functions for reading, writing, rechunking, aggregating, etc.
A high-level difference in the evolution of the two packages is that we kind of went bottom-up, starting from a very specific use case (many NetCDFs to Zarr) and are gradually trying to generalize, while they started as a much more general framework. So maybe now is the time to align more closely.
Some possible ways we could do this are...
Option A: Recipes ➡️ Beam Pipelines
This is the most "all-in" approach. We simply say that our Recipes are Beam Pipelines and build them using xarray-beam. We keep our input / file-pattern stuff and basically throw away a good portion of our code. (We still keep everything related to opening files / datasets, which is transferable.)
Pros
Cons
Questions
Option B: Adopt parts of the xarray-beam data model but continue to use an adaptor
We continue to maintain a separate `Pipeline` object internally, but adopt more of the xarray-beam-like semantics, such as `Keys` for each stage. We use this to allow arbitrary stages to be chained together using beam-like operations such as `map`.

Pros
Cons
- The `Pipeline` object gets more complicated and harder to maintain

Option C: Do nothing
We continue more-or-less ignoring xarray-beam and doing our own thing, possibly going down architectural rabbit holes that have presumably been solved already.
I have enumerated a lot more cons for switching to Beam, but I am still kind of thinking it's the right option.
Welcoming thoughts from everyone.