
Proposed Recipes for Global Drifter Program #101

Open · philippemiron opened this issue Feb 1, 2022 · 23 comments · May be fixed by #186

Comments

@philippemiron

Source Dataset

I'm trying to port code that converts the hourly GDP dataset (~20,000 individual netCDF files) into a ragged array to a pangeo-forge recipe.

  • AOML website link.
  • The file format is netCDF.
  • One file per drifter.
  • Accessible through ftp
    • ftp://ftp.aoml.noaa.gov/phod/pub/lumpkin/hourly/v2.00/netcdf/drifter_101143.nc

Transformation / Alignment / Merging

The files contain a few variables, plus metadata that should in fact be stored as variables. I have a draft recipe that cleans up each file, parses the dates, and converts metadata to variables. The files have two dimensions ['traj', 'obs'], where each file (one ['traj']) contains a different number of observations ['obs'] (ranging from ~1000 to 20000+). More precisely, the scalar ['traj'] variables are the type of buoy, its dimensions, launch date, drogue loss date, etc., and the vector ['obs'] variables are lon, lat, ve, vn, time, err, etc.

To create the ragged array, the scalar variables should be concatenated together, and the same goes for the various vector variables (a rough sketch follows the questions below). My two questions are:

  • is it possible to concatenate on multiple dimensions?
  • is it possible to set a non-uniform chunk size? Chunks would be set to the number of observations per trajectory, so that the nth chunk lines up with the nth trajectory and per-trajectory calculations stay efficient.
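For concreteness, here is a rough sketch of the concatenation I have in mind (plain xarray, hypothetical local filenames; this is not the draft recipe itself):

import xarray as xr

# hypothetical: two per-drifter files already downloaded locally
files = ["drifter_101143.nc", "drifter_101144.nc"]
datasets = [xr.open_dataset(f) for f in files]

# scalar ('traj') variables concatenate along 'traj',
# vector ('obs') variables concatenate along 'obs'
traj_parts = [d[[v for v in d.variables if "traj" in d[v].dims]] for d in datasets]
obs_parts = [d[[v for v in d.variables if "obs" in d[v].dims]] for d in datasets]

ragged = xr.merge(
    [xr.concat(traj_parts, dim="traj"), xr.concat(obs_parts, dim="obs")]
)

# number of observations per trajectory, needed to index back into 'obs'
ragged["rowsize"] = ("traj", [d.dims["obs"] for d in datasets])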

Cheers,

Output Dataset

Single netCDF archive, or a Zarr folder.

@rabernat
Contributor

rabernat commented Feb 1, 2022

Thanks for posting this Philippe! Pinging @selipot who I believe has NSF Earthcube funding to work on this topic!

  • is it possible to concatenate on multiple dimensions?

Not currently, but this is in the works (see pangeo-forge/pangeo-forge-recipes#140)

  • is it possible to set a non-uniform chunk size?

On the target Zarr dataset? No, Zarr does not support this.


Given the ragged nature of the data you described, Xarray / Zarr may not be a good fit at all for the target format. Have you considered either

  • Awkward Array (cc @jpivarski) - what is the ideal on-disk format?
  • Using a tabular data model (e.g. Pandas dataframe) + Parquet storage?

@selipot

selipot commented Feb 1, 2022

Thank you for pinging me. Yes, @philippemiron's work on this is funded through my CloudDrift EarthCube project :)

@philippemiron
Author

Thanks for the suggestion, I will take a look at Awkward Array. I have not considered pandas; the dataset is not that large (~17 GB), and I believe xarray would be easier to work with. I realize that ragged arrays are not really a good fit for Xarray, but I have been testing the groupby() operation from flox with non-uniform Dask chunks (one per trajectory), hoping to make per-trajectory operations efficient.
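Roughly, the pattern I have been testing looks like this (a sketch only, assuming a combined netCDF file with rowsize, ids, and ve variables):

import xarray as xr
import flox.xarray

# open once to read the per-trajectory sizes, then re-open with one
# dask chunk per trajectory
rowsize = xr.open_dataset("gdp_combined.nc")["rowsize"].values
ds = xr.open_dataset("gdp_combined.nc", chunks={"obs": tuple(rowsize.tolist())})

# group the per-observation eastward velocity by the per-observation drifter id
mean_ve = flox.xarray.xarray_reduce(ds["ve"], ds["ids"], func="mean")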

@rabernat
Contributor

rabernat commented Feb 1, 2022

For working with Lagrangian data (which we do a lot in our lab), I have always preferred Pandas, due to the ability to group on any of the different attributes (float id, lat / lon, time, etc). 17 GB is not so bad if you use dask.dataframe + Parquet.
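A sketch of what that could look like, assuming hypothetical per-file variable names (longitude, latitude, time, ID) and a placeholder file list:

import pandas as pd
import dask.dataframe as dd
import xarray as xr

# hypothetical helper: flatten one per-drifter netCDF file into a table with
# one row per observation, carrying the drifter ID so trajectories can be grouped
def drifter_to_frame(path):
    ds = xr.open_dataset(path)
    n_obs = ds.dims["obs"]
    return pd.DataFrame({
        "id": ds["ID"].values.repeat(n_obs),
        "time": ds["time"].values,
        "lon": ds["longitude"].values,
        "lat": ds["latitude"].values,
    })

files = ["drifter_101143.nc", "drifter_101144.nc"]  # placeholder file list
pd.concat((drifter_to_frame(f) for f in files), ignore_index=True).to_parquet("gdp.parquet")

# dask.dataframe reads the Parquet store lazily; group on any column
ddf = dd.read_parquet("gdp.parquet")
mean_lat_per_drifter = ddf.groupby("id")["lat"].mean().compute()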

I would recommend the following approach:

  • Define 3 representative workflow tasks
  • Try to do each of them with the 3 different libraries (Xarray, Awkward, Pandas)
  • Assess on a few different metrics including
    • performance (raw speed)
    • how confusing / complicated the code is

@jpivarski

Thanks for the cc!

It's looking more and more like the ideal on-disk format is Parquet. pyarrow support for converting rich data types between Arrow and Parquet is getting better and better, and as of pyarrow 6 and Awkward 2, we carry over enough metadata to do lossless Awkward ↔ Arrow ↔ Parquet round-trips.
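As a toy illustration of such a round-trip (using the Awkward 2 API as above; the values are made up):

import awkward._v2 as ak

# two "trajectories" with different numbers of observations
toy = ak.Array([
    {"obs": [{"lon": -17.7, "lat": 14.7}, {"lon": -17.6, "lat": 14.8}]},
    {"obs": [{"lon": 136.0, "lat": 10.0}]},
])

ak.to_parquet(toy, "toy.parquet")         # raggedness is encoded in the Parquet schema
roundtrip = ak.from_parquet("toy.parquet")
print(roundtrip.type)                     # ragged structure survives the round trip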

Also, we've been thinking/hoping to add a Zarr v3 extension (zarr-developers/zarr-specs#62), though this would require the Awkward library to interpret a Zarr dataset as ragged. An advantage of Parquet is that the raggedness concept is built into the file format—any other library reading it would interpret it correctly.

We're in the midst of developing a Dask collection for Awkward Arrays (https://github.com/ContinuumIO/dask-awkward/) and would be eager to help you with your use-case as a way to get feedback for our development. I'm on https://gitter.im/pangeo-data/Lobby and https://discourse.pangeo.io/ .

@philippemiron
Author

philippemiron commented Mar 23, 2022

Hi @rabernat,

We are still testing out file formats and are putting together a notebook for the EarthCube annual meeting, benchmarking typical workflow tasks with xarray, pandas, and awkward array. We are now wondering if it would be possible to host the dataset on the Pangeo Cloud. Currently, I have a set of preprocessing tools that clean up the files, parse the dates, and convert metadata to variables (as described above).

With those tools, I can create the ragged array as follows:

import os
from os.path import join, isfile
import urllib.request

from preprocess import create_ragged_array

# download a subset of the data (5 of the 17324 files)
list_id = [101143, 101144, 101509, 101510, 101511]

input_url = 'ftp://ftp.aoml.noaa.gov/phod/pub/lumpkin/hourly/v2.00/netcdf/'

folder = 'raw/'  # output folder for the raw netCDF files
os.makedirs(folder, exist_ok=True)

files = []
for i in list_id:
    file = f'drifter_{i}.nc'
    url = input_url + file

    if not isfile(join(folder, file)):
        urllib.request.urlretrieve(url, join(folder, file))

    files.append(join(folder, file))

# combine the individual files into a single ragged array and save it
ds = create_ragged_array(files).to_xarray()
ds.to_netcdf('array.nc')

My guess/hope is that this could possibly be converted to a pangeo-forge recipe but to be honest I'm not totally sure how to proceed (and if it's possible) with the current model.

@rabernat
Contributor

@philippemiron - we would love to help support this use case. If the data can be stored as netCDF, then we can make it work here (with zarr).

However, I'm confused about what this ragged array looks like in xarray. My understanding is that xarray does not support ragged arrays. Can you share the xarray repr of ds as generated by your code?

@philippemiron
Author

philippemiron commented Mar 24, 2022

See here. I'm also working with @jpivarski on a parquet representation that would be handled by awkward array.

@rabernat
Contributor

Could you please just do print(ds) and copy-paste the results on this issue?

@philippemiron
Author

<xarray.Dataset>
Dimensions:                (traj: 5, obs: 10345)
Coordinates:
    ID                     (traj) int64 ...
    longitude              (obs) float32 ...
    latitude               (obs) float32 ...
    time                   (obs) datetime64[ns] ...
    ids                    (obs) int64 ...
Dimensions without coordinates: traj, obs
Data variables: (12/54)
    rowsize                (traj) int64 ...
    location_type          (traj) bool ...
    WMO                    (traj) int32 ...
    expno                  (traj) int32 ...
    deploy_date            (traj) datetime64[ns] ...
    deploy_lon             (traj) float32 ...
    ...                     ...
    err_sst                (obs) float32 ...
    err_sst1               (obs) float32 ...
    err_sst2               (obs) float32 ...
    flg_sst                (obs) int8 ...
    flg_sst1               (obs) int8 ...
    flg_sst2               (obs) int8 ...
Attributes: (12/15)
    title:             Global Drifter Program hourly drifting buoy collection
    history:           Version 2.00.  Metadata from dirall.dat and deplog.dat
    Conventions:       CF-1.6
    date_created:      2022-03-24T11:31:24.444885
    publisher_name:    GDP Drifter DAC
    publisher_email:   aoml.dftr@noaa.gov
    ...                ...
    metadata_link:     https://www.aoml.noaa.gov/phod/dac/dirall.html
    contributor_name:  NOAA Global Drifter Program
    contributor_role:  Data Acquisition Center
    institution:       NOAA Atlantic Oceanographic and Meteorological Laboratory
    acknowledgement:   Elipot et al. (2022) to be submitted. Elipot et al. (2...
    summary:           Global Drifter Program hourly data

@dhruvbalwada
Contributor

dhruvbalwada commented Mar 24, 2022

I am very interested in how this gets solved.

I just wanted to raise the question of whether this is the best format in which to save the data.
It looks like instead of using a ragged array, the dataset is stored as a single vector (obs). This is the same approach that Argopy (https://argopy.readthedocs.io/en/latest/data_manipulation.html) took with its point data set (@gmaze). When Argopy goes to their "profile" dataset, they basically fill the dataset with a lot of NaNs.
A single 'obs' vector is definitely the simplest and most intuitive format, and the separation into different drifters will have to be done by the analyst in code or by going to a "trajectory" dataset (like "profile"). Does the parquet representation handled by awkward array take care of that in a smarter way? and if so, then maybe Argopy can also benefit.

@philippemiron
Author

Hi @dhruvbalwada,

The variables are indeed each stored in a 1-d array along ['obs']. The separation can easily be done using the rowsize['traj'] variable, which contains the number of ['obs'] per trajectory. The main advantage of using Awkward Array is that the indexing is already taken care of by the library, e.g. sst[400] returns the sea surface temperature along trajectory #401. I believe it is worth investigating, as it can make the implementation of per-trajectory operations simpler.
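Without Awkward Array, the same lookup needs the cumulative rowsize, e.g. (a sketch, assuming the combined file and an sst variable):

import numpy as np
import xarray as xr

ds = xr.open_dataset("gdp_v2.00.nc")  # combined ragged file

# offsets into the flat 'obs' dimension, derived from rowsize
offsets = np.insert(np.cumsum(ds["rowsize"].values), 0, 0)

def trajectory(ds, i):
    # all 'obs' values belonging to trajectory i
    return ds.isel(obs=slice(int(offsets[i]), int(offsets[i + 1])))

sst_400 = trajectory(ds, 400)["sst"]  # sea surface temperature along trajectory #401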

@dhruvbalwada
Contributor

dhruvbalwada commented Mar 24, 2022

Oh wow, that sounds amazing! Can't wait to play with this.
Maybe Argopy (@gmaze) would also like to move to this format in future releases, learning from the experience you are having here.

I am sure the pros will have some better comments, but given how the xarray dataset is formatted, it seems like it should be little trouble getting it into Pangeo Forge.
One question might be how one goes about chunking this dataset, and whether there are better approaches than just chopping it up arbitrarily.
Have you tried following the tutorial to see if you can create a recipe out of this? It seems like most of the elements are already there in the code you used to create the ragged array. Is there a specific step in making the recipe where you are getting stuck?

@philippemiron
Author

To perform operations per trajectory, you can chunk it like this:

# align chunks of the dask array with the trajectories
chunk_settings = {'obs': tuple(rowsize.tolist())}
ds = xr.open_dataset(path_gdp, chunks=chunk_settings)

and then use map_blocks() and apply_ufunc().
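For example, something along these lines (a sketch only, assuming ve/vn velocity variables and the path_gdp file from the snippet above):

import dask.array as da
import numpy as np
import xarray as xr

rowsize = xr.open_dataset(path_gdp)["rowsize"].values
ds = xr.open_dataset(path_gdp, chunks={"obs": tuple(rowsize.tolist())})

# each dask block now holds exactly one trajectory, so a block-wise
# reduction yields one value per drifter
speed = da.sqrt(ds["ve"].data ** 2 + ds["vn"].data ** 2)
mean_speed = speed.map_blocks(
    lambda b: np.atleast_1d(b.mean()), chunks=(1,), dtype=float
).compute()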

@rabernat
Contributor

After examining the code and the output dataset, I believe it should be possible to produce this dataset using a standard Pangeo Forge XarrayZarrRecipe. You are basically just concatenating all of the netCDF files along the obs dimension, as we usually do with time dimensions. The only complication would be how to handle the traj dimension, which has a different (smaller) shape.

We need to play around with this a bit, but I don't see any fundamental blockers here.

@philippemiron
Author

philippemiron commented Mar 24, 2022

I'm not 100% certain what the issue is. I managed to create the XarrayZarrRecipe:

recipe = XarrayZarrRecipe(pattern,
                          target_chunks={'obs': tuple(obs_per_file)},
                          process_chunk=preprocess_gdp,
                          xarray_open_kwargs={'decode_times':False})
recipe

XarrayZarrRecipe(file_pattern=<FilePattern {'obs': 5}>, inputs_per_chunk=1, target_chunks={'obs': (417, 2005, 1970, 1307, 4646)}, target=None, input_cache=None, metadata_cache=None, cache_inputs=True, copy_input_to_local_file=False, consolidate_zarr=True, consolidate_dimension_coordinates=True, xarray_open_kwargs={'decode_times': False}, xarray_concat_kwargs={}, delete_input_encoding=True, process_input=None, process_chunk=<function preprocess_gdp at 0x13f0a1750>, lock_timeout=None, subset_inputs={}, open_input_with_fsspec_reference=False)

But then when trying to retrieve one chunk, I get the following error:

with recipe.open_chunk(all_chunks[0]) as ds:
    display(ds)

ValueError: Chunks do not add up to shape. Got chunks=((417, 2005, 1970, 1307, 4646),), shape=(417,)

I will give it a second try over the weekend and report back on Monday.
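For what it's worth, one guess at the mismatch: Zarr (and therefore target_chunks) only takes a single integer chunk size per dimension, as noted earlier in this thread, so a uniform value is probably what the recipe expects, e.g. (placeholder number):

recipe = XarrayZarrRecipe(pattern,
                          target_chunks={'obs': 100_000},  # hypothetical uniform chunk size
                          process_chunk=preprocess_gdp,
                          xarray_open_kwargs={'decode_times': False})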

@rabernat
Contributor

I will try to spend some time playing around with it.

@jpivarski

A single 'obs' vector is definitely the simplest and most intuitive format, and the separation into different drifters will have to be done by the analyst in code or by going to a "trajectory" dataset (like "profile"). Does the parquet representation handled by awkward array take care of that in a smarter way? and if so, then maybe Argopy can also benefit.

Parquet does that in a smarter way: the construction of a ragged array from rowsize and obs is built into the Parquet format. It knows about ragged arrays, but also nested records, missing data, heterogeneous unions, etc.

>>> import awkward._v2 as ak
>>> dataset = ak.from_parquet("global-drifter/global-drifter-0000.parquet", "obs.l*itude")
>>> dataset.show(limit_rows=6)
{obs: [{longitude: 119, latitude: 21.2}, ..., {longitude: 115, ...}]},
{obs: [{longitude: -67.3, latitude: 41.9}, ..., {longitude: -52.6, ...}]},
{obs: [{longitude: -50, latitude: -32.5}, ..., {longitude: -51.8, ...}]},
...,
{obs: [{longitude: 177, latitude: 56.1}, ..., {longitude: 176, ...}]},
{obs: [{longitude: -109, latitude: -55.3}, ..., {longitude: -96.4, ...}]}]
>>> print(dataset.type)
907 * {obs: var * ?{longitude: float32, latitude: float32}}
>>> ak.num(dataset)
<Array [{obs: 1488}, {obs: 4136}, ..., {obs: 44962}] type='907 * {obs: int64}'>

But since this structure is relatively simple, ragged arrays with only one level of depth, distributing them with the existing NetCDF infrastructure and building that structure on demand is pretty reasonable:

>>> import h5py
>>> import numpy as np
>>> file = h5py.File("gdp_v2.00.nc")   # a different data sample
>>> content = ak.zip({"longitude": np.asarray(file["longitude"]), "latitude": np.asarray(file["latitude"])})
>>> dataset = ak.unflatten(content, np.asarray(file["rowsize"]))
>>> dataset.show(limit_rows=6)
[{longitude: -17.7, latitude: 14.7}, {...}, ..., {longitude: -16.9, ...}],
[{longitude: -17.3, latitude: 14.5}, {...}, ..., {longitude: -17.3, ...}],
[{longitude: 136, latitude: 10}, {...}, ..., {longitude: 125, latitude: 13.7}],
...,
[{longitude: -25.7, latitude: 65}, {...}, ..., {...}, {longitude: -30.2, ...}],
[{longitude: -24.9, latitude: 64.8}, {...}, ..., {longitude: -30.4, ...}]]
>>> print(dataset.type)
17324 * var * {longitude: float32, latitude: float32}
>>> ak.num(dataset)
<Array [417, 2005, 1970, 1307, ..., 4572, 17097, 3127] type='17324 * int64'>

(The last numbers are the lengths of the trajectories.) Although Parquet does this stuff in general, this particular dataset isn't complex enough to really need it.

Actually, I'd like to get involved in this for Argopy, for exactly the same reasons that it applies to @philippemiron's project. I did some early tests converting Argo's NetCDF files into Parquet, and the Parquet versions were 20× smaller, as well as faster to load. But even there, Parquet isn't strictly needed: the same rowsize + obs trick could be applied, and I think the gains would be similar. (By decoupling a physically meaningful variable, the sequence length, from file boundaries, we reduce overhead. But the rowsize + obs trick does the same thing.)

For this particular project, the main thing that I think Awkward + Dask provides over xarray + Dask is that when you

chunk_settings = {'obs': tuple(rowsize.tolist())}
ds = xr.open_dataset(path_gdp, chunks=chunk_settings)

the chunking size for parallelization becomes tied to the physically meaningful variable, "trajectory length." Datasets with a large number of small trajectories would have a per-task overhead that you can't get rid of. Consider that the rowsize is a potentially large array; converting it to a Python tuple might not be feasible, let alone creating that many Dask tasks.

With Awkward + Dask, a single task could be run on dataset[100:200], for example, which would have 100× less per-task overhead because it applies to 100 trajectories. How large that slice is would be determined by the number of available processors, memory constraints, and maybe networking if distributed.
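For instance, reusing the ak and dataset objects from the h5py snippet above, one slice-sized piece of work could be (the reduction is just an illustrative example):

piece = dataset[100:200]                    # 100 trajectories handled by one task
mean_lat = ak.mean(piece.latitude, axis=1)  # one mean latitude per trajectory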

Meanwhile, however, I need to learn the "+ Dask" part of this to actually develop some examples. @philippemiron and I have also considered some examples with Numba, and that's a third part to consider. @douglasdavis tried an example of Awkward + Dask + Numba this morning and it worked, though it evaluated the first partition locally to determine the output type and I think we can streamline that.

@philippemiron
Author

philippemiron commented Mar 24, 2022

I will try to spend some time playing around with it.

I've modified the preprocess function to take an xarray.Dataset as a parameter so we can pass it directly to process_chunk of XarrayZarrRecipe; see this gist. Here are the first few steps of the recipe to save you some time.

from pangeo_forge_recipes.recipes import XarrayZarrRecipe
from preprocess import preprocess_gdp

input_url_pattern = (
    "ftp://ftp.aoml.noaa.gov/phod/pub/lumpkin/hourly/"
    "v2.00/netcdf/drifter_{id}.nc"
)
list_id = [101143, 101144, 101509, 101510, 101511]
obs_per_file = [417, 2005, 1970, 1307, 4646]
input_urls = [
    input_url_pattern.format(id=id)
    for id in list_id
]
print(f"Found {len(input_urls)} files!")
input_urls

# 'pattern' is the FilePattern built from input_urls (construction not shown here)
recipe = XarrayZarrRecipe(pattern,
                          target_chunks={'obs': tuple(obs_per_file)},
                          process_chunk=preprocess_gdp,
                          xarray_open_kwargs={'decode_times': False})
recipe

Cheers,

@rabernat
Contributor

I just got an update from @selipot about this

We have derived a good format for one of the datasets of the NOAA GDP (the hourly dataset, now with SST). Now, as we planned, could we get this dataset up on the Pangeo catalog?

The dataset is not big in itself (17 GB as NetCDF or ~6.2 GB as parquet), but I think its presence in a cloud catalog will be beneficial, as it would sit alongside other big Eulerian datasets. If you agree, how can we make this happen? The initial dataset is available right now from NOAA NCEI as a single NetCDF file (see https://github.com/Cloud-Drift/gdp-get-started), but we have a better version we would like to share with Pangeo.

I am copying Philippe Miron to this email since he led the efforts that made this happen. By the way, we just officially released the first version of our clouddrift library (https://github.com/Cloud-Drift/clouddrift), which contains the tools and recipe that were used to create this dataset.

Sounds like we are ready to move ahead with creating a recipe. The dataset is just a single netCDF file at

https://www.nodc.noaa.gov/archive/arc0199/0248584/1.1/data/0-data/gdp_v2.00.nc
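A minimal starting point for such a recipe might look like this (an unverified sketch: pattern_from_file_sequence is assumed to be the right helper for a one-file sequence, and the obs chunk size is a placeholder):

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

url = "https://www.nodc.noaa.gov/archive/arc0199/0248584/1.1/data/0-data/gdp_v2.00.nc"

# a one-file "sequence" concatenated along 'obs'
pattern = pattern_from_file_sequence([url], concat_dim="obs")
recipe = XarrayZarrRecipe(pattern, target_chunks={"obs": 100_000})  # placeholder chunk size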

@rabernat
Contributor

@selipot - I browsed around the website, but I couldn't find any information about the GDP data license. Without a clear license, we may be in violation of copyright law if we host the data. Can you say any more about the license under which GDP is distributed?

@rabernat linked a pull request on Sep 23, 2022 that may close this issue.
@selipot

selipot commented Sep 23, 2022

Working on finding this out with the GDP. I suspect it would be whatever the license is for NOAA data, which is unknown to me. I am reaching out to NOAA again.

@selipot

selipot commented Sep 23, 2022

From NOAA and NCEI: All environmental data managed/archived at NCEI is publicly available without restriction. NCEI doesn't currently employ a data usage license.
