Merge branch 'main' into mv-group-common-hypercubes
j9sh264 committed Oct 17, 2023 · 2 parents 437fc58 + 3bd6183 · commit 943cae0
Showing 34 changed files with 766 additions and 197 deletions.
42 changes: 20 additions & 22 deletions .github/workflows/ci.yml
@@ -84,42 +84,40 @@ jobs:
echo "::set-output name=dir::$(pip cache dir)"
- name: Install linter
run: |
pip install ruff
pip install ruff==0.0.280
- name: Lint project
run: ruff check .
type-check:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ["3.8"]
steps:
- name: Cancel previous
uses: styfle/cancel-workflow-action@0.7.0
with:
access_token: ${{ github.token }}
if: ${{ github.ref != 'refs/heads/main' }}
- uses: actions/checkout@v2
- name: Set up Python 3.8
uses: actions/setup-python@v2
- name: conda cache
uses: actions/cache@v2
env:
# Increase this value to reset cache if etc/example-environment.yml has not changed
CACHE_NUMBER: 0
with:
python-version: "3.8"
- name: Setup conda
uses: s-weigand/setup-conda@v1
path: ~/conda_pkgs_dir
key:
${{ runner.os }}-conda-${{ env.CACHE_NUMBER }}-${{ matrix.python-version }}-${{ hashFiles('ci3.8.yml') }}
- name: Setup conda environment
uses: conda-incubator/setup-miniconda@v2
with:
update-conda: true
python-version: "3.8"
conda-channels: anaconda, conda-forge
- name: Install ecCodes
run: |
conda install -y eccodes>=2.21.0 -c conda-forge
conda install -y pyproj -c conda-forge
conda install -y gdal -c conda-forge
- name: Get pip cache dir
id: pip-cache
run: |
python -m pip install --upgrade pip wheel
echo "::set-output name=dir::$(pip cache dir)"
- name: Install weather-tools
python-version: ${{ matrix.python-version }}
channels: conda-forge
environment-file: ci${{ matrix.python-version}}.yml
activate-environment: weather-tools
- name: Install weather-tools[test]
run: |
pip install -e .[test] --use-deprecated=legacy-resolver
conda run -n weather-tools pip install -e .[test] --use-deprecated=legacy-resolver
- name: Run type checker
run: pytype
run: conda run -n weather-tools pytype
1 change: 1 addition & 0 deletions Dockerfile
@@ -29,6 +29,7 @@ ARG weather_tools_git_rev=main
RUN git clone https://github.com/google/weather-tools.git /weather
WORKDIR /weather
RUN git checkout "${weather_tools_git_rev}"
RUN rm -r /weather/weather_*/test_data/
RUN conda env create -f environment.yml --debug

# Activate the conda env and update the PATH
4 changes: 3 additions & 1 deletion ci3.8.yml
@@ -16,7 +16,7 @@ dependencies:
- requests=2.28.1
- netcdf4=1.6.1
- rioxarray=0.13.4
- xarray-beam=0.3.1
- xarray-beam=0.6.2
- ecmwf-api-client=1.6.3
- fsspec=2022.11.0
- gcsfs=2022.11.0
@@ -33,6 +33,8 @@ dependencies:
- ruff==0.0.260
- google-cloud-sdk=410.0.0
- aria2=1.36.0
- zarr=2.15.0
- pip:
- cython==0.29.34
- earthengine-api==0.1.329
- .[test]
4 changes: 3 additions & 1 deletion ci3.9.yml
@@ -16,7 +16,7 @@ dependencies:
- requests=2.28.1
- netcdf4=1.6.1
- rioxarray=0.13.4
- xarray-beam=0.3.1
- xarray-beam=0.6.2
- ecmwf-api-client=1.6.3
- fsspec=2022.11.0
- gcsfs=2022.11.0
@@ -33,6 +33,8 @@ dependencies:
- aria2=1.36.0
- xarray==2023.1.0
- ruff==0.0.260
- zarr=2.15.0
- pip:
- cython==0.29.34
- earthengine-api==0.1.329
- .[test]
4 changes: 3 additions & 1 deletion environment.yml
@@ -4,7 +4,7 @@ channels:
dependencies:
- python=3.8.13
- apache-beam=2.40.0
- xarray-beam=0.3.1
- xarray-beam=0.6.2
- xarray=2023.1.0
- fsspec=2022.11.0
- gcsfs=2022.11.0
@@ -25,7 +25,9 @@ dependencies:
- google-cloud-sdk=410.0.0
- aria2=1.36.0
- pip=22.3
- zarr=2.15.0
- pip:
- cython==0.29.34
- earthengine-api==0.1.329
- firebase-admin==6.0.1
- .
6 changes: 4 additions & 2 deletions setup.py
@@ -57,8 +57,9 @@
"earthengine-api>=0.1.263",
"pyproj", # requires separate binary installation!
"gdal", # requires separate binary installation!
"xarray-beam==0.3.1",
"xarray-beam==0.6.2",
"gcsfs==2022.11.0",
"zarr==2.15.0",
]

weather_sp_requirements = [
@@ -82,6 +83,7 @@
"memray",
"pytest-memray",
"h5py",
"pooch",
]

all_test_requirements = beam_gcp_requirements + weather_dl_requirements + \
@@ -115,7 +117,7 @@

],
python_requires='>=3.8, <3.10',
install_requires=['apache-beam[gcp]==2.40.0'],
install_requires=['apache-beam[gcp]==2.40.0', 'gcsfs==2022.11.0'],
use_scm_version=True,
setup_requires=['setuptools_scm'],
scripts=['weather_dl/weather-dl', 'weather_mv/weather-mv', 'weather_sp/weather-sp'],
13 changes: 13 additions & 0 deletions weather_dl/README.md
@@ -57,6 +57,8 @@ _Common options_:
that partitions will be processed in sequential order of each config; 'fair' means that partitions from each config
will be interspersed evenly. Note: When using 'fair' scheduling, we recommend you set the '--partition-chunks' to a
much smaller number. Default: 'in-order'.
* `--log-level`: An integer that configures the log level. Default: 2 (INFO). See the sketch after this list for the presumed mapping onto Python's `logging` constants.
* `--use-local-code`: Supply local code to the Runner. Default: False.
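
The integer scale appears to map onto Python's `logging` constants; the `pipeline_test.py` diff further down shows the parsed default of 2 stored as `20` (`logging.INFO`). A minimal sketch of the presumed conversion — the helper name and the clamping are ours, not the tool's:

```python
import logging

# Hypothetical helper: CLI levels 1-5 presumably correspond to Python's
# DEBUG(10) .. CRITICAL(50), i.e. the CLI value multiplied by ten.
def cli_log_level_to_logging(level: int) -> int:
    return min(max(level, 1), 5) * 10

assert cli_log_level_to_logging(2) == logging.INFO  # the documented default
```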

> Note:
> * In the case of a BigQuery manifest, the tool will create the BQ table itself if it is not already present.
@@ -93,6 +95,17 @@ weather-dl configs/mars_example_config.cfg \
--job_name $JOB_NAME
```

Using the DataflowRunner with local code for the pipeline

```bash
weather-dl configs/mars_example_config.cfg \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--job_name $JOB_NAME \
--use-local-code
```

Using the DataflowRunner and specifying 3 requests per license

```bash
3 changes: 3 additions & 0 deletions weather_dl/download_pipeline/manifest.py
@@ -368,6 +368,9 @@ def set_stage(self, stage: Stage) -> None:

if stage == Stage.FETCH:
new_status.fetch_start_time = current_utc_time
new_status.fetch_end_time = None
new_status.download_start_time = None
new_status.download_end_time = None
elif stage == Stage.RETRIEVE:
new_status.retrieve_start_time = current_utc_time
elif stage == Stage.DOWNLOAD:
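
Clearing the downstream timestamps when a unit re-enters the FETCH stage keeps a retried download from reporting stale end times. A self-contained sketch of the pattern — the field and `Stage` names follow the diff; everything else is an assumption:

```python
import dataclasses
import datetime
import enum
import typing as t


class Stage(enum.Enum):
    FETCH = 'fetch'
    RETRIEVE = 'retrieve'
    DOWNLOAD = 'download'


@dataclasses.dataclass
class Status:
    fetch_start_time: t.Optional[str] = None
    fetch_end_time: t.Optional[str] = None
    retrieve_start_time: t.Optional[str] = None
    download_start_time: t.Optional[str] = None
    download_end_time: t.Optional[str] = None


def set_stage(status: Status, stage: Stage) -> None:
    now = datetime.datetime.now(datetime.timezone.utc).isoformat()
    if stage == Stage.FETCH:
        status.fetch_start_time = now
        # A unit re-entering FETCH (e.g. on retry) invalidates every
        # timestamp recorded downstream, so reset them explicitly.
        status.fetch_end_time = None
        status.download_start_time = None
        status.download_end_time = None
    elif stage == Stage.RETRIEVE:
        status.retrieve_start_time = now
    elif stage == Stage.DOWNLOAD:
        status.download_start_time = now
```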
1 change: 1 addition & 0 deletions weather_dl/download_pipeline/pipeline.py
@@ -177,6 +177,7 @@ def run(argv: t.List[str], save_main_session: bool = True) -> PipelineArgs:
help="Update the manifest for the already downloaded shards and exit. Default: 'false'.")
parser.add_argument('--log-level', type=int, default=2,
help='An integer to configure log level. Default: 2(INFO)')
parser.add_argument('--use-local-code', action='store_true', default=False, help='Supply local code to the Runner.')

known_args, pipeline_args = parser.parse_known_args(argv[1:])

3 changes: 2 additions & 1 deletion weather_dl/download_pipeline/pipeline_test.py
@@ -58,7 +58,8 @@
schedule='in-order',
check_skip_in_dry_run=False,
update_manifest=False,
log_level=20),
log_level=20,
use_local_code=False),
pipeline_options=PipelineOptions('--save_main_session True'.split()),
configs=[Config.from_dict(CONFIG)],
client_name='cds',
2 changes: 1 addition & 1 deletion weather_dl/download_pipeline/util.py
@@ -105,7 +105,7 @@ def to_json_serializable_type(value: t.Any) -> t.Any:
elif type(value) == np.ndarray:
# Will return a scalar if the array is of size 1, else a list.
return value.tolist()
elif type(value) == datetime.datetime or type(value) == str or type(value) == np.datetime64:
elif isinstance(value, datetime.datetime) or isinstance(value, str) or isinstance(value, np.datetime64):
# Assume strings are ISO format timestamps...
try:
value = datetime.datetime.fromisoformat(value)
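
The switch from `type(value) == ...` to `isinstance(...)` matters because an exact-type comparison rejects subclasses; for instance, `pandas.Timestamp` subclasses `datetime.datetime`. A quick illustration — the `pandas` example is ours, not from the diff:

```python
import datetime

import numpy as np
import pandas as pd

ts = pd.Timestamp('2023-10-17')           # subclass of datetime.datetime
print(type(ts) == datetime.datetime)      # False: exact-type check misses it
print(isinstance(ts, datetime.datetime))  # True: subclass-aware

# isinstance also accepts a tuple, so the three checks collapse into one:
value = np.datetime64('2023-10-17')
print(isinstance(value, (datetime.datetime, str, np.datetime64)))  # True
```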
2 changes: 1 addition & 1 deletion weather_dl/setup.py
@@ -48,7 +48,7 @@
setup(
name='download_pipeline',
packages=find_packages(),
version='0.1.19',
version='0.1.20',
author='Anthromets',
author_email='anthromets-ecmwf@google.com',
url='https://weather-tools.readthedocs.io/en/latest/weather_dl/',
22 changes: 16 additions & 6 deletions weather_dl/weather-dl
@@ -24,6 +24,8 @@ import tempfile

import weather_dl

SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0'

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)

@@ -48,10 +50,16 @@ if __name__ == '__main__':
from download_pipeline import cli
except ImportError as e:
raise ImportError('please re-install package in a clean python environment.') from e

if '-h' in sys.argv or '--help' in sys.argv or len(sys.argv) == 1:
cli()
else:

args = []

if "DataflowRunner" in sys.argv and "--sdk_container_image" not in sys.argv:
args.extend(['--sdk_container_image',
os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE),
'--experiments',
'use_runner_v2'])

if "--use-local-code" in sys.argv:
with tempfile.TemporaryDirectory() as tmpdir:
original_dir = os.getcwd()

@@ -72,5 +80,7 @@ if __name__ == '__main__':
# cleanup memory to prevent pickling error.
tar = None
weather_dl = None

cli(['--extra_package', pkg_archive])
args.extend(['--extra_package', pkg_archive])
cli(args)
else:
cli(args)
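
Two behaviors land in this wrapper: Dataflow jobs get a default `--sdk_container_image` (plus Runner v2, which custom containers require), and `--use-local-code` stages the working tree as an `--extra_package` tarball so workers run the local source rather than the released package. A condensed sketch of that flow — `build_extra_args` is our name, and the real script inlines this logic in `__main__` with a proper temporary directory and pickling cleanup:

```python
import os
import tarfile
import tempfile
import typing as t

SDK_CONTAINER_IMAGE = 'gcr.io/weather-tools-prod/weather-tools:0.0.0'


def build_extra_args(argv: t.List[str]) -> t.List[str]:
    args: t.List[str] = []
    if 'DataflowRunner' in argv and '--sdk_container_image' not in argv:
        # Pin the prebuilt worker image (overridable via the environment)
        # and opt into Dataflow Runner v2.
        args.extend(['--sdk_container_image',
                     os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE),
                     '--experiments', 'use_runner_v2'])
    if '--use-local-code' in argv:
        # Package the local source tree so workers receive this code.
        pkg_archive = os.path.join(tempfile.mkdtemp(), 'weather_dl.tar.gz')
        with tarfile.open(pkg_archive, 'w:gz') as tar:
            tar.add(os.getcwd(), arcname='weather_dl')
        args.extend(['--extra_package', pkg_archive])
    return args
```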
72 changes: 68 additions & 4 deletions weather_mv/README.md
@@ -49,6 +49,8 @@ _Common options_
* `--num_shards`: Number of shards to use when writing windowed elements to cloud storage. Only used with the `topic`
flag. Default: 5 shards.
* `-d, --dry-run`: Preview the load into BigQuery. Default: off.
* `--log-level`: An integer that configures the log level. Default: 2 (INFO).
* `--use-local-code`: Supply local code to the Runner. Default: False.

Invoke with `-h` or `--help` to see the full range of options.

@@ -59,8 +61,9 @@ usage: weather-mv bigquery [-h] -i URIS [--topic TOPIC] [--window_size WINDOW_SI
-o OUTPUT_TABLE [-v variables [variables ...]] [-a area [area ...]]
[--import_time IMPORT_TIME] [--infer_schema]
[--xarray_open_dataset_kwargs XARRAY_OPEN_DATASET_KWARGS]
[--tif_metadata_for_datetime TIF_METADATA_FOR_DATETIME] [-s]
[--coordinate_chunk_size COORDINATE_CHUNK_SIZE]
[--tif_metadata_for_start_time TIF_METADATA_FOR_START_TIME]
[--tif_metadata_for_end_time TIF_METADATA_FOR_END_TIME] [-s]
[--coordinate_chunk_size COORDINATE_CHUNK_SIZE] [--skip_creating_polygon]
```

The `bigquery` subcommand loads weather data into BigQuery. In addition to the common options above, users may specify
@@ -78,9 +81,13 @@ _Command options_:
* `--xarray_open_dataset_kwargs`: Keyword-args to pass into `xarray.open_dataset()` in the form of a JSON string.
* `--coordinate_chunk_size`: The size of the chunk of coordinates used for extracting vector data into BigQuery. Used to
tune parallel uploads.
* `--tif_metadata_for_datetime` : Metadata that contains tif file's timestamp. Applicable only for tif files.
* `--tif_metadata_for_start_time` : Metadata key that contains the tif file's start/initialization time. Applicable only to tif files.
* `--tif_metadata_for_end_time` : Metadata key that contains the tif file's end/forecast time. Applicable only to tif files (optional).
* `-s, --skip-region-validation` : Skip validation of regions for data migration. Default: off.
* `--disable_grib_schema_normalization` : To disable grib's schema normalization. Default: off.
* `--skip_creating_polygon` : Do not ingest grid points as polygons in BigQuery. Default: grid points are ingested as polygons. Note: polygon creation relies on the assumption that the provided grid has an equal distance between consecutive points of latitude and longitude (see the sketch after this list).
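
To see why equal spacing matters: a grid point can only be expanded into a cell polygon if half the grid resolution is one well-defined constant. An illustrative sketch — the function name and coordinate order are ours, not the pipeline's:

```python
import typing as t


def grid_point_to_polygon(lat: float, lon: float,
                          resolution: float) -> t.List[t.Tuple[float, float]]:
    """Return the cell corners around a grid point, assuming a uniform grid."""
    half = resolution / 2.0
    return [
        (lat - half, lon - half),
        (lat - half, lon + half),
        (lat + half, lon + half),
        (lat + half, lon - half),
        (lat - half, lon - half),  # closed ring, as GeoJSON expects
    ]

# With a 0.25-degree grid, the point (10.0, 20.0) becomes a 0.25 x 0.25 cell:
print(grid_point_to_polygon(10.0, 20.0, 0.25))
```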

Invoke with `bq -h` or `bigquery --help` to see the full range of options.

@@ -117,14 +124,25 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \
--dry-run
```

Ingest grid points while skipping polygon creation in BigQuery (`--temp_location` is needed for batch writes to BigQuery):

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--temp_location "gs://$BUCKET/tmp" \
--direct_num_workers 2 \
--skip_creating_polygon
```

Load COG (.tif) files (`--temp_location` is needed for batch writes to BigQuery):

```bash
weather-mv bq --uris "gs://your-bucket/*.tif" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--temp_location "gs://$BUCKET/tmp" \
--direct_num_workers 2 \
--tif_metadata_for_datetime start_time
--tif_metadata_for_start_time start_time \
--tif_metadata_for_end_time end_time
```

Upload only a subset of variables:
@@ -147,6 +165,39 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \
--direct_num_workers 2
```

Upload a zarr file:

```bash
weather-mv bq --uris "gs://your-bucket/*.zarr" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--temp_location "gs://$BUCKET/tmp" \
--use-local-code \
--zarr \
--direct_num_workers 2
```

Upload a specific date range's data from the zarr file:

```bash
weather-mv bq --uris "gs://your-bucket/*.zarr" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--temp_location "gs://$BUCKET/tmp" \
--use-local-code \
--zarr \
--zarr_kwargs '{"start_date": "2021-07-18", "end_date": "2021-07-19"}' \
--direct_num_workers 2
```
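
The `start_date`/`end_date` kwargs presumably narrow the dataset to a time window before extraction; in plain xarray the equivalent slice looks like this (our assumption, not the tool's code — the store path is hypothetical, and opening `gs://` paths additionally requires `gcsfs`):

```python
import xarray as xr

ds = xr.open_zarr('gs://your-bucket/dataset.zarr')
subset = ds.sel(time=slice('2021-07-18', '2021-07-19'))
print(subset.sizes)
```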

Upload a specific date range's data from the file:

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--temp_location "gs://$BUCKET/tmp" \
--use-local-code \
--xarray_open_dataset_kwargs '{"start_date": "2021-07-18", "end_date": "2021-07-19"}'
```

Control how weather data is opened with XArray:

```bash
@@ -169,6 +220,19 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \
--job_name $JOB_NAME
```

Using the DataflowRunner with local code for the pipeline

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--runner DataflowRunner \
--project $PROJECT \
--region $REGION \
--temp_location "gs://$BUCKET/tmp" \
--job_name $JOB_NAME \
--use-local-code
```

For a full list of how to configure the Dataflow pipeline, please review
[this table](https://cloud.google.com/dataflow/docs/reference/pipeline-options).
