Skip to content

Commit

Permalink
Add: extract specific date's data from any files. (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
dabhicusp authored Oct 10, 2023
1 parent 765a40c commit 3bd6183
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 11 deletions.
10 changes: 10 additions & 0 deletions weather_mv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,16 @@ weather-mv bq --uris "gs://your-bucket/*.zarr" \
--direct_num_workers 2
```

Upload a specific date range's data from the file:

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--temp_location "gs://$BUCKET/tmp" \
--use-local-code \
--xarray_open_dataset_kwargs '{"start_date": "2021-07-18", "end_date": "2021-07-19"}' \
```

Control how weather data is opened with XArray:

```bash
Expand Down
10 changes: 4 additions & 6 deletions weather_mv/loader_pipeline/bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ class ToBigQuery(ToDataSink):
skip_creating_polygon: bool = False
lat_grid_resolution: t.Optional[float] = None
lon_grid_resolution: t.Optional[float] = None
start_date: t.Optional[str] = None
end_date: t.Optional[str] = None

@classmethod
def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
Expand Down Expand Up @@ -175,8 +173,6 @@ def __post_init__(self):
"""Initializes Sink by creating a BigQuery table based on user input."""
if self.zarr:
self.xarray_open_dataset_kwargs = self.zarr_kwargs
self.start_date = self.zarr_kwargs.get('start_date')
self.end_date = self.zarr_kwargs.get('end_date')
with open_dataset(self.first_uri, self.xarray_open_dataset_kwargs,
self.disable_grib_schema_normalization, self.tif_metadata_for_start_time,
self.tif_metadata_for_end_time, is_zarr=self.zarr) as open_ds:
Expand Down Expand Up @@ -316,10 +312,12 @@ def expand(self, paths):
else:
xarray_open_dataset_kwargs = self.xarray_open_dataset_kwargs.copy()
xarray_open_dataset_kwargs.pop('chunks')
start_date = xarray_open_dataset_kwargs.pop('start_date', None)
end_date = xarray_open_dataset_kwargs.pop('end_date', None)
ds, chunks = xbeam.open_zarr(self.first_uri, **xarray_open_dataset_kwargs)

if self.start_date is not None and self.end_date is not None:
ds = ds.sel(time=slice(self.start_date, self.end_date))
if start_date is not None and end_date is not None:
ds = ds.sel(time=slice(start_date, end_date))

ds.attrs[DATA_URI_COLUMN] = self.first_uri
extracted_rows = (
Expand Down
16 changes: 11 additions & 5 deletions weather_mv/loader_pipeline/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,15 @@ def open_dataset(uri: str,
is_zarr: bool = False) -> t.Iterator[xr.Dataset]:
"""Open the dataset at 'uri' and return a xarray.Dataset."""
try:
local_open_dataset_kwargs = start_date = end_date = None
if open_dataset_kwargs is not None:
local_open_dataset_kwargs = open_dataset_kwargs.copy()
start_date = local_open_dataset_kwargs.pop('start_date', None)
end_date = local_open_dataset_kwargs.pop('end_date', None)

if is_zarr:
if open_dataset_kwargs is not None:
start_date = open_dataset_kwargs.pop('start_date', None)
end_date = open_dataset_kwargs.pop('end_date', None)
ds: xr.Dataset = _add_is_normalized_attr(xr.open_dataset(uri, engine='zarr', **open_dataset_kwargs), False)
ds: xr.Dataset = _add_is_normalized_attr(xr.open_dataset(uri, engine='zarr',
**local_open_dataset_kwargs), False)
if start_date is not None and end_date is not None:
ds = ds.sel(time=slice(start_date, end_date))
beam.metrics.Metrics.counter('Success', 'ReadNetcdfData').inc()
Expand All @@ -423,7 +427,9 @@ def open_dataset(uri: str,
xr_dataset: xr.Dataset = __open_dataset_file(local_path,
uri_extension,
disable_grib_schema_normalization,
open_dataset_kwargs)
local_open_dataset_kwargs)
if start_date is not None and end_date is not None:
xr_dataset = xr_dataset.sel(time=slice(start_date, end_date))
if uri_extension in ['.tif', '.tiff']:
xr_dataset = _preprocess_tif(xr_dataset,
local_path,
Expand Down

0 comments on commit 3bd6183

Please sign in to comment.