Skip to content

Commit

Permalink
Example added, updated logic and for the polygon
Browse files Browse the repository at this point in the history
  • Loading branch information
dabhicusp committed Jul 7, 2023
1 parent 5deaa5c commit 218694e
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 76 deletions.
77 changes: 54 additions & 23 deletions weather_mv/loader_pipeline/bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
GEO_POINT_COLUMN = 'geo_point'
GEO_POLYGON_COLUMN = 'geo_polygon'
LATITUDE_RANGE = (-90, 90)
LONGITUDE_RANGE = (-180, 180)


@dataclasses.dataclass
Expand Down Expand Up @@ -95,8 +96,7 @@ class ToBigQuery(ToDataSink):
skip_region_validation: bool
disable_grib_schema_normalization: bool
coordinate_chunk_size: int = 10_000
create_polygon: bool = False
should_create_polygon: bool = False
skip_creating_polygon: bool = False
lat_grid_resolution: t.Optional[float] = None
lon_grid_resolution: t.Optional[float] = None

Expand All @@ -110,7 +110,7 @@ def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
'all data variables as columns.')
subparser.add_argument('-a', '--area', metavar='area', type=float, nargs='+', default=list(),
help='Target area in [N, W, S, E]. Default: Will include all available area.')
subparser.add_argument('--create_polygon', action='store_true',
subparser.add_argument('--skip_creating_polygon', action='store_true',
help='Ingest grid points as polygons in BigQuery. Default: Ingest grid points as normal '
'point in BigQuery. Note: This feature relies on the assumption that the provided '
'grid is regular.')
Expand Down Expand Up @@ -164,8 +164,9 @@ def __post_init__(self):
self.disable_grib_schema_normalization, self.tif_metadata_for_datetime,
is_zarr=self.zarr) as open_ds:

if self.create_polygon:
logger.warning("Assumes that Grid is regular.")
if not self.skip_creating_polygon:
logger.warning("Assumes that equal distance between consecutive points of latitude "
"and longitude for the entire grid.")
# Find the grid_resolution.
if open_ds['latitude'].size > 1 and open_ds['longitude'].size > 1:
latitude_length = len(open_ds['latitude'])
Expand All @@ -177,9 +178,11 @@ def __post_init__(self):
self.lat_grid_resolution = abs(latitude_range / latitude_length) / 2
self.lon_grid_resolution = abs(longitude_range / longitude_length) / 2

self.should_create_polygon = True
else:
self.skip_creating_polygon = True
logger.warning("Polygon can't be genereated as provided dataset has a only single grid point.")
else:
logger.info("Polygon is not created as '--skip_creating_polygon' flag passed.")

# Define table from user input
if self.variables and not self.infer_schema and not open_ds.attrs['is_normalized']:
Expand Down Expand Up @@ -262,7 +265,7 @@ def extract_rows(self, uri: str, coordinates: t.List[t.Dict]) -> t.Iterator[t.Di
row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], longitude)
row[GEO_POLYGON_COLUMN] = fetch_geo_polygon(row['latitude'], longitude,
self.lat_grid_resolution, self.lon_grid_resolution
) if self.should_create_polygon else None
) if not self.skip_creating_polygon else None
# 'row' ends up looking like:
# {'latitude': 88.0, 'longitude': 2.0, 'time': '2015-01-01 06:00:00', 'd': -2.0187, 'cc': 0.007812,
# 'z': 50049.8, 'data_import_time': '2020-12-05 00:12:02.424573 UTC', ...}
Expand Down Expand Up @@ -338,12 +341,24 @@ def fetch_geo_point(lat: float, long: float) -> str:
"""Calculates a geography point from an input latitude and longitude."""
if lat > LATITUDE_RANGE[1] or lat < LATITUDE_RANGE[0]:
raise ValueError(f"Invalid latitude value '{lat}'")
if long > LONGITUDE_RANGE[1] or long < LONGITUDE_RANGE[0]:
raise ValueError(f"Invalid longitude value '{long}'")
point = geojson.dumps(geojson.Point((long, lat)))
return point


def fetch_geo_polygon(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> str:
"""Create a Polygon based on latitude, longitude and resolution."""
"""Create a Polygon based on latitude, longitude and resolution.
Example ::
* - . - *
| |
. • .
| |
* - . - *
In order to create the polygon, we require the `*` point as indicated in the above example.
To determine the position of the `*` point, we find the `.` point.
The `get_lat_lon_range` function gives the `.` point and `bound_point` gives the `*` point.
"""
lat_lon_bound = bound_point(latitude, longitude, lat_grid_resolution, lon_grid_resolution)
polygon = geojson.dumps(geojson.Polygon([
(lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left
Expand All @@ -355,8 +370,16 @@ def fetch_geo_polygon(latitude: float, longitude: float, lat_grid_resolution: fl
return polygon


def bound_point(latitude, longitude, lat_grid_resolution, lon_grid_resolution) -> t.List:
"""Calculate the bound point based on latitude, longitude and grid resolution."""
def bound_point(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> t.List:
"""Calculate the bound point based on latitude, longitude and grid resolution.
Example ::
* - . - *
| |
. • .
| |
* - . - *
This function gives the `*` point in the above example.
"""
lat_in_bound = latitude in [90.0, -90.0]
lon_in_bound = longitude in [-180.0, 180.0]

Expand All @@ -365,22 +388,30 @@ def bound_point(latitude, longitude, lat_grid_resolution, lon_grid_resolution) -
lon_range = get_lat_lon_range(longitude, "longitude", lon_in_bound,
lat_grid_resolution, lon_grid_resolution)
lower_left = [lon_range[1], lat_range[1]]
upper_left = [lon_range[0], lat_range[1]]
upper_left = [lon_range[1], lat_range[0]]
upper_right = [lon_range[0], lat_range[0]]
lower_right = [lon_range[1], lat_range[0]]
lower_right = [lon_range[0], lat_range[1]]
return [lower_left, upper_left, upper_right, lower_right]


def get_lat_lon_range(value: float, lat_lon: str, is_point_out_of_bound: bool,
                      lat_grid_resolution: float, lon_grid_resolution: float) -> t.List:
    """Return the two edge coordinates of a grid cell along one axis.

    Example ::
        * - . - *
        |       |
        .   •   .
        |       |
        * - . - *
    This function gives the `.` points in the above example: the coordinates
    one resolution step on either side of the cell centre `•`, along the axis
    selected by `lat_lon`.

    Args:
        value: Coordinate of the cell centre on the selected axis.
        lat_lon: 'latitude' selects the latitude axis; any other value is
            treated as longitude.
        is_point_out_of_bound: True when the caller has determined that
            `value` sits exactly on the axis limit.
        lat_grid_resolution: Offset from the centre to the edge along latitude.
        lon_grid_resolution: Offset from the centre to the edge along longitude.

    Returns:
        A two-element list. For an ordinary point it is
        `[value + resolution, value - resolution]`. For a point on the axis
        limit it is `[-limit + resolution, limit - resolution]`, with the limit
        being 90 for latitude and 180 for longitude, independent of which of
        the two limits `value` is on.
    """
    if lat_lon == 'latitude':
        if is_point_out_of_bound:
            # NOTE(review): for a pole this yields a range reaching from one
            # pole's neighbourhood to the other's; wrap-around is natural for
            # longitude but looks suspicious for latitude -- confirm intended.
            return [-90 + lat_grid_resolution, 90 - lat_grid_resolution]
        return [value + lat_grid_resolution, value - lat_grid_resolution]

    if is_point_out_of_bound:
        # On the +/-180 limit the range is expressed with both limits, each
        # pulled inward by one resolution step.
        return [-180 + lon_grid_resolution, 180 - lon_grid_resolution]
    return [value + lon_grid_resolution, value - lon_grid_resolution]
Loading

0 comments on commit 218694e

Please sign in to comment.