diff --git a/weather_mv/README.md b/weather_mv/README.md
index 98470e3b..743af03d 100644
--- a/weather_mv/README.md
+++ b/weather_mv/README.md
@@ -60,7 +60,7 @@ usage: weather-mv bigquery [-h] -i URIS [--topic TOPIC] [--window_size WINDOW_SI
                            [--import_time IMPORT_TIME] [--infer_schema]
                            [--xarray_open_dataset_kwargs XARRAY_OPEN_DATASET_KWARGS]
                            [--tif_metadata_for_datetime TIF_METADATA_FOR_DATETIME] [-s]
-                           [--coordinate_chunk_size COORDINATE_CHUNK_SIZE]
+                           [--coordinate_chunk_size COORDINATE_CHUNK_SIZE] [--skip_creating_polygon]
 ```
 
 The `bigquery` subcommand loads weather data into BigQuery. In addition to the common options above, users may specify
@@ -81,6 +81,9 @@ _Command options_:
 * `--tif_metadata_for_datetime` : Metadata that contains tif file's timestamp. Applicable only for tif files.
 * `-s, --skip-region-validation` : Skip validation of regions for data migration. Default: off.
 * `--disable_grib_schema_normalization` : To disable grib's schema normalization. Default: off.
+* `--skip_creating_polygon` : Skip ingesting grid points as polygons in BigQuery. Default: off, i.e. grid points are
+  ingested as polygons. Note: This feature relies on the assumption that the provided grid has an equal distance
+  between consecutive points of latitude and longitude.
 
 Invoke with `bq -h` or `bigquery --help` to see the full range of options.
 
@@ -117,6 +120,16 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \
   --output_table $PROJECT.$DATASET_ID.$TABLE_ID \
   --temp_location "gs://$BUCKET/tmp" \  # Needed for batch writes to BigQuery
   --dry-run
 ```
 
+Ingest grid points without creating polygons in BigQuery:
+
+```bash
+weather-mv bq --uris "gs://your-bucket/*.nc" \
+  --output_table $PROJECT.$DATASET_ID.$TABLE_ID \
+  --temp_location "gs://$BUCKET/tmp" \  # Needed for batch writes to BigQuery
+  --direct_num_workers 2 \
+  --skip_creating_polygon
+```
+
 Load COG's (.tif) files:
 
 ```bash
diff --git a/weather_mv/loader_pipeline/bq.py b/weather_mv/loader_pipeline/bq.py
index 4c862a82..58120940 100644
--- a/weather_mv/loader_pipeline/bq.py
+++ b/weather_mv/loader_pipeline/bq.py
@@ -45,7 +45,9 @@
 DATA_URI_COLUMN = 'data_uri'
 DATA_FIRST_STEP = 'data_first_step'
 GEO_POINT_COLUMN = 'geo_point'
+GEO_POLYGON_COLUMN = 'geo_polygon'
 LATITUDE_RANGE = (-90, 90)
+LONGITUDE_RANGE = (-180, 180)
 
 
 @dataclasses.dataclass
@@ -94,6 +96,9 @@ class ToBigQuery(ToDataSink):
     skip_region_validation: bool
     disable_grib_schema_normalization: bool
     coordinate_chunk_size: int = 10_000
+    skip_creating_polygon: bool = False
+    lat_grid_resolution: t.Optional[float] = None
+    lon_grid_resolution: t.Optional[float] = None
 
     @classmethod
     def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
@@ -105,6 +110,11 @@ def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
                                     'all data variables as columns.')
         subparser.add_argument('-a', '--area', metavar='area', type=float, nargs='+', default=list(),
                                help='Target area in [N, W, S, E]. Default: Will include all available area.')
+        subparser.add_argument('--skip_creating_polygon', action='store_true',
+                               help='Skip ingesting grid points as polygons in BigQuery. Default: off, i.e. grid '
+                                    'points are ingested as polygons. Note: This feature relies on the assumption '
+                                    'that the provided grid has an equal distance between consecutive points of '
+                                    'latitude and longitude.')
         subparser.add_argument('--import_time', type=str, default=datetime.datetime.utcnow().isoformat(),
                                help=("When writing data to BigQuery, record that data import occurred at this "
Default: now in UTC.")) @@ -154,6 +164,27 @@ def __post_init__(self): with open_dataset(self.first_uri, self.xarray_open_dataset_kwargs, self.disable_grib_schema_normalization, self.tif_metadata_for_datetime, is_zarr=self.zarr) as open_ds: + + if not self.skip_creating_polygon: + logger.warning("Assumes that equal distance between consecutive points of latitude " + "and longitude for the entire grid.") + # Find the grid_resolution. + if open_ds['latitude'].size > 1 and open_ds['longitude'].size > 1: + latitude_length = len(open_ds['latitude']) + longitude_length = len(open_ds['longitude']) + + latitude_range = np.ptp(open_ds["latitude"].values) + longitude_range = np.ptp(open_ds["longitude"].values) + + self.lat_grid_resolution = abs(latitude_range / latitude_length) / 2 + self.lon_grid_resolution = abs(longitude_range / longitude_length) / 2 + + else: + self.skip_creating_polygon = True + logger.warning("Polygon can't be genereated as provided dataset has a only single grid point.") + else: + logger.info("Polygon is not created as '--skip_creating_polygon' flag passed.") + # Define table from user input if self.variables and not self.infer_schema and not open_ds.attrs['is_normalized']: logger.info('Creating schema from input variables.') @@ -233,8 +264,14 @@ def extract_rows(self, uri: str, coordinates: t.List[t.Dict]) -> t.Iterator[t.Di row[DATA_IMPORT_TIME_COLUMN] = self.import_time row[DATA_URI_COLUMN] = uri row[DATA_FIRST_STEP] = first_time_step - row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], row['longitude']) + longitude = ((row['longitude'] + 180) % 360) - 180 + row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], longitude) + row[GEO_POLYGON_COLUMN] = ( + fetch_geo_polygon(row["latitude"], longitude, self.lat_grid_resolution, self.lon_grid_resolution) + if not self.skip_creating_polygon + else None + ) # 'row' ends up looking like: # {'latitude': 88.0, 'longitude': 2.0, 'time': '2015-01-01 06:00:00', 'd': -2.0187, 'cc': 0.007812, # 'z': 50049.8, 'data_import_time': '2020-12-05 00:12:02.424573 UTC', ...} @@ -301,6 +338,7 @@ def to_table_schema(columns: t.List[t.Tuple[str, str]]) -> t.List[bigquery.Schem fields.append(bigquery.SchemaField(DATA_URI_COLUMN, 'STRING', mode='NULLABLE')) fields.append(bigquery.SchemaField(DATA_FIRST_STEP, 'TIMESTAMP', mode='NULLABLE')) fields.append(bigquery.SchemaField(GEO_POINT_COLUMN, 'GEOGRAPHY', mode='NULLABLE')) + fields.append(bigquery.SchemaField(GEO_POLYGON_COLUMN, 'STRING', mode='NULLABLE')) return fields @@ -309,6 +347,80 @@ def fetch_geo_point(lat: float, long: float) -> str: """Calculates a geography point from an input latitude and longitude.""" if lat > LATITUDE_RANGE[1] or lat < LATITUDE_RANGE[0]: raise ValueError(f"Invalid latitude value '{lat}'") - long = ((long + 180) % 360) - 180 + if long > LONGITUDE_RANGE[1] or long < LONGITUDE_RANGE[0]: + raise ValueError(f"Invalid longitude value '{long}'") point = geojson.dumps(geojson.Point((long, lat))) return point + + +def fetch_geo_polygon(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> str: + """Create a Polygon based on latitude, longitude and resolution. + + Example :: + * - . - * + | | + . • . + | | + * - . - * + In order to create the polygon, we require the `*` point as indicated in the above example. + To determine the position of the `*` point, we find the `.` point. + The `get_lat_lon_range` function gives the `.` point and `bound_point` gives the `*` point. 
+ """ + lat_lon_bound = bound_point(latitude, longitude, lat_grid_resolution, lon_grid_resolution) + polygon = geojson.dumps(geojson.Polygon([ + (lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left + (lat_lon_bound[1][0], lat_lon_bound[1][1]), # upper_left + (lat_lon_bound[2][0], lat_lon_bound[2][1]), # upper_right + (lat_lon_bound[3][0], lat_lon_bound[3][1]), # lower_right + (lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left + ])) + return polygon + + +def bound_point(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> t.List: + """Calculate the bound point based on latitude, longitude and grid resolution. + + Example :: + * - . - * + | | + . • . + | | + * - . - * + This function gives the `*` point in the above example. + """ + lat_in_bound = latitude in [90.0, -90.0] + lon_in_bound = longitude in [-180.0, 180.0] + + lat_range = get_lat_lon_range(latitude, "latitude", lat_in_bound, + lat_grid_resolution, lon_grid_resolution) + lon_range = get_lat_lon_range(longitude, "longitude", lon_in_bound, + lat_grid_resolution, lon_grid_resolution) + lower_left = [lon_range[1], lat_range[1]] + upper_left = [lon_range[1], lat_range[0]] + upper_right = [lon_range[0], lat_range[0]] + lower_right = [lon_range[0], lat_range[1]] + return [lower_left, upper_left, upper_right, lower_right] + + +def get_lat_lon_range(value: float, lat_lon: str, is_point_out_of_bound: bool, + lat_grid_resolution: float, lon_grid_resolution: float) -> t.List: + """Calculate the latitude, longitude point range point latitude, longitude and grid resolution. + + Example :: + * - . - * + | | + . • . + | | + * - . - * + This function gives the `.` point in the above example. + """ + if lat_lon == 'latitude': + if is_point_out_of_bound: + return [-90 + lat_grid_resolution, 90 - lat_grid_resolution] + else: + return [value + lat_grid_resolution, value - lat_grid_resolution] + else: + if is_point_out_of_bound: + return [-180 + lon_grid_resolution, 180 - lon_grid_resolution] + else: + return [value + lon_grid_resolution, value - lon_grid_resolution] diff --git a/weather_mv/loader_pipeline/bq_test.py b/weather_mv/loader_pipeline/bq_test.py index e7a6e5b6..ed96cd9d 100644 --- a/weather_mv/loader_pipeline/bq_test.py +++ b/weather_mv/loader_pipeline/bq_test.py @@ -29,6 +29,7 @@ DEFAULT_IMPORT_TIME, dataset_to_table_schema, fetch_geo_point, + fetch_geo_polygon, ToBigQuery, ) from .sinks_test import TestDataBase, _handle_missing_grib_be @@ -74,6 +75,7 @@ def test_schema_generation(self): SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -89,6 +91,7 @@ def test_schema_generation__with_schema_normalization(self): SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None) ] self.assertListEqual(schema, expected_schema) @@ -104,6 +107,7 @@ def test_schema_generation__with_target_columns(self): SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None), SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None), SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None), + SchemaField('geo_polygon', 
+            SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
         ]
         self.assertListEqual(schema, expected_schema)
 
@@ -119,6 +123,7 @@ def test_schema_generation__with_target_columns__with_schema_normalization(self)
             SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
             SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
             SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
+            SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
         ]
         self.assertListEqual(schema, expected_schema)
 
@@ -135,6 +140,7 @@ def test_schema_generation__no_targets_specified(self):
             SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
             SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
             SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
+            SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
         ]
         self.assertListEqual(schema, expected_schema)
 
@@ -151,6 +157,7 @@ def test_schema_generation__no_targets_specified__with_schema_normalization(self
             SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
             SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
             SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
+            SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
         ]
         self.assertListEqual(schema, expected_schema)
 
@@ -184,6 +191,7 @@ def test_schema_generation__non_index_coords(self):
             SchemaField('data_uri', 'STRING', 'NULLABLE', None, (), None),
             SchemaField('data_first_step', 'TIMESTAMP', 'NULLABLE', None, (), None),
             SchemaField('geo_point', 'GEOGRAPHY', 'NULLABLE', None, (), None),
+            SchemaField('geo_polygon', 'STRING', 'NULLABLE', None, (), None)
         ]
         self.assertListEqual(schema, expected_schema)
 
@@ -193,14 +201,17 @@ class ExtractRowsTestBase(TestDataBase):
 
     def extract(self, data_path, *, variables=None, area=None, open_dataset_kwargs=None,
                 import_time=DEFAULT_IMPORT_TIME, disable_grib_schema_normalization=False,
-                tif_metadata_for_datetime=None, zarr: bool = False, zarr_kwargs=None) -> t.Iterator[t.Dict]:
+                tif_metadata_for_datetime=None, zarr: bool = False, zarr_kwargs=None,
+                skip_creating_polygon: bool = False) -> t.Iterator[t.Dict]:
         if zarr_kwargs is None:
             zarr_kwargs = {}
         op = ToBigQuery.from_kwargs(first_uri=data_path, dry_run=True, zarr=zarr, zarr_kwargs=zarr_kwargs,
-                                    output_table='foo.bar.baz', variables=variables, area=area,
-                                    xarray_open_dataset_kwargs=open_dataset_kwargs, import_time=import_time, infer_schema=False,
-                                    tif_metadata_for_datetime=tif_metadata_for_datetime, skip_region_validation=True,
-                                    disable_grib_schema_normalization=disable_grib_schema_normalization, coordinate_chunk_size=1000)
+                                    output_table='foo.bar.baz', variables=variables, area=area,
+                                    xarray_open_dataset_kwargs=open_dataset_kwargs, import_time=import_time,
+                                    infer_schema=False, tif_metadata_for_datetime=tif_metadata_for_datetime,
+                                    skip_region_validation=True,
+                                    disable_grib_schema_normalization=disable_grib_schema_normalization,
+                                    coordinate_chunk_size=1000, skip_creating_polygon=skip_creating_polygon)
         coords = op.prepare_coordinates(data_path)
         for uri, chunk in coords:
             yield from op.extract_rows(uri, chunk)
@@ -233,7 +244,7 @@ def setUp(self) -> None:
         self.test_data_path = f'{self.test_data_folder}/test_data_20180101.nc'
 
     def test_extract_rows(self):
-        actual = next(self.extract(self.test_data_path))
+        actual = next(self.extract(self.test_data_path, skip_creating_polygon=True))
         expected = {
             'd2m': 242.3035430908203,
             'data_import_time': '1970-01-01T00:00:00+00:00',
@@ -245,6 +256,7 @@ def test_extract_rows(self):
             'u10': 3.4776244163513184,
             'v10': 0.03294110298156738,
             'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
+            'geo_polygon': None
         }
         self.assertRowsEqual(actual, expected)
 
@@ -259,11 +271,15 @@ def test_extract_rows__with_subset_variables(self):
             'time': '2018-01-02T06:00:00+00:00',
             'u10': 3.4776244163513184,
             'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (-108.098837, 48.900826), (-108.098837, 49.099174),
+                (-107.901163, 49.099174), (-107.901163, 48.900826),
+                (-108.098837, 48.900826)]))
         }
         self.assertRowsEqual(actual, expected)
 
     def test_extract_rows__specific_area(self):
-        actual = next(self.extract(self.test_data_path, area=[45, -103, 33, -92]))
+        actual = next(self.extract(self.test_data_path, area=[45, -103, 33, -92], skip_creating_polygon=True))
         expected = {
             'd2m': 246.19993591308594,
             'data_import_time': '1970-01-01T00:00:00+00:00',
@@ -275,6 +291,7 @@ def test_extract_rows__specific_area(self):
             'u10': 2.73445987701416,
             'v10': 0.08277571201324463,
             'geo_point': geojson.dumps(geojson.Point((-103.0, 45.0))),
+            'geo_polygon': None
         }
         self.assertRowsEqual(actual, expected)
 
@@ -291,6 +308,10 @@ def test_extract_rows__specific_area_float_points(self):
             'u10': 3.94743275642395,
             'v10': -0.19749987125396729,
             'geo_point': geojson.dumps(geojson.Point((-103.400002, 45.200001))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (-103.498839, 45.100827), (-103.498839, 45.299174),
+                (-103.301164, 45.299174), (-103.301164, 45.100827),
+                (-103.498839, 45.100827)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -307,7 +328,11 @@ def test_extract_rows__specify_import_time(self):
             'time': '2018-01-02T06:00:00+00:00',
             'u10': 3.4776244163513184,
             'v10': 0.03294110298156738,
-            'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0)))
+            'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (-108.098837, 48.900826), (-108.098837, 49.099174),
+                (-107.901163, 49.099174), (-107.901163, 48.900826),
+                (-108.098837, 48.900826)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -324,7 +349,8 @@ def test_extract_rows_single_point(self):
             'time': '2018-01-02T06:00:00+00:00',
             'u10': 3.4776244163513184,
             'v10': 0.03294110298156738,
-            'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0)))
+            'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
+            'geo_polygon': None
         }
         self.assertRowsEqual(actual, expected)
 
@@ -342,12 +368,16 @@ def test_extract_rows_nan(self):
             'u10': None,
             'v10': 0.03294110298156738,
             'geo_point': geojson.dumps(geojson.Point((-108.0, 49.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (-108.098837, 48.900826), (-108.098837, 49.099174),
+                (-107.901163, 49.099174), (-107.901163, 48.900826),
+                (-108.098837, 48.900826)]))
         }
         self.assertRowsEqual(actual, expected)
 
-    def test_extract_rows__with_valid_lat_long(self):
-        valid_lat_long = [[-90, -360], [-90, -359], [-45, -180], [-45, -45], [0, 0], [45, 45], [45, 180], [90, 359],
-                          [90, 360]]
+    def test_extract_rows__with_valid_lat_long_with_point(self):
+        valid_lat_long = [[-90, 0], [-90, 1], [-45, -180], [-45, -45], [0, 0], [45, 45], [45, -180], [90, -1],
+                          [90, 0]]
         actual_val = [
             '{"type": "Point", "coordinates": [0, -90]}',
             '{"type": "Point", "coordinates": [1, -90]}',
@@ -364,7 +394,31 @@ def test_extract_rows__with_valid_lat_long_with_point(self):
                 expected = fetch_geo_point(lat, long)
                 self.assertEqual(actual, expected)
 
-    def test_extract_rows__with_invalid_lat(self):
+    def test_extract_rows__with_valid_lat_long_with_polygon(self):
+        valid_lat_long = [[-90, 0], [-90, -180], [-45, -180], [-45, 180], [0, 0], [90, 180], [45, -180], [-90, 180],
+                          [90, 1], [0, 180], [1, -180], [90, -180]]
+        actual_val = [
+            '{"type": "Polygon", "coordinates": [[-1, 89], [-1, -89], [1, -89], [1, 89], [-1, 89]]}',
+            '{"type": "Polygon", "coordinates": [[179, 89], [179, -89], [-179, -89], [-179, 89], [179, 89]]}',
+            '{"type": "Polygon", "coordinates": [[179, -46], [179, -44], [-179, -44], [-179, -46], [179, -46]]}',
+            '{"type": "Polygon", "coordinates": [[179, -46], [179, -44], [-179, -44], [-179, -46], [179, -46]]}',
+            '{"type": "Polygon", "coordinates": [[-1, -1], [-1, 1], [1, 1], [1, -1], [-1, -1]]}',
+            '{"type": "Polygon", "coordinates": [[179, 89], [179, -89], [-179, -89], [-179, 89], [179, 89]]}',
+            '{"type": "Polygon", "coordinates": [[179, 44], [179, 46], [-179, 46], [-179, 44], [179, 44]]}',
+            '{"type": "Polygon", "coordinates": [[179, 89], [179, -89], [-179, -89], [-179, 89], [179, 89]]}',
+            '{"type": "Polygon", "coordinates": [[0, 89], [0, -89], [2, -89], [2, 89], [0, 89]]}',
+            '{"type": "Polygon", "coordinates": [[179, -1], [179, 1], [-179, 1], [-179, -1], [179, -1]]}',
+            '{"type": "Polygon", "coordinates": [[179, 0], [179, 2], [-179, 2], [-179, 0], [179, 0]]}',
+            '{"type": "Polygon", "coordinates": [[179, 89], [179, -89], [-179, -89], [-179, 89], [179, 89]]}'
+        ]
+        lat_grid_resolution = 1
+        lon_grid_resolution = 1
+        for actual, (lat, long) in zip(actual_val, valid_lat_long):
+            with self.subTest():
+                expected = fetch_geo_polygon(lat, long, lat_grid_resolution, lon_grid_resolution)
+                self.assertEqual(actual, expected)
+
+    def test_extract_rows__with_invalid_lat_lon(self):
         invalid_lat_long = [[-100, -2000], [-100, -500], [100, 500], [100, 2000]]
         for (lat, long) in invalid_lat_long:
             with self.subTest():
@@ -384,12 +438,16 @@ def test_extract_rows_zarr(self):
             'longitude': 0,
             'time': '1959-01-01T00:00:00+00:00',
             'geo_point': geojson.dumps(geojson.Point((0.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (-0.124913, 89.875173), (-0.124913, -89.875173),
+                (0.124913, -89.875173), (0.124913, 89.875173),
+                (-0.124913, 89.875173)]))
         }
         self.assertRowsEqual(actual, expected)
 
     def test_droping_variable_while_opening_zarr(self):
         input_path = os.path.join(self.test_data_folder, 'test_data.zarr')
-        actual = next(self.extract(input_path, zarr=True, zarr_kwargs={ 'drop_variables': ['cape'] }))
+        actual = next(self.extract(input_path, zarr=True, zarr_kwargs={'drop_variables': ['cape']}))
         expected = {
             'd2m': 237.5404052734375,
             'data_import_time': '1970-01-01T00:00:00+00:00',
@@ -399,6 +457,10 @@ def test_droping_variable_while_opening_zarr(self):
             'longitude': 0,
             'time': '1959-01-01T00:00:00+00:00',
             'geo_point': geojson.dumps(geojson.Point((0.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (-0.124913, 89.875173), (-0.124913, -89.875173),
+                (0.124913, -89.875173), (0.124913, 89.875173),
+                (-0.124913, 89.875173)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -420,7 +482,11 @@ def test_extract_rows(self):
             'latitude': 42.09783344918844,
             'longitude': -123.66686981141397,
             'time': '2020-07-01T00:00:00+00:00',
-            'geo_point': geojson.dumps(geojson.Point((-123.66687, 42.097833)))
+            'geo_point': geojson.dumps(geojson.Point((-123.66687, 42.097833))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (-123.669853, 42.095605), (-123.669853, 42.100066),
+                (-123.663885, 42.100066), (-123.663885, 42.095605),
+                (-123.669853, 42.095605)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -447,6 +513,10 @@ def test_extract_rows(self):
             'valid_time': '2021-10-18T06:00:00+00:00',
             'z': 1.42578125,
             'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (179.950014, 89.950028), (179.950014, -89.950028),
+                (-179.950014, -89.950028), (-179.950014, 89.950028),
+                (179.950014, 89.950028)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -461,6 +531,10 @@ def test_extract_rows__with_vars__excludes_non_index_coords__without_schema_norm
             'longitude': -180.0,
             'z': 1.42578125,
             'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (179.950014, 89.950028), (179.950014, -89.950028),
+                (-179.950014, -89.950028), (-179.950014, 89.950028),
+                (179.950014, 89.950028)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -477,6 +551,10 @@ def test_extract_rows__with_vars__includes_coordinates_in_vars__without_schema_n
             'step': 0,
             'z': 1.42578125,
             'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (179.950014, 89.950028), (179.950014, -89.950028),
+                (-179.950014, -89.950028), (-179.950014, 89.950028),
+                (179.950014, 89.950028)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -491,6 +569,10 @@ def test_extract_rows__with_vars__excludes_non_index_coords__with_schema_normali
             'longitude': -180.0,
             'surface_0_00_instant_z': 1.42578125,
             'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (179.950014, 89.950028), (179.950014, -89.950028),
+                (-179.950014, -89.950028), (-179.950014, 89.950028),
+                (179.950014, 89.950028)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -506,6 +588,10 @@ def test_extract_rows__with_vars__includes_coordinates_in_vars__with_schema_norm
             'step': 0,
             'surface_0_00_instant_z': 1.42578125,
             'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (179.950014, 89.950028), (179.950014, -89.950028),
+                (-179.950014, -89.950028), (-179.950014, 89.950028),
+                (179.950014, 89.950028)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -559,6 +645,10 @@ def test_multiple_editions__without_schema_normalization(self):
             'v200': -3.6647186279296875,
             'valid_time': '2021-12-10T20:00:00+00:00',
             'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (179.950014, 89.950028), (179.950014, -89.950028),
+                (-179.950014, -89.950028), (-179.950014, 89.950028),
+                (179.950014, 89.950028)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -614,7 +704,11 @@ def test_multiple_editions__with_schema_normalization(self):
             'surface_0_00_instant_tprate': 0.0,
             'surface_0_00_instant_ceil': 179.17018127441406,
             'valid_time': '2021-12-10T20:00:00+00:00',
-            'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0)))
+            'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (179.950014, 89.950028), (179.950014, -89.950028),
+                (-179.950014, -89.950028), (-179.950014, 89.950028),
+                (179.950014, 89.950028)]))
         }
         self.assertRowsEqual(actual, expected)
 
@@ -634,6 +728,11 @@ def test_multiple_editions__with_vars__includes_coordinates_in_vars__with_schema
             'depthBelowLandLayer_0_00_instant_stl1': 251.02520751953125,
             'depthBelowLandLayer_7_00_instant_stl2': 253.54124450683594,
             'geo_point': geojson.dumps(geojson.Point((-180.0, 90.0))),
+            'geo_polygon': geojson.dumps(geojson.Polygon([
+                (179.950014, 89.950028), (179.950014, -89.950028),
+                (-179.950014, -89.950028), (-179.950014, 89.950028),
+                (179.950014, 89.950028)]))
+
         }
         self.assertRowsEqual(actual, expected)
 
diff --git a/weather_mv/loader_pipeline/pipeline_test.py b/weather_mv/loader_pipeline/pipeline_test.py
index 09e53b43..6a18e59d 100644
--- a/weather_mv/loader_pipeline/pipeline_test.py
+++ b/weather_mv/loader_pipeline/pipeline_test.py
@@ -66,6 +66,7 @@ def setUp(self) -> None:
         'zarr': False,
         'zarr_kwargs': {},
         'log_level': 2,
+        'skip_creating_polygon': False,
     }
 
diff --git a/weather_mv/setup.py b/weather_mv/setup.py
index 26201eef..12bd63fe 100644
--- a/weather_mv/setup.py
+++ b/weather_mv/setup.py
@@ -62,7 +62,7 @@
     packages=find_packages(),
     author='Anthromets',
     author_email='anthromets-ecmwf@google.com',
-    version='0.2.15',
+    version='0.2.16',
     url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
     description='A tool to load weather data into BigQuery.',
    install_requires=beam_gcp_requirements + base_requirements,
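
A note on the longitude handling in this patch: normalization moves out of `fetch_geo_point` (which now only validates the range) and into `extract_rows`, so both the geo point and the polygon are computed from the same wrapped value. The snippet below is a minimal standalone sketch of that wrap-around arithmetic; the function name is illustrative and not part of the patch.

```python
def normalize_longitude(lon: float) -> float:
    """Map any longitude (e.g. from a 0-360 grid) onto the [-180, 180) range."""
    return ((lon + 180) % 360) - 180

# Spot checks of the wrap-around behavior:
assert normalize_longitude(359.0) == -1.0    # 0-360 grid value wraps west
assert normalize_longitude(180.0) == -180.0  # the antimeridian maps to -180
assert normalize_longitude(-45.0) == -45.0   # values already in range pass through
```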