From 34099baf4e73e81976037001404bc5ec15154d13 Mon Sep 17 00:00:00 2001 From: Rahul Mahrsee <86819420+mahrsee1997@users.noreply.github.com> Date: Tue, 11 Jul 2023 12:16:11 +0530 Subject: [PATCH 1/3] Set appropriate manifest fields to None in case of errors. (#356) --- weather_dl/download_pipeline/manifest.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/weather_dl/download_pipeline/manifest.py b/weather_dl/download_pipeline/manifest.py index b07d779f..d2a72550 100644 --- a/weather_dl/download_pipeline/manifest.py +++ b/weather_dl/download_pipeline/manifest.py @@ -368,6 +368,9 @@ def set_stage(self, stage: Stage) -> None: if stage == Stage.FETCH: new_status.fetch_start_time = current_utc_time + new_status.fetch_end_time = None + new_status.download_start_time = None + new_status.download_end_time = None elif stage == Stage.RETRIEVE: new_status.retrieve_start_time = current_utc_time elif stage == Stage.DOWNLOAD: From 952631dda424a435213fc3f080b6a71785fe53d0 Mon Sep 17 00:00:00 2001 From: dabhi_cusp <123355381+dabhicusp@users.noreply.github.com> Date: Tue, 11 Jul 2023 13:18:30 +0530 Subject: [PATCH 2/3] Ingested grid point as a polygon in BigQuery. (#337) * Ingest grid points as Polygon in the BigQuery. * Refactor the code for Polygon. * Refactor code related to polygon. * Fixed fetch_geo_polygon method. * Updated grid resolution of the polygon. * Test cases added for the Polygon. * Updated code of the Polygon. * Conflicts resolved. * Updated Testcase after conflict resolve. * Cover Corner points in BQ polygon and Updated test cases. * Syntax updated. * Syntax Updated and argument parser added. * Test cases added and updated. * Update polygon in test cases. * Example added, updated logic and for the polygon * 'skip_creating_polygon' is updated in README.md. * Add nits. * Weather-mv version bumping to 0.2.16. --- weather_mv/README.md | 15 ++- weather_mv/loader_pipeline/bq.py | 116 ++++++++++++++++- weather_mv/loader_pipeline/bq_test.py | 131 +++++++++++++++++--- weather_mv/loader_pipeline/pipeline_test.py | 1 + weather_mv/setup.py | 2 +- 5 files changed, 245 insertions(+), 20 deletions(-) diff --git a/weather_mv/README.md b/weather_mv/README.md index 98470e3b..743af03d 100644 --- a/weather_mv/README.md +++ b/weather_mv/README.md @@ -60,7 +60,7 @@ usage: weather-mv bigquery [-h] -i URIS [--topic TOPIC] [--window_size WINDOW_SI [--import_time IMPORT_TIME] [--infer_schema] [--xarray_open_dataset_kwargs XARRAY_OPEN_DATASET_KWARGS] [--tif_metadata_for_datetime TIF_METADATA_FOR_DATETIME] [-s] - [--coordinate_chunk_size COORDINATE_CHUNK_SIZE] + [--coordinate_chunk_size COORDINATE_CHUNK_SIZE] ['--skip_creating_polygon'] ``` The `bigquery` subcommand loads weather data into BigQuery. In addition to the common options above, users may specify @@ -81,6 +81,9 @@ _Command options_: * `--tif_metadata_for_datetime` : Metadata that contains tif file's timestamp. Applicable only for tif files. * `-s, --skip-region-validation` : Skip validation of regions for data migration. Default: off. * `--disable_grib_schema_normalization` : To disable grib's schema normalization. Default: off. +* `--skip_creating_polygon` : Not ingest grid points as polygons in BigQuery. Default: Ingest grid points as Polygon in + BigQuery. Note: This feature relies on the assumption that the provided grid has an equal distance between consecutive + points of latitude and longitude. Invoke with `bq -h` or `bigquery --help` to see the full range of options. 
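When polygons are being created (the default), each grid point is expanded into a small GeoJSON polygon covering its grid cell. The sketch below illustrates the idea under the equal-spacing assumption noted above; `cell_polygon` is an illustrative helper name (the pipeline's actual implementation lives in `fetch_geo_polygon`/`bound_point` in `loader_pipeline/bq.py`), and it ignores the special-casing those helpers apply at the poles and the antimeridian:

```python
import geojson
import numpy as np

def cell_polygon(lat: float, lon: float, lat_res: float, lon_res: float) -> str:
    """GeoJSON polygon for the grid cell centred on (lat, lon)."""
    lon = ((lon + 180) % 360) - 180  # normalize longitude into [-180, 180)
    return geojson.dumps(geojson.Polygon([
        (lon - lon_res, lat - lat_res),  # lower left
        (lon - lon_res, lat + lat_res),  # upper left
        (lon + lon_res, lat + lat_res),  # upper right
        (lon + lon_res, lat - lat_res),  # lower right
        (lon - lon_res, lat - lat_res),  # repeat the first vertex to close the ring
    ]))

# Half of the average spacing between consecutive grid points, as the pipeline
# derives it from the dataset's latitude/longitude coordinates.
lats, lons = np.arange(-90, 90.25, 0.25), np.arange(-180, 180, 0.25)
lat_res = abs(np.ptp(lats) / len(lats)) / 2
lon_res = abs(np.ptp(lons) / len(lons)) / 2
print(cell_polygon(49.0, -108.0, lat_res, lon_res))
```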
@@ -117,6 +120,16 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \ --dry-run ``` +Ingest grid points with skip creating polygon in BigQuery: + +```bash +weather-mv bq --uris "gs://your-bucket/*.nc" \ + --output_table $PROJECT.$DATASET_ID.$TABLE_ID \ + --temp_location "gs://$BUCKET/tmp" \ # Needed for batch writes to BigQuery + --direct_num_workers 2 \ + --skip_creating_polygon +``` + Load COG's (.tif) files: ```bash diff --git a/weather_mv/loader_pipeline/bq.py b/weather_mv/loader_pipeline/bq.py index 4c862a82..58120940 100644 --- a/weather_mv/loader_pipeline/bq.py +++ b/weather_mv/loader_pipeline/bq.py @@ -45,7 +45,9 @@ DATA_URI_COLUMN = 'data_uri' DATA_FIRST_STEP = 'data_first_step' GEO_POINT_COLUMN = 'geo_point' +GEO_POLYGON_COLUMN = 'geo_polygon' LATITUDE_RANGE = (-90, 90) +LONGITUDE_RANGE = (-180, 180) @dataclasses.dataclass @@ -94,6 +96,9 @@ class ToBigQuery(ToDataSink): skip_region_validation: bool disable_grib_schema_normalization: bool coordinate_chunk_size: int = 10_000 + skip_creating_polygon: bool = False + lat_grid_resolution: t.Optional[float] = None + lon_grid_resolution: t.Optional[float] = None @classmethod def add_parser_arguments(cls, subparser: argparse.ArgumentParser): @@ -105,6 +110,11 @@ def add_parser_arguments(cls, subparser: argparse.ArgumentParser): 'all data variables as columns.') subparser.add_argument('-a', '--area', metavar='area', type=float, nargs='+', default=list(), help='Target area in [N, W, S, E]. Default: Will include all available area.') + subparser.add_argument('--skip_creating_polygon', action='store_true', + help='Not ingest grid points as polygons in BigQuery. Default: Ingest grid points as ' + 'Polygon in BigQuery. Note: This feature relies on the assumption that the ' + 'provided grid has an equal distance between consecutive points of latitude and ' + 'longitude.') subparser.add_argument('--import_time', type=str, default=datetime.datetime.utcnow().isoformat(), help=("When writing data to BigQuery, record that data import occurred at this " "time (format: YYYY-MM-DD HH:MM:SS.usec+offset). Default: now in UTC.")) @@ -154,6 +164,27 @@ def __post_init__(self): with open_dataset(self.first_uri, self.xarray_open_dataset_kwargs, self.disable_grib_schema_normalization, self.tif_metadata_for_datetime, is_zarr=self.zarr) as open_ds: + + if not self.skip_creating_polygon: + logger.warning("Assumes that equal distance between consecutive points of latitude " + "and longitude for the entire grid.") + # Find the grid_resolution. 
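+            # The "resolution" computed below is half of the average spacing
+            # between consecutive latitude/longitude values; buffering each
+            # grid point by this amount on every side yields cells that
+            # roughly tile the grid without overlapping their neighbours.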
+ if open_ds['latitude'].size > 1 and open_ds['longitude'].size > 1: + latitude_length = len(open_ds['latitude']) + longitude_length = len(open_ds['longitude']) + + latitude_range = np.ptp(open_ds["latitude"].values) + longitude_range = np.ptp(open_ds["longitude"].values) + + self.lat_grid_resolution = abs(latitude_range / latitude_length) / 2 + self.lon_grid_resolution = abs(longitude_range / longitude_length) / 2 + + else: + self.skip_creating_polygon = True + logger.warning("Polygon can't be genereated as provided dataset has a only single grid point.") + else: + logger.info("Polygon is not created as '--skip_creating_polygon' flag passed.") + # Define table from user input if self.variables and not self.infer_schema and not open_ds.attrs['is_normalized']: logger.info('Creating schema from input variables.') @@ -233,8 +264,14 @@ def extract_rows(self, uri: str, coordinates: t.List[t.Dict]) -> t.Iterator[t.Di row[DATA_IMPORT_TIME_COLUMN] = self.import_time row[DATA_URI_COLUMN] = uri row[DATA_FIRST_STEP] = first_time_step - row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], row['longitude']) + longitude = ((row['longitude'] + 180) % 360) - 180 + row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], longitude) + row[GEO_POLYGON_COLUMN] = ( + fetch_geo_polygon(row["latitude"], longitude, self.lat_grid_resolution, self.lon_grid_resolution) + if not self.skip_creating_polygon + else None + ) # 'row' ends up looking like: # {'latitude': 88.0, 'longitude': 2.0, 'time': '2015-01-01 06:00:00', 'd': -2.0187, 'cc': 0.007812, # 'z': 50049.8, 'data_import_time': '2020-12-05 00:12:02.424573 UTC', ...} @@ -301,6 +338,7 @@ def to_table_schema(columns: t.List[t.Tuple[str, str]]) -> t.List[bigquery.Schem fields.append(bigquery.SchemaField(DATA_URI_COLUMN, 'STRING', mode='NULLABLE')) fields.append(bigquery.SchemaField(DATA_FIRST_STEP, 'TIMESTAMP', mode='NULLABLE')) fields.append(bigquery.SchemaField(GEO_POINT_COLUMN, 'GEOGRAPHY', mode='NULLABLE')) + fields.append(bigquery.SchemaField(GEO_POLYGON_COLUMN, 'STRING', mode='NULLABLE')) return fields @@ -309,6 +347,80 @@ def fetch_geo_point(lat: float, long: float) -> str: """Calculates a geography point from an input latitude and longitude.""" if lat > LATITUDE_RANGE[1] or lat < LATITUDE_RANGE[0]: raise ValueError(f"Invalid latitude value '{lat}'") - long = ((long + 180) % 360) - 180 + if long > LONGITUDE_RANGE[1] or long < LONGITUDE_RANGE[0]: + raise ValueError(f"Invalid longitude value '{long}'") point = geojson.dumps(geojson.Point((long, lat))) return point + + +def fetch_geo_polygon(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> str: + """Create a Polygon based on latitude, longitude and resolution. + + Example :: + * - . - * + | | + . • . + | | + * - . - * + In order to create the polygon, we require the `*` point as indicated in the above example. + To determine the position of the `*` point, we find the `.` point. + The `get_lat_lon_range` function gives the `.` point and `bound_point` gives the `*` point. 
+ """ + lat_lon_bound = bound_point(latitude, longitude, lat_grid_resolution, lon_grid_resolution) + polygon = geojson.dumps(geojson.Polygon([ + (lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left + (lat_lon_bound[1][0], lat_lon_bound[1][1]), # upper_left + (lat_lon_bound[2][0], lat_lon_bound[2][1]), # upper_right + (lat_lon_bound[3][0], lat_lon_bound[3][1]), # lower_right + (lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left + ])) + return polygon + + +def bound_point(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> t.List: + """Calculate the bound point based on latitude, longitude and grid resolution. + + Example :: + * - . - * + | | + . • . + | | + * - . - * + This function gives the `*` point in the above example. + """ + lat_in_bound = latitude in [90.0, -90.0] + lon_in_bound = longitude in [-180.0, 180.0] + + lat_range = get_lat_lon_range(latitude, "latitude", lat_in_bound, + lat_grid_resolution, lon_grid_resolution) + lon_range = get_lat_lon_range(longitude, "longitude", lon_in_bound, + lat_grid_resolution, lon_grid_resolution) + lower_left = [lon_range[1], lat_range[1]] + upper_left = [lon_range[1], lat_range[0]] + upper_right = [lon_range[0], lat_range[0]] + lower_right = [lon_range[0], lat_range[1]] + return [lower_left, upper_left, upper_right, lower_right] + + +def get_lat_lon_range(value: float, lat_lon: str, is_point_out_of_bound: bool, + lat_grid_resolution: float, lon_grid_resolution: float) -> t.List: + """Calculate the latitude, longitude point range point latitude, longitude and grid resolution. + + Example :: + * - . - * + | | + . • . + | | + * - . - * + This function gives the `.` point in the above example. + """ + if lat_lon == 'latitude': + if is_point_out_of_bound: + return [-90 + lat_grid_resolution, 90 - lat_grid_resolution] + else: + return [value + lat_grid_resolution, value - lat_grid_resolution] + else: + if is_point_out_of_bound: + return [-180 + lon_grid_resolution, 180 - lon_grid_resolution] + else: + return [value + lon_grid_resolution, value - lon_grid_resolution] diff --git a/weather_mv/loader_pipeline/bq_test.py b/weather_mv/loader_pipeline/bq_test.py index e7a6e5b6..ed96cd9d 100644 --- a/weather_mv/loader_pipeline/bq_test.py +++ b/weather_mv/loader_pipeline/bq_test.py @@ -29,6 +29,7 @@ DEFAULT_IMPORT_TIME, dataset_to_table_schema, fetch_geo_point, + fetch_geo_polygon, ToBigQuery, ) from .sinks_test import TestDataBase, _handle_missing_grib_be @@ -74,6 +75,7 @@ def test_schema_generation(self): SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -89,6 +91,7 @@ def test_schema_generation__with_schema_normalization(self): SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -104,6 +107,7 @@ def test_schema_generation__with_target_columns(self): SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 
'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -119,6 +123,7 @@ def test_schema_generation__with_target_columns__with_schema_normalization(self) SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -135,6 +140,7 @@ def test_schema_generation__no_targets_specified(self): SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -151,6 +157,7 @@ def test_schema_generation__no_targets_specified__with_schema_normalization(self SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -184,6 +191,7 @@ def test_schema_generation__non_index_coords(self): SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -193,14 +201,17 @@ class ExtractRowsTestBase(TestDataBase): def extract(self, data_path, *, variables=None, area=None, open_dataset_kwargs=None, import_time=DEFAULT_IMPORT_TIME, disable_grib_schema_normalization=False, - tif_metadata_for_datetime=None, zarr: bool = False, zarr_kwargs=None) -> t.Iterator[t.Dict]: + tif_metadata_for_datetime=None, zarr: bool = False, zarr_kwargs=None, + skip_creating_polygon: bool = False) -> t.Iterator[t.Dict]: if zarr_kwargs is None: zarr_kwargs = {} op = ToBigQuery.from_kwargs(first_uri=data_path, dry_run=True, zarr=zarr, zarr_kwargs=zarr_kwargs, - output_table='foo.bar.baz', variables=variables, area=area, - xarray_open_dataset_kwargs=open_dataset_kwargs, import_time=import_time, infer_schema=False, - tif_metadata_for_datetime=tif_metadata_for_datetime, skip_region_validation=True, - disable_grib_schema_normalization=disable_grib_schema_normalization, coordinate_chunk_size=1000) + output_table='foo.bar.baz', variables=variables, area=area, + xarray_open_dataset_kwargs=open_dataset_kwargs, import_time=import_time, + infer_schema=False, tif_metadata_for_datetime=tif_metadata_for_datetime, + skip_region_validation=True, + disable_grib_schema_normalization=disable_grib_schema_normalization, + coordinate_chunk_size=1000, skip_creating_polygon=skip_creating_polygon) coords = op.prepare_coordinates(data_path) for uri, chunk in coords: yield from op.extract_rows(uri, chunk) @@ -233,7 +244,7 @@ def setUp(self) -> None: self.test_data_path = f'{self.test_data_folder}/test_data_20180101.nc' def test_extract_rows(self): - actual = next(self.extract(self.test_data_path)) + actual = next(self.extract(self.test_data_path, skip_creating_polygon=True)) expected = { 'd2m': 242.3035430908203, 'data_import_time': '1970-01-01T00:00:00+00:00', @@ -245,6 +256,7 @@ def test_extract_rows(self): 'u10': 
3.4776244163513184, 'v10': 0.03294110298156738, 'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))), + 'geo_polygon': None } self.assertRowsEqual(actual, expected) @@ -259,11 +271,15 @@ def test_extract_rows__with_subset_variables(self): 'time': '2018-01-02T06:00:00+00:00', 'u10': 3.4776244163513184, 'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (-108.098837, 48.900826), (-108.098837, 49.099174), + (-107.901163, 49.099174), (-107.901163, 48.900826), + (-108.098837, 48.900826)])) } self.assertRowsEqual(actual, expected) def test_extract_rows__specific_area(self): - actual = next(self.extract(self.test_data_path, area=[45, -103, 33, -92])) + actual = next(self.extract(self.test_data_path, area=[45, -103, 33, -92], skip_creating_polygon=True)) expected = { 'd2m': 246.19993591308594, 'data_import_time': '1970-01-01T00:00:00+00:00', @@ -275,6 +291,7 @@ def test_extract_rows__specific_area(self): 'u10': 2.73445987701416, 'v10': 0.08277571201324463, 'geo_point': geojson.dumps(geojson.Point((-103.0, 45.0))), + 'geo_polygon': None } self.assertRowsEqual(actual, expected) @@ -291,6 +308,10 @@ def test_extract_rows__specific_area_float_points(self): 'u10': 3.94743275642395, 'v10': -0.19749987125396729, 'geo_point': geojson.dumps(geojson.Point((-103.400002, 45.200001))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (-103.498839, 45.100827), (-103.498839, 45.299174), + (-103.301164, 45.299174), (-103.301164, 45.100827), + (-103.498839, 45.100827)])) } self.assertRowsEqual(actual, expected) @@ -307,7 +328,11 @@ def test_extract_rows__specify_import_time(self): 'time': '2018-01-02T06:00:00+00:00', 'u10': 3.4776244163513184, 'v10': 0.03294110298156738, - 'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))) + 'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (-108.098837, 48.900826), (-108.098837, 49.099174), + (-107.901163, 49.099174), (-107.901163, 48.900826), + (-108.098837, 48.900826)])) } self.assertRowsEqual(actual, expected) @@ -324,7 +349,8 @@ def test_extract_rows_single_point(self): 'time': '2018-01-02T06:00:00+00:00', 'u10': 3.4776244163513184, 'v10': 0.03294110298156738, - 'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))) + 'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))), + 'geo_polygon': None } self.assertRowsEqual(actual, expected) @@ -342,12 +368,16 @@ def test_extract_rows_nan(self): 'u10': None, 'v10': 0.03294110298156738, 'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (-108.098837, 48.900826), (-108.098837, 49.099174), + (-107.901163, 49.099174), (-107.901163, 48.900826), + (-108.098837, 48.900826)])) } self.assertRowsEqual(actual, expected) - def test_extract_rows__with_valid_lat_long(self): - valid_lat_long = [[-90, -360], [-90, -359], [-45, -180], [-45, -45], [0, 0], [45, 45], [45, 180], [90, 359], - [90, 360]] + def test_extract_rows__with_valid_lat_long_with_point(self): + valid_lat_long = [[-90, 0], [-90, 1], [-45, -180], [-45, -45], [0, 0], [45, 45], [45, -180], [90, -1], + [90, 0]] actual_val = [ '{"type": "Point", "coordinates": [0, -90]}', '{"type": "Point", "coordinates": [1, -90]}', @@ -364,7 +394,31 @@ def test_extract_rows__with_valid_lat_long(self): expected = fetch_geo_point(lat, long) self.assertEqual(actual, expected) - def test_extract_rows__with_invalid_lat(self): + def test_extract_rows__with_valid_lat_long_with_polygon(self): + valid_lat_long = 
[[-90, 0], [-90, -180], [-45, -180], [-45, 180], [0, 0], [90, 180], [45, -180], [-90, 180], + [90, 1], [0, 180], [1, -180], [90, -180]] + actual_val = [ + '{"type": "Polygon", "coordinates": [[-1, 89], [-1, -89], [1, -89], [1, 89], [-1, 89]]}', + '{"type": "Polygon", "coordinates": [[179, 89], [179, -89], [-179, -89], [-179, 89], [179, 89]]}', + '{"type": "Polygon", "coordinates": [[179, -46], [179, -44], [-179, -44], [-179, -46], [179, -46]]}', + '{"type": "Polygon", "coordinates": [[179, -46], [179, -44], [-179, -44], [-179, -46], [179, -46]]}', + '{"type": "Polygon", "coordinates": [[-1, -1], [-1, 1], [1, 1], [1, -1], [-1, -1]]}', + '{"type": "Polygon", "coordinates": [[179, 89], [179, -89], [-179, -89], [-179, 89], [179, 89]]}', + '{"type": "Polygon", "coordinates": [[179, 44], [179, 46], [-179, 46], [-179, 44], [179, 44]]}', + '{"type": "Polygon", "coordinates": [[179, 89], [179, -89], [-179, -89], [-179, 89], [179, 89]]}', + '{"type": "Polygon", "coordinates": [[0, 89], [0, -89], [2, -89], [2, 89], [0, 89]]}', + '{"type": "Polygon", "coordinates": [[179, -1], [179, 1], [-179, 1], [-179, -1], [179, -1]]}', + '{"type": "Polygon", "coordinates": [[179, 0], [179, 2], [-179, 2], [-179, 0], [179, 0]]}', + '{"type": "Polygon", "coordinates": [[179, 89], [179, -89], [-179, -89], [-179, 89], [179, 89]]}' + ] + lat_grid_resolution = 1 + lon_grid_resolution = 1 + for actual, (lat, long) in zip(actual_val, valid_lat_long): + with self.subTest(): + expected = fetch_geo_polygon(lat, long, lat_grid_resolution, lon_grid_resolution) + self.assertEqual(actual, expected) + + def test_extract_rows__with_invalid_lat_lon(self): invalid_lat_long = [[-100, -2000], [-100, -500], [100, 500], [100, 2000]] for (lat, long) in invalid_lat_long: with self.subTest(): @@ -384,12 +438,16 @@ def test_extract_rows_zarr(self): 'longitude': 0, 'time': '1959-01-01T00:00:00+00:00', 'geo_point': geojson.dumps(geojson.Point((0.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (-0.124913, 89.875173), (-0.124913, -89.875173), + (0.124913, -89.875173), (0.124913, 89.875173), + (-0.124913, 89.875173)])) } self.assertRowsEqual(actual, expected) def test_droping_variable_while_opening_zarr(self): input_path = os.path.join(self.test_data_folder, 'test_data.zarr') - actual = next(self.extract(input_path, zarr=True, zarr_kwargs={ 'drop_variables': ['cape'] })) + actual = next(self.extract(input_path, zarr=True, zarr_kwargs={'drop_variables': ['cape']})) expected = { 'd2m': 237.5404052734375, 'data_import_time': '1970-01-01T00:00:00+00:00', @@ -399,6 +457,10 @@ def test_droping_variable_while_opening_zarr(self): 'longitude': 0, 'time': '1959-01-01T00:00:00+00:00', 'geo_point': geojson.dumps(geojson.Point((0.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (-0.124913, 89.875173), (-0.124913, -89.875173), + (0.124913, -89.875173), (0.124913, 89.875173), + (-0.124913, 89.875173)])) } self.assertRowsEqual(actual, expected) @@ -420,7 +482,11 @@ def test_extract_rows(self): 'latitude': 42.09783344918844, 'longitude': -123.66686981141397, 'time': '2020-07-01T00:00:00+00:00', - 'geo_point': geojson.dumps(geojson.Point((-123.66687, 42.097833))) + 'geo_point': geojson.dumps(geojson.Point((-123.66687, 42.097833))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (-123.669853, 42.095605), (-123.669853, 42.100066), + (-123.663885, 42.100066), (-123.663885, 42.095605), + (-123.669853, 42.095605)])) } self.assertRowsEqual(actual, expected) @@ -447,6 +513,10 @@ def test_extract_rows(self): 'valid_time': 
'2021-10-18T06:00:00+00:00', 'z': 1.42578125, 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (179.950014, 89.950028), (179.950014, -89.950028), + (-179.950014, -89.950028), (-179.950014, 89.950028), + (179.950014, 89.950028)])) } self.assertRowsEqual(actual, expected) @@ -461,6 +531,10 @@ def test_extract_rows__with_vars__excludes_non_index_coords__without_schema_norm 'longitude': -180.0, 'z': 1.42578125, 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (179.950014, 89.950028), (179.950014, -89.950028), + (-179.950014, -89.950028), (-179.950014, 89.950028), + (179.950014, 89.950028)])) } self.assertRowsEqual(actual, expected) @@ -477,6 +551,10 @@ def test_extract_rows__with_vars__includes_coordinates_in_vars__without_schema_n 'step': 0, 'z': 1.42578125, 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (179.950014, 89.950028), (179.950014, -89.950028), + (-179.950014, -89.950028), (-179.950014, 89.950028), + (179.950014, 89.950028)])) } self.assertRowsEqual(actual, expected) @@ -491,6 +569,10 @@ def test_extract_rows__with_vars__excludes_non_index_coords__with_schema_normali 'longitude': -180.0, 'surface_0_00_instant_z': 1.42578125, 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (179.950014, 89.950028), (179.950014, -89.950028), + (-179.950014, -89.950028), (-179.950014, 89.950028), + (179.950014, 89.950028)])) } self.assertRowsEqual(actual, expected) @@ -506,6 +588,10 @@ def test_extract_rows__with_vars__includes_coordinates_in_vars__with_schema_norm 'step': 0, 'surface_0_00_instant_z': 1.42578125, 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (179.950014, 89.950028), (179.950014, -89.950028), + (-179.950014, -89.950028), (-179.950014, 89.950028), + (179.950014, 89.950028)])) } self.assertRowsEqual(actual, expected) @@ -559,6 +645,10 @@ def test_multiple_editions__without_schema_normalization(self): 'v200': -3.6647186279296875, 'valid_time': '2021-12-10T20:00:00+00:00', 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (179.950014, 89.950028), (179.950014, -89.950028), + (-179.950014, -89.950028), (-179.950014, 89.950028), + (179.950014, 89.950028)])) } self.assertRowsEqual(actual, expected) @@ -614,7 +704,11 @@ def test_multiple_editions__with_schema_normalization(self): 'surface_0_00_instant_tprate': 0.0, 'surface_0_00_instant_ceil': 179.17018127441406, 'valid_time': '2021-12-10T20:00:00+00:00', - 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))) + 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (179.950014, 89.950028), (179.950014, -89.950028), + (-179.950014, -89.950028), (-179.950014, 89.950028), + (179.950014, 89.950028)])) } self.assertRowsEqual(actual, expected) @@ -634,6 +728,11 @@ def test_multiple_editions__with_vars__includes_coordinates_in_vars__with_schema 'depthBelowLandLayer_0_00_instant_stl1': 251.02520751953125, 'depthBelowLandLayer_7_00_instant_stl2': 253.54124450683594, 'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))), + 'geo_polygon': geojson.dumps(geojson.Polygon([ + (179.950014, 89.950028), (179.950014, -89.950028), + (-179.950014, -89.950028), (-179.950014, 89.950028), + (179.950014, 89.950028)])) + } 
self.assertRowsEqual(actual, expected) diff --git a/weather_mv/loader_pipeline/pipeline_test.py b/weather_mv/loader_pipeline/pipeline_test.py index 09e53b43..6a18e59d 100644 --- a/weather_mv/loader_pipeline/pipeline_test.py +++ b/weather_mv/loader_pipeline/pipeline_test.py @@ -66,6 +66,7 @@ def setUp(self) -> None: 'zarr': False, 'zarr_kwargs': {}, 'log_level': 2, + 'skip_creating_polygon': False, } diff --git a/weather_mv/setup.py b/weather_mv/setup.py index 26201eef..12bd63fe 100644 --- a/weather_mv/setup.py +++ b/weather_mv/setup.py @@ -62,7 +62,7 @@ packages=find_packages(), author='Anthromets', author_email='anthromets-ecmwf@google.com', - version='0.2.15', + version='0.2.16', url='https://weather-tools.readthedocs.io/en/latest/weather_mv/', description='A tool to load weather data into BigQuery.', install_requires=beam_gcp_requirements + base_requirements, From b5542c408168ccc194b7189ae20998e53a17c201 Mon Sep 17 00:00:00 2001 From: Darshan Prajapati <93967637+DarshanSP19@users.noreply.github.com> Date: Tue, 11 Jul 2023 18:02:09 +0530 Subject: [PATCH 3/3] weather-* scripts default to runtime container image. (#355) * Removed Extra Packages * Added Flag To Use Local Code. * Updated README.md files * Version Bump --------- Co-authored-by: Darshan --- weather_dl/README.md | 13 +++++++++++ weather_dl/download_pipeline/pipeline.py | 1 + weather_dl/download_pipeline/pipeline_test.py | 3 ++- weather_dl/setup.py | 2 +- weather_dl/weather-dl | 22 +++++++++++++----- weather_mv/README.md | 15 ++++++++++++ weather_mv/loader_pipeline/pipeline.py | 2 ++ weather_mv/loader_pipeline/pipeline_test.py | 1 + weather_mv/setup.py | 2 +- weather_mv/weather-mv | 20 ++++++++++++---- weather_sp/README.md | 15 ++++++++++++ weather_sp/setup.py | 2 +- weather_sp/splitter_pipeline/pipeline.py | 2 ++ weather_sp/weather-sp | 23 +++++++++++++------ 14 files changed, 101 insertions(+), 22 deletions(-) diff --git a/weather_dl/README.md b/weather_dl/README.md index 57924111..d5608c67 100644 --- a/weather_dl/README.md +++ b/weather_dl/README.md @@ -57,6 +57,8 @@ _Common options_: that partitions will be processed in sequential order of each config; 'fair' means that partitions from each config will be interspersed evenly. Note: When using 'fair' scheduling, we recommend you set the '--partition-chunks' to a much smaller number. Default: 'in-order'. +* `--log-level`: An integer to configure log level. Default: 2(INFO). +* `--use-local-code`: Supply local code to the Runner. Default: False. > Note: > * In case of BigQuery manifest tool will create the BQ table itself, if not already present. @@ -93,6 +95,17 @@ weather-dl configs/mars_example_config.cfg \ --job_name $JOB_NAME ``` +Using DataflowRunner and using local code for pipeline + +```bash +weather-dl configs/mars_example_config.cfg \ + --runner DataflowRunner \ + --project $PROJECT \ + --temp_location gs://$BUCKET/tmp \ + --job_name $JOB_NAME \ + --use-local-code +``` + Using the DataflowRunner and specifying 3 requests per license ```bash diff --git a/weather_dl/download_pipeline/pipeline.py b/weather_dl/download_pipeline/pipeline.py index fa4983e2..71bdb71f 100644 --- a/weather_dl/download_pipeline/pipeline.py +++ b/weather_dl/download_pipeline/pipeline.py @@ -177,6 +177,7 @@ def run(argv: t.List[str], save_main_session: bool = True) -> PipelineArgs: help="Update the manifest for the already downloaded shards and exit. Default: 'false'.") parser.add_argument('--log-level', type=int, default=2, help='An integer to configure log level. 
Default: 2(INFO)') + parser.add_argument('--use-local-code', action='store_true', default=False, help='Supply local code to the Runner.') known_args, pipeline_args = parser.parse_known_args(argv[1:]) diff --git a/weather_dl/download_pipeline/pipeline_test.py b/weather_dl/download_pipeline/pipeline_test.py index 9e18a4bc..c6370c6f 100644 --- a/weather_dl/download_pipeline/pipeline_test.py +++ b/weather_dl/download_pipeline/pipeline_test.py @@ -58,7 +58,8 @@ schedule='in-order', check_skip_in_dry_run=False, update_manifest=False, - log_level=20), + log_level=20, + use_local_code=False), pipeline_options=PipelineOptions('--save_main_session True'.split()), configs=[Config.from_dict(CONFIG)], client_name='cds', diff --git a/weather_dl/setup.py b/weather_dl/setup.py index 60c8732f..b96466fd 100644 --- a/weather_dl/setup.py +++ b/weather_dl/setup.py @@ -48,7 +48,7 @@ setup( name='download_pipeline', packages=find_packages(), - version='0.1.19', + version='0.1.20', author='Anthromets', author_email='anthromets-ecmwf@google.com', url='https://weather-tools.readthedocs.io/en/latest/weather_dl/', diff --git a/weather_dl/weather-dl b/weather_dl/weather-dl index 6fc95b7d..5b73411a 100755 --- a/weather_dl/weather-dl +++ b/weather_dl/weather-dl @@ -24,6 +24,8 @@ import tempfile import weather_dl +SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0' + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) @@ -48,10 +50,16 @@ if __name__ == '__main__': from download_pipeline import cli except ImportError as e: raise ImportError('please re-install package in a clean python environment.') from e - - if '-h' in sys.argv or '--help' in sys.argv or len(sys.argv) == 1: - cli() - else: + + args = [] + + if "DataflowRunner" in sys.argv and "--sdk_container_image" not in sys.argv: + args.extend(['--sdk_container_image', + os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE), + '--experiments', + 'use_runner_v2']) + + if "--use-local-code" in sys.argv: with tempfile.TemporaryDirectory() as tmpdir: original_dir = os.getcwd() @@ -72,5 +80,7 @@ if __name__ == '__main__': # cleanup memory to prevent pickling error. tar = None weather_dl = None - - cli(['--extra_package', pkg_archive]) + args.extend(['--extra_package', pkg_archive]) + cli(args) + else: + cli(args) diff --git a/weather_mv/README.md b/weather_mv/README.md index 743af03d..eff4717d 100644 --- a/weather_mv/README.md +++ b/weather_mv/README.md @@ -49,6 +49,8 @@ _Common options_ * `--num_shards`: Number of shards to use when writing windowed elements to cloud storage. Only used with the `topic` flag. Default: 5 shards. * `-d, --dry-run`: Preview the load into BigQuery. Default: off. +* `--log-level`: An integer to configure log level. Default: 2(INFO). +* `--use-local-code`: Supply local code to the Runner. Default: False. Invoke with `-h` or `--help` to see the full range of options. @@ -182,6 +184,19 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \ --job_name $JOB_NAME ``` +Using DataflowRunner and using local code for pipeline + +```bash +weather-mv bq --uris "gs://your-bucket/*.nc" \ + --output_table $PROJECT.$DATASET_ID.$TABLE_ID \ + --runner DataflowRunner \ + --project $PROJECT \ + --region $REGION \ + --temp_location "gs://$BUCKET/tmp" \ + --job_name $JOB_NAME \ + --use-local-code +``` + For a full list of how to configure the Dataflow pipeline, please review [this table](https://cloud.google.com/dataflow/docs/reference/pipeline-options). 
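The `weather-dl` wrapper change above and the `weather-mv` and `weather-sp` changes below share the same argument handling. The following standalone sketch condenses that logic; `extra_pipeline_args` and the `pkg_archive` placeholder are illustrative names only, and the real scripts additionally build the local source tarball that they pass via `--extra_package`:

```python
import os
import typing as t

SDK_CONTAINER_IMAGE = 'gcr.io/weather-tools-prod/weather-tools:0.0.0'


def extra_pipeline_args(argv: t.List[str], pkg_archive: str = 'dist/pkg.tar.gz') -> t.List[str]:
    """Arguments the wrapper forwards to the tool's CLI entry point."""
    args = []
    # Default to the prebuilt runtime container on Dataflow unless the caller
    # chose an image explicitly; SDK_CONTAINER_IMAGE in the environment
    # overrides the baked-in default.
    if 'DataflowRunner' in argv and '--sdk_container_image' not in argv:
        args += ['--sdk_container_image',
                 os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE),
                 '--experiments', 'use_runner_v2']
    # Only with --use-local-code is the local source tree packaged and shipped
    # to the workers as an extra package.
    if '--use-local-code' in argv:
        args += ['--extra_package', pkg_archive]
    return args


print(extra_pipeline_args(['weather-mv', 'bq', '--runner', 'DataflowRunner']))
```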
diff --git a/weather_mv/loader_pipeline/pipeline.py b/weather_mv/loader_pipeline/pipeline.py index 10136747..c12bd5f5 100644 --- a/weather_mv/loader_pipeline/pipeline.py +++ b/weather_mv/loader_pipeline/pipeline.py @@ -27,6 +27,7 @@ from .streaming import GroupMessagesByFixedWindows, ParsePaths logger = logging.getLogger(__name__) +SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0' def configure_logger(verbosity: int) -> None: @@ -113,6 +114,7 @@ def run(argv: t.List[str]) -> t.Tuple[argparse.Namespace, t.List[str]]: help='Preview the weather-mv job. Default: off') base.add_argument('--log-level', type=int, default=2, help='An integer to configure log level. Default: 2(INFO)') + base.add_argument('--use-local-code', action='store_true', default=False, help='Supply local code to the Runner.') subparsers = parser.add_subparsers(help='help for subcommand', dest='subcommand') diff --git a/weather_mv/loader_pipeline/pipeline_test.py b/weather_mv/loader_pipeline/pipeline_test.py index 6a18e59d..4d546192 100644 --- a/weather_mv/loader_pipeline/pipeline_test.py +++ b/weather_mv/loader_pipeline/pipeline_test.py @@ -66,6 +66,7 @@ def setUp(self) -> None: 'zarr': False, 'zarr_kwargs': {}, 'log_level': 2, + 'use_local_code': False, 'skip_creating_polygon': False, } diff --git a/weather_mv/setup.py b/weather_mv/setup.py index 12bd63fe..bfe09713 100644 --- a/weather_mv/setup.py +++ b/weather_mv/setup.py @@ -62,7 +62,7 @@ packages=find_packages(), author='Anthromets', author_email='anthromets-ecmwf@google.com', - version='0.2.16', + version='0.2.17', url='https://weather-tools.readthedocs.io/en/latest/weather_mv/', description='A tool to load weather data into BigQuery.', install_requires=beam_gcp_requirements + base_requirements, diff --git a/weather_mv/weather-mv b/weather_mv/weather-mv index e8be4fcd..886c3d1f 100755 --- a/weather_mv/weather-mv +++ b/weather_mv/weather-mv @@ -23,6 +23,8 @@ import tempfile import weather_mv +SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0' + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) @@ -47,9 +49,15 @@ if __name__ == '__main__': except ImportError as e: raise ImportError('please re-install package in a clean python environment.') from e - if '-h' in sys.argv or '--help' in sys.argv or len(sys.argv) == 1: - cli() - else: + args = [] + + if "DataflowRunner" in sys.argv and "--sdk_container_image" not in sys.argv: + args.extend(['--sdk_container_image', + os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE), + '--experiments', + 'use_runner_v2']) + + if "--use-local-code" in sys.argv: with tempfile.TemporaryDirectory() as tmpdir: original_dir = os.getcwd() @@ -70,5 +78,7 @@ if __name__ == '__main__': # cleanup memory to prevent pickling error. tar = None weather_mv = None - - cli(['--extra_package', pkg_archive]) + args.extend(['--extra_package', pkg_archive]) + cli(args) + else: + cli(args) diff --git a/weather_sp/README.md b/weather_sp/README.md index 019217a6..93f81907 100644 --- a/weather_sp/README.md +++ b/weather_sp/README.md @@ -27,6 +27,8 @@ _Common options_: using Python formatting, see [Output section](#output) below. * `-f, --force`: Force re-splitting of the pipeline. Turns of skipping of already split data. * `-d, --dry-run`: Test the input file matching and the output file scheme without splitting. +* `--log-level`: An integer to configure log level. Default: 2(INFO). +* `--use-local-code`: Supply local code to the Runner. Default: False. 
Invoke with `-h` or `--help` to see the full range of options. @@ -59,6 +61,19 @@ weather-sp --input-pattern 'gs://test-tmp/era5/2015/**' \ --job_name $JOB_NAME ``` +Using DataflowRunner and using local code for pipeline + +```bash +weather-sp --input-pattern 'gs://test-tmp/era5/2015/**' \ + --output-dir 'gs://test-tmp/era5/splits' + --formatting '.{typeOfLevel}' \ + --runner DataflowRunner \ + --project $PROJECT \ + --temp_location gs://$BUCKET/tmp \ + --job_name $JOB_NAME \ + --use-local-code +``` + Using ecCodes-powered grib splitting on Dataflow (this is often more robust, especially when splitting multiple dimensions at once): diff --git a/weather_sp/setup.py b/weather_sp/setup.py index d56b351e..59786c8e 100644 --- a/weather_sp/setup.py +++ b/weather_sp/setup.py @@ -44,7 +44,7 @@ packages=find_packages(), author='Anthromets', author_email='anthromets-ecmwf@google.com', - version='0.3.0', + version='0.3.1', url='https://weather-tools.readthedocs.io/en/latest/weather_sp/', description='A tool to split weather data files into per-variable files.', install_requires=beam_gcp_requirements + base_requirements, diff --git a/weather_sp/splitter_pipeline/pipeline.py b/weather_sp/splitter_pipeline/pipeline.py index c3dcd470..bbcee909 100644 --- a/weather_sp/splitter_pipeline/pipeline.py +++ b/weather_sp/splitter_pipeline/pipeline.py @@ -26,6 +26,7 @@ from .file_splitters import get_splitter logger = logging.getLogger(__name__) +SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0' def configure_logger(verbosity: int) -> None: @@ -88,6 +89,7 @@ def run(argv: t.List[str], save_main_session: bool = True): ) parser.add_argument('-i', '--input-pattern', type=str, required=True, help='Pattern for input weather data.') + parser.add_argument('--use-local-code', action='store_true', default=False, help='Supply local code to the Runner.') output_options = parser.add_mutually_exclusive_group(required=True) output_options.add_argument( '--output-template', type=str, diff --git a/weather_sp/weather-sp b/weather_sp/weather-sp index a22b75b1..4db3dc20 100755 --- a/weather_sp/weather-sp +++ b/weather_sp/weather-sp @@ -23,6 +23,8 @@ import tempfile import weather_sp +SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0' + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) @@ -50,9 +52,15 @@ if __name__ == '__main__': raise ImportError( 'please re-install package in a clean python environment.') from e - if '-h' in sys.argv or '--help' in sys.argv or len(sys.argv) == 1: - cli() - else: + args = [] + + if "DataflowRunner" in sys.argv and "--sdk_container_image" not in sys.argv: + args.extend(['--sdk_container_image', + os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE), + '--experiments', + 'use_runner_v2']) + + if "--use-local-code" in sys.argv: with tempfile.TemporaryDirectory() as tmpdir: original_dir = os.getcwd() @@ -68,11 +76,12 @@ if __name__ == '__main__': pkg_archive = glob.glob(os.path.join(tmpdir, '*.tar.gz'))[0] with tarfile.open(pkg_archive, 'r') as tar: - assert any([f.endswith('.py') for f in - tar.getnames()]), 'extra_package must include python files!' + assert any([f.endswith('.py') for f in tar.getnames()]), 'extra_package must include python files!' # cleanup memory to prevent pickling error. tar = None weather_sp = None - - cli(['--extra_package', pkg_archive]) + args.extend(['--extra_package', pkg_archive]) + cli(args) + else: + cli(args) \ No newline at end of file