Ingested grid point as a polygon in BigQuery. (#337)
* Ingest grid points as polygons in BigQuery.

* Refactor the code for Polygon.

* Refactor code related to polygon.

* Fixed fetch_geo_polygon method.

* Updated grid resolution of the polygon.

* Test cases added for the Polygon.

* Updated code of the Polygon.

* Conflicts resolved.

* Updated test case after conflict resolution.

* Cover corner points in BQ polygon and updated test cases.

* Syntax updated.

* Syntax updated and argument parser added.

* Test cases added and updated.

* Update polygon in test cases.

* Example added and logic updated for the polygon.

* 'skip_creating_polygon' is updated in README.md.

* Add nits.

* Bump weather-mv version to 0.2.16.
dabhicusp authored Jul 11, 2023
1 parent 34099ba commit 952631d
Showing 5 changed files with 245 additions and 20 deletions.
15 changes: 14 additions & 1 deletion weather_mv/README.md
@@ -60,7 +60,7 @@ usage: weather-mv bigquery [-h] -i URIS [--topic TOPIC] [--window_size WINDOW_SI
[--import_time IMPORT_TIME] [--infer_schema]
[--xarray_open_dataset_kwargs XARRAY_OPEN_DATASET_KWARGS]
[--tif_metadata_for_datetime TIF_METADATA_FOR_DATETIME] [-s]
[--coordinate_chunk_size COORDINATE_CHUNK_SIZE]
[--coordinate_chunk_size COORDINATE_CHUNK_SIZE] [--skip_creating_polygon]
```

The `bigquery` subcommand loads weather data into BigQuery. In addition to the common options above, users may specify
@@ -81,6 +81,9 @@ _Command options_:
* `--tif_metadata_for_datetime` : Metadata that contains tif file's timestamp. Applicable only for tif files.
* `-s, --skip-region-validation` : Skip validation of regions for data migration. Default: off.
* `--disable_grib_schema_normalization` : To disable grib's schema normalization. Default: off.
* `--skip_creating_polygon` : Skip ingesting grid points as polygons in BigQuery. Default: off (grid points are
ingested as polygons in BigQuery). Note: this feature relies on the assumption that the provided grid has an equal
distance between consecutive points of latitude and longitude.

Invoke with `bq -h` or `bigquery --help` to see the full range of options.

@@ -117,6 +120,16 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \
--dry-run
```

Ingest grid points into BigQuery without creating polygons:

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--temp_location "gs://$BUCKET/tmp" \ # Needed for batch writes to BigQuery
--direct_num_workers 2 \
--skip_creating_polygon
```
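
Once rows are ingested with polygon creation enabled, the `geo_polygon` column holds a GeoJSON string per grid point.
A minimal sketch of reading it back (not part of this commit; the project, dataset, and table names are placeholders):

```python
# Hypothetical sketch: read back ingested rows and parse the GeoJSON stored in
# the `geo_polygon` column. Project, dataset, and table names are placeholders.
import geojson
from google.cloud import bigquery

client = bigquery.Client()
query = """
    SELECT latitude, longitude, geo_polygon
    FROM `your-project.your_dataset.your_table`
    WHERE geo_polygon IS NOT NULL
    LIMIT 5
"""
for row in client.query(query).result():
    polygon = geojson.loads(row["geo_polygon"])
    print(row["latitude"], row["longitude"], polygon["coordinates"])
```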

Load COG's (.tif) files:

116 changes: 114 additions & 2 deletions weather_mv/loader_pipeline/bq.py
@@ -45,7 +45,9 @@
DATA_URI_COLUMN = 'data_uri'
DATA_FIRST_STEP = 'data_first_step'
GEO_POINT_COLUMN = 'geo_point'
GEO_POLYGON_COLUMN = 'geo_polygon'
LATITUDE_RANGE = (-90, 90)
LONGITUDE_RANGE = (-180, 180)


@dataclasses.dataclass
@@ -94,6 +96,9 @@ class ToBigQuery(ToDataSink):
skip_region_validation: bool
disable_grib_schema_normalization: bool
coordinate_chunk_size: int = 10_000
skip_creating_polygon: bool = False
lat_grid_resolution: t.Optional[float] = None
lon_grid_resolution: t.Optional[float] = None

@classmethod
def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
@@ -105,6 +110,11 @@ def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
'all data variables as columns.')
subparser.add_argument('-a', '--area', metavar='area', type=float, nargs='+', default=list(),
help='Target area in [N, W, S, E]. Default: Will include all available area.')
subparser.add_argument('--skip_creating_polygon', action='store_true',
help='Skip ingesting grid points as polygons in BigQuery. Default: grid points are '
'ingested as polygons in BigQuery. Note: This feature relies on the assumption '
'that the provided grid has an equal distance between consecutive points of '
'latitude and longitude.')
subparser.add_argument('--import_time', type=str, default=datetime.datetime.utcnow().isoformat(),
help=("When writing data to BigQuery, record that data import occurred at this "
"time (format: YYYY-MM-DD HH:MM:SS.usec+offset). Default: now in UTC."))
@@ -154,6 +164,27 @@ def __post_init__(self):
with open_dataset(self.first_uri, self.xarray_open_dataset_kwargs,
self.disable_grib_schema_normalization, self.tif_metadata_for_datetime,
is_zarr=self.zarr) as open_ds:

if not self.skip_creating_polygon:
logger.warning("Assumes that equal distance between consecutive points of latitude "
"and longitude for the entire grid.")
# Find the grid_resolution.
if open_ds['latitude'].size > 1 and open_ds['longitude'].size > 1:
latitude_length = len(open_ds['latitude'])
longitude_length = len(open_ds['longitude'])

latitude_range = np.ptp(open_ds["latitude"].values)
longitude_range = np.ptp(open_ds["longitude"].values)

self.lat_grid_resolution = abs(latitude_range / latitude_length) / 2
self.lon_grid_resolution = abs(longitude_range / longitude_length) / 2

else:
self.skip_creating_polygon = True
logger.warning("Polygon can't be genereated as provided dataset has a only single grid point.")
else:
logger.info("Polygon is not created as '--skip_creating_polygon' flag passed.")

# Define table from user input
if self.variables and not self.infer_schema and not open_ds.attrs['is_normalized']:
logger.info('Creating schema from input variables.')
@@ -233,8 +264,14 @@ def extract_rows(self, uri: str, coordinates: t.List[t.Dict]) -> t.Iterator[t.Di
row[DATA_IMPORT_TIME_COLUMN] = self.import_time
row[DATA_URI_COLUMN] = uri
row[DATA_FIRST_STEP] = first_time_step
row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], row['longitude'])

longitude = ((row['longitude'] + 180) % 360) - 180
row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], longitude)
row[GEO_POLYGON_COLUMN] = (
fetch_geo_polygon(row["latitude"], longitude, self.lat_grid_resolution, self.lon_grid_resolution)
if not self.skip_creating_polygon
else None
)
# 'row' ends up looking like:
# {'latitude': 88.0, 'longitude': 2.0, 'time': '2015-01-01 06:00:00', 'd': -2.0187, 'cc': 0.007812,
# 'z': 50049.8, 'data_import_time': '2020-12-05 00:12:02.424573 UTC', ...}
@@ -301,6 +338,7 @@ def to_table_schema(columns: t.List[t.Tuple[str, str]]) -> t.List[bigquery.Schem
fields.append(bigquery.SchemaField(DATA_URI_COLUMN, 'STRING', mode='NULLABLE'))
fields.append(bigquery.SchemaField(DATA_FIRST_STEP, 'TIMESTAMP', mode='NULLABLE'))
fields.append(bigquery.SchemaField(GEO_POINT_COLUMN, 'GEOGRAPHY', mode='NULLABLE'))
fields.append(bigquery.SchemaField(GEO_POLYGON_COLUMN, 'STRING', mode='NULLABLE'))

return fields

@@ -309,6 +347,80 @@ def fetch_geo_point(lat: float, long: float) -> str:
"""Calculates a geography point from an input latitude and longitude."""
if lat > LATITUDE_RANGE[1] or lat < LATITUDE_RANGE[0]:
raise ValueError(f"Invalid latitude value '{lat}'")
long = ((long + 180) % 360) - 180
if long > LONGITUDE_RANGE[1] or long < LONGITUDE_RANGE[0]:
raise ValueError(f"Invalid longitude value '{long}'")
point = geojson.dumps(geojson.Point((long, lat)))
return point


def fetch_geo_polygon(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> str:
"""Create a Polygon based on latitude, longitude and resolution.
Example ::
* - . - *
| |
. • .
| |
* - . - *
In order to create the polygon, we require the `*` points indicated in the example above.
To determine the position of a `*` point, we first find the `.` points:
the `get_lat_lon_range` function gives the `.` points and `bound_point` gives the `*` points.
"""
lat_lon_bound = bound_point(latitude, longitude, lat_grid_resolution, lon_grid_resolution)
polygon = geojson.dumps(geojson.Polygon([
(lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left
(lat_lon_bound[1][0], lat_lon_bound[1][1]), # upper_left
(lat_lon_bound[2][0], lat_lon_bound[2][1]), # upper_right
(lat_lon_bound[3][0], lat_lon_bound[3][1]), # lower_right
(lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left
]))
return polygon


def bound_point(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> t.List:
"""Calculate the bound point based on latitude, longitude and grid resolution.
Example ::
* - . - *
| |
. • .
| |
* - . - *
This function gives the `*` point in the above example.
"""
lat_in_bound = latitude in [90.0, -90.0]
lon_in_bound = longitude in [-180.0, 180.0]

lat_range = get_lat_lon_range(latitude, "latitude", lat_in_bound,
lat_grid_resolution, lon_grid_resolution)
lon_range = get_lat_lon_range(longitude, "longitude", lon_in_bound,
lat_grid_resolution, lon_grid_resolution)
lower_left = [lon_range[1], lat_range[1]]
upper_left = [lon_range[1], lat_range[0]]
upper_right = [lon_range[0], lat_range[0]]
lower_right = [lon_range[0], lat_range[1]]
return [lower_left, upper_left, upper_right, lower_right]


def get_lat_lon_range(value: float, lat_lon: str, is_point_out_of_bound: bool,
lat_grid_resolution: float, lon_grid_resolution: float) -> t.List:
"""Calculate the latitude, longitude point range point latitude, longitude and grid resolution.
Example ::
* - . - *
| |
. • .
| |
* - . - *
This function gives the `.` point in the above example.
"""
if lat_lon == 'latitude':
if is_point_out_of_bound:
return [-90 + lat_grid_resolution, 90 - lat_grid_resolution]
else:
return [value + lat_grid_resolution, value - lat_grid_resolution]
else:
if is_point_out_of_bound:
return [-180 + lon_grid_resolution, 180 - lon_grid_resolution]
else:
return [value + lon_grid_resolution, value - lon_grid_resolution]
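
To make the geometry above concrete, here is a small standalone sketch (not part of the commit) that assumes a
1-degree grid, i.e. half-spacing resolutions of 0.5 degrees, and shows the longitude normalization applied in
`extract_rows` together with the corner points `bound_point` produces for an interior grid point:

```python
# Illustrative sketch (not from the commit): longitude normalization plus the
# grid-cell corner points for an interior point, assuming a 1-degree grid
# (half-spacing resolutions of 0.5 degrees).
lat, lon = 48.0, 357.0
lat_res = lon_res = 0.5  # half the grid spacing, as computed in __post_init__

# Normalize longitude from [0, 360) to [-180, 180), as done in extract_rows.
lon = ((lon + 180) % 360) - 180  # 357.0 -> -3.0

# Corner points of the grid cell (the `*` points in the docstring diagrams),
# ordered lower-left, upper-left, upper-right, lower-right.
corners = [
    (lon - lon_res, lat - lat_res),
    (lon - lon_res, lat + lat_res),
    (lon + lon_res, lat + lat_res),
    (lon + lon_res, lat - lat_res),
]
print(corners)  # [(-3.5, 47.5), (-3.5, 48.5), (-2.5, 48.5), (-2.5, 47.5)]
```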