weather-* scripts default to runtime container image. (#355)
* Removed Extra Packages

* Added Flag To Use Local Code.

* Updated README.md files

* Version Bump

---------

Co-authored-by: Darshan <darshan.prajapati@infocusp.com>
DarshanSP19 and Darshan authored Jul 11, 2023
1 parent 952631d commit b5542c4
Showing 14 changed files with 101 additions and 22 deletions.
13 changes: 13 additions & 0 deletions weather_dl/README.md
@@ -57,6 +57,8 @@ _Common options_:
that partitions will be processed in sequential order of each config; 'fair' means that partitions from each config
will be interspersed evenly. Note: When using 'fair' scheduling, we recommend you set the '--partition-chunks' to a
much smaller number. Default: 'in-order'.
* `--log-level`: An integer to configure log level. Default: 2(INFO).
* `--use-local-code`: Supply local code to the Runner. Default: False.

> Note:
> * In case of a BigQuery manifest, the tool will create the BQ table itself, if not already present.
@@ -93,6 +95,17 @@ weather-dl configs/mars_example_config.cfg \
--job_name $JOB_NAME
```

Using the DataflowRunner and supplying local code for the pipeline

```bash
weather-dl configs/mars_example_config.cfg \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--job_name $JOB_NAME \
--use-local-code
```
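With this change, Dataflow runs default to the prebuilt `weather-tools` SDK container image. If a different image is needed, the launcher (see the `weather-dl` script changes below) skips the default when `--sdk_container_image` is passed explicitly, and also reads an `SDK_CONTAINER_IMAGE` environment variable as a fallback. A sketch of both overrides; the image tag shown is illustrative:

```bash
# Override via the pipeline option (the launcher then injects nothing):
weather-dl configs/mars_example_config.cfg \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp \
  --job_name $JOB_NAME \
  --sdk_container_image gcr.io/$PROJECT/weather-tools:my-tag

# Or override the default image the launcher falls back to:
export SDK_CONTAINER_IMAGE=gcr.io/$PROJECT/weather-tools:my-tag
weather-dl configs/mars_example_config.cfg \
  --runner DataflowRunner \
  --project $PROJECT \
  --temp_location gs://$BUCKET/tmp \
  --job_name $JOB_NAME
```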

Using the DataflowRunner and specifying 3 requests per license

```bash
1 change: 1 addition & 0 deletions weather_dl/download_pipeline/pipeline.py
@@ -177,6 +177,7 @@ def run(argv: t.List[str], save_main_session: bool = True) -> PipelineArgs:
help="Update the manifest for the already downloaded shards and exit. Default: 'false'.")
parser.add_argument('--log-level', type=int, default=2,
help='An integer to configure log level. Default: 2(INFO)')
parser.add_argument('--use-local-code', action='store_true', default=False, help='Supply local code to the Runner.')

known_args, pipeline_args = parser.parse_known_args(argv[1:])

3 changes: 2 additions & 1 deletion weather_dl/download_pipeline/pipeline_test.py
@@ -58,7 +58,8 @@
schedule='in-order',
check_skip_in_dry_run=False,
update_manifest=False,
log_level=20),
log_level=20,
use_local_code=False),
pipeline_options=PipelineOptions('--save_main_session True'.split()),
configs=[Config.from_dict(CONFIG)],
client_name='cds',
2 changes: 1 addition & 1 deletion weather_dl/setup.py
@@ -48,7 +48,7 @@
setup(
name='download_pipeline',
packages=find_packages(),
version='0.1.19',
version='0.1.20',
author='Anthromets',
author_email='anthromets-ecmwf@google.com',
url='https://weather-tools.readthedocs.io/en/latest/weather_dl/',
22 changes: 16 additions & 6 deletions weather_dl/weather-dl
@@ -24,6 +24,8 @@ import tempfile

import weather_dl

SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0'

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)

@@ -48,10 +50,16 @@ if __name__ == '__main__':
from download_pipeline import cli
except ImportError as e:
raise ImportError('please re-install package in a clean python environment.') from e

if '-h' in sys.argv or '--help' in sys.argv or len(sys.argv) == 1:
cli()
else:

args = []

if "DataflowRunner" in sys.argv and "--sdk_container_image" not in sys.argv:
args.extend(['--sdk_container_image',
os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE),
'--experiments',
'use_runner_v2'])

if "--use-local-code" in sys.argv:
with tempfile.TemporaryDirectory() as tmpdir:
original_dir = os.getcwd()

@@ -72,5 +80,7 @@ if __name__ == '__main__':
# cleanup memory to prevent pickling error.
tar = None
weather_dl = None

cli(['--extra_package', pkg_archive])
args.extend(['--extra_package', pkg_archive])
cli(args)
else:
cli(args)
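
Taken together, the launcher change above amounts to the following behaviour, sketched as shell invocations (the injected image tag and the generated package filename are illustrative, not literal output):

```bash
# DataflowRunner without an explicit image: the default container image and
# the Runner v2 experiment are appended automatically.
weather-dl configs/mars_example_config.cfg --runner DataflowRunner --project $PROJECT
#   effectively adds: --sdk_container_image gcr.io/weather-tools-prod/weather-tools:0.0.0 \
#                     --experiments use_runner_v2

# With --use-local-code, the launcher additionally builds a source tarball of the
# local package in a temporary directory and forwards it via --extra_package.
weather-dl configs/mars_example_config.cfg --runner DataflowRunner --project $PROJECT --use-local-code
#   effectively adds: --extra_package <tmpdir>/download_pipeline-<version>.tar.gz
```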
15 changes: 15 additions & 0 deletions weather_mv/README.md
@@ -49,6 +49,8 @@ _Common options_
* `--num_shards`: Number of shards to use when writing windowed elements to cloud storage. Only used with the `topic`
flag. Default: 5 shards.
* `-d, --dry-run`: Preview the load into BigQuery. Default: off.
* `--log-level`: An integer to configure log level. Default: 2(INFO).
* `--use-local-code`: Supply local code to the Runner. Default: False.

Invoke with `-h` or `--help` to see the full range of options.

@@ -182,6 +184,19 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \
--job_name $JOB_NAME
```

Using the DataflowRunner and supplying local code for the pipeline

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--runner DataflowRunner \
--project $PROJECT \
--region $REGION \
--temp_location "gs://$BUCKET/tmp" \
--job_name $JOB_NAME \
--use-local-code
```
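The two flags compose: `--use-local-code` can be combined with an explicit `--sdk_container_image`, in which case the launcher forwards the local package but skips injecting the default image. A sketch; the image tag is illustrative:

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
  --output_table $PROJECT.$DATASET_ID.$TABLE_ID \
  --runner DataflowRunner \
  --project $PROJECT \
  --region $REGION \
  --temp_location "gs://$BUCKET/tmp" \
  --job_name $JOB_NAME \
  --sdk_container_image gcr.io/$PROJECT/weather-tools:my-tag \
  --use-local-code
```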

For a full list of how to configure the Dataflow pipeline, please review
[this table](https://cloud.google.com/dataflow/docs/reference/pipeline-options).

2 changes: 2 additions & 0 deletions weather_mv/loader_pipeline/pipeline.py
@@ -27,6 +27,7 @@
from .streaming import GroupMessagesByFixedWindows, ParsePaths

logger = logging.getLogger(__name__)
SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0'


def configure_logger(verbosity: int) -> None:
@@ -113,6 +114,7 @@ def run(argv: t.List[str]) -> t.Tuple[argparse.Namespace, t.List[str]]:
help='Preview the weather-mv job. Default: off')
base.add_argument('--log-level', type=int, default=2,
help='An integer to configure log level. Default: 2(INFO)')
base.add_argument('--use-local-code', action='store_true', default=False, help='Supply local code to the Runner.')

subparsers = parser.add_subparsers(help='help for subcommand', dest='subcommand')

1 change: 1 addition & 0 deletions weather_mv/loader_pipeline/pipeline_test.py
@@ -66,6 +66,7 @@ def setUp(self) -> None:
'zarr': False,
'zarr_kwargs': {},
'log_level': 2,
'use_local_code': False,
'skip_creating_polygon': False,
}

2 changes: 1 addition & 1 deletion weather_mv/setup.py
@@ -62,7 +62,7 @@
packages=find_packages(),
author='Anthromets',
author_email='anthromets-ecmwf@google.com',
version='0.2.16',
version='0.2.17',
url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
description='A tool to load weather data into BigQuery.',
install_requires=beam_gcp_requirements + base_requirements,
20 changes: 15 additions & 5 deletions weather_mv/weather-mv
@@ -23,6 +23,8 @@ import tempfile

import weather_mv

SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0'

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)

@@ -47,9 +49,15 @@ if __name__ == '__main__':
except ImportError as e:
raise ImportError('please re-install package in a clean python environment.') from e

if '-h' in sys.argv or '--help' in sys.argv or len(sys.argv) == 1:
cli()
else:
args = []

if "DataflowRunner" in sys.argv and "--sdk_container_image" not in sys.argv:
args.extend(['--sdk_container_image',
os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE),
'--experiments',
'use_runner_v2'])

if "--use-local-code" in sys.argv:
with tempfile.TemporaryDirectory() as tmpdir:
original_dir = os.getcwd()

@@ -70,5 +78,7 @@ if __name__ == '__main__':
# cleanup memory to prevent pickling error.
tar = None
weather_mv = None

cli(['--extra_package', pkg_archive])
args.extend(['--extra_package', pkg_archive])
cli(args)
else:
cli(args)
15 changes: 15 additions & 0 deletions weather_sp/README.md
@@ -27,6 +27,8 @@ _Common options_:
using Python formatting, see [Output section](#output) below.
* `-f, --force`: Force re-splitting of the pipeline. Turns off skipping of already split data.
* `-d, --dry-run`: Test the input file matching and the output file scheme without splitting.
* `--log-level`: An integer to configure log level. Default: 2(INFO).
* `--use-local-code`: Supply local code to the Runner. Default: False.

Invoke with `-h` or `--help` to see the full range of options.

@@ -59,6 +61,19 @@ weather-sp --input-pattern 'gs://test-tmp/era5/2015/**' \
--job_name $JOB_NAME
```

Using the DataflowRunner and supplying local code for the pipeline

```bash
weather-sp --input-pattern 'gs://test-tmp/era5/2015/**' \
    --output-dir 'gs://test-tmp/era5/splits' \
--formatting '.{typeOfLevel}' \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--job_name $JOB_NAME \
--use-local-code
```
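Before submitting a Dataflow job with local code, the `-d, --dry-run` flag documented above can be used to check the input matching and output naming without splitting anything. A minimal sketch, reusing the paths from the example:

```bash
weather-sp --input-pattern 'gs://test-tmp/era5/2015/**' \
    --output-dir 'gs://test-tmp/era5/splits' \
    --formatting '.{typeOfLevel}' \
    --dry-run
```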

Using ecCodes-powered grib splitting on Dataflow (this is often more robust, especially when splitting multiple
dimensions at once):

2 changes: 1 addition & 1 deletion weather_sp/setup.py
@@ -44,7 +44,7 @@
packages=find_packages(),
author='Anthromets',
author_email='anthromets-ecmwf@google.com',
version='0.3.0',
version='0.3.1',
url='https://weather-tools.readthedocs.io/en/latest/weather_sp/',
description='A tool to split weather data files into per-variable files.',
install_requires=beam_gcp_requirements + base_requirements,
2 changes: 2 additions & 0 deletions weather_sp/splitter_pipeline/pipeline.py
@@ -26,6 +26,7 @@
from .file_splitters import get_splitter

logger = logging.getLogger(__name__)
SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0'


def configure_logger(verbosity: int) -> None:
@@ -88,6 +89,7 @@ def run(argv: t.List[str], save_main_session: bool = True):
)
parser.add_argument('-i', '--input-pattern', type=str, required=True,
help='Pattern for input weather data.')
parser.add_argument('--use-local-code', action='store_true', default=False, help='Supply local code to the Runner.')
output_options = parser.add_mutually_exclusive_group(required=True)
output_options.add_argument(
'--output-template', type=str,
23 changes: 16 additions & 7 deletions weather_sp/weather-sp
@@ -23,6 +23,8 @@ import tempfile

import weather_sp

SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0'

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)

@@ -50,9 +52,15 @@ if __name__ == '__main__':
raise ImportError(
'please re-install package in a clean python environment.') from e

if '-h' in sys.argv or '--help' in sys.argv or len(sys.argv) == 1:
cli()
else:
args = []

if "DataflowRunner" in sys.argv and "--sdk_container_image" not in sys.argv:
args.extend(['--sdk_container_image',
os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE),
'--experiments',
'use_runner_v2'])

if "--use-local-code" in sys.argv:
with tempfile.TemporaryDirectory() as tmpdir:
original_dir = os.getcwd()

@@ -68,11 +76,12 @@
pkg_archive = glob.glob(os.path.join(tmpdir, '*.tar.gz'))[0]

with tarfile.open(pkg_archive, 'r') as tar:
assert any([f.endswith('.py') for f in
tar.getnames()]), 'extra_package must include python files!'
assert any([f.endswith('.py') for f in tar.getnames()]), 'extra_package must include python files!'

# cleanup memory to prevent pickling error.
tar = None
weather_sp = None

cli(['--extra_package', pkg_archive])
args.extend(['--extra_package', pkg_archive])
cli(args)
else:
cli(args)
