
Merge branch 'google:main' into main
Piyush-Ingale authored Jul 12, 2023
2 parents 189d973 + b5542c4 commit ffd7613
Showing 17 changed files with 348 additions and 41 deletions.
13 changes: 13 additions & 0 deletions weather_dl/README.md
@@ -57,6 +57,8 @@ _Common options_:
that partitions will be processed in sequential order of each config; 'fair' means that partitions from each config
will be interspersed evenly. Note: When using 'fair' scheduling, we recommend you set the '--partition-chunks' to a
much smaller number. Default: 'in-order'.
* `--log-level`: An integer to configure the log level. Default: 2 (INFO).
* `--use-local-code`: Supply local code to the Runner. Default: False.

> Note:
> * In the case of a BigQuery manifest, the tool will create the BQ table itself if it is not already present.
@@ -93,6 +95,17 @@ weather-dl configs/mars_example_config.cfg \
--job_name $JOB_NAME
```

Using the DataflowRunner and supplying local code to the pipeline

```bash
weather-dl configs/mars_example_config.cfg \
--runner DataflowRunner \
--project $PROJECT \
--temp_location gs://$BUCKET/tmp \
--job_name $JOB_NAME \
--use-local-code
```

Using the DataflowRunner and specifying 3 requests per license

```bash
3 changes: 3 additions & 0 deletions weather_dl/download_pipeline/manifest.py
@@ -368,6 +368,9 @@ def set_stage(self, stage: Stage) -> None:

if stage == Stage.FETCH:
new_status.fetch_start_time = current_utc_time
new_status.fetch_end_time = None
new_status.download_start_time = None
new_status.download_end_time = None
elif stage == Stage.RETRIEVE:
new_status.retrieve_start_time = current_utc_time
elif stage == Stage.DOWNLOAD:
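The added lines clear the fetch-end and download timestamps whenever a shard re-enters the FETCH stage, so a retried fetch does not report stale values from a previous attempt. Below is a minimal, self-contained sketch of that reset behavior; the `Stage` and `Status` types here are hypothetical stand-ins, not the actual manifest classes.

```python
import dataclasses
import datetime
import enum
import typing as t


class Stage(enum.Enum):
    FETCH = 'fetch'
    RETRIEVE = 'retrieve'
    DOWNLOAD = 'download'


@dataclasses.dataclass
class Status:
    fetch_start_time: t.Optional[str] = None
    fetch_end_time: t.Optional[str] = None
    download_start_time: t.Optional[str] = None
    download_end_time: t.Optional[str] = None


def set_stage(status: Status, stage: Stage) -> None:
    """Record the stage start time; a new FETCH clears timestamps from any prior attempt."""
    now = datetime.datetime.utcnow().isoformat()
    if stage == Stage.FETCH:
        status.fetch_start_time = now
        status.fetch_end_time = None
        status.download_start_time = None
        status.download_end_time = None
    elif stage == Stage.DOWNLOAD:
        status.download_start_time = now
```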
1 change: 1 addition & 0 deletions weather_dl/download_pipeline/pipeline.py
@@ -177,6 +177,7 @@ def run(argv: t.List[str], save_main_session: bool = True) -> PipelineArgs:
help="Update the manifest for the already downloaded shards and exit. Default: 'false'.")
parser.add_argument('--log-level', type=int, default=2,
help='An integer to configure log level. Default: 2(INFO)')
parser.add_argument('--use-local-code', action='store_true', default=False, help='Supply local code to the Runner.')

known_args, pipeline_args = parser.parse_known_args(argv[1:])

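For context, the `parse_known_args` call above is what lets a tool-specific flag such as `--use-local-code` be consumed here while unrecognized flags (runner options like `--runner` or `--project`) pass through to Beam as `pipeline_args`. A small standalone sketch of that split, using hypothetical input values:

```python
import argparse

# Mirrors the two flags added to the pipeline's parser.
parser = argparse.ArgumentParser(description='Sketch of the known/unknown argument split.')
parser.add_argument('--log-level', type=int, default=2,
                    help='An integer to configure log level. Default: 2 (INFO)')
parser.add_argument('--use-local-code', action='store_true', default=False,
                    help='Supply local code to the Runner.')

argv = ['--use-local-code', '--runner', 'DataflowRunner', '--project', 'my-project']
known_args, pipeline_args = parser.parse_known_args(argv)

print(known_args.use_local_code)  # True — consumed by the tool itself.
print(pipeline_args)              # ['--runner', 'DataflowRunner', '--project', 'my-project'] — left for Beam.
```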
3 changes: 2 additions & 1 deletion weather_dl/download_pipeline/pipeline_test.py
@@ -58,7 +58,8 @@
schedule='in-order',
check_skip_in_dry_run=False,
update_manifest=False,
log_level=20),
log_level=20,
use_local_code=False),
pipeline_options=PipelineOptions('--save_main_session True'.split()),
configs=[Config.from_dict(CONFIG)],
client_name='cds',
2 changes: 1 addition & 1 deletion weather_dl/setup.py
@@ -48,7 +48,7 @@
setup(
name='download_pipeline',
packages=find_packages(),
version='0.1.19',
version='0.1.20',
author='Anthromets',
author_email='anthromets-ecmwf@google.com',
url='https://weather-tools.readthedocs.io/en/latest/weather_dl/',
22 changes: 16 additions & 6 deletions weather_dl/weather-dl
@@ -24,6 +24,8 @@ import tempfile

import weather_dl

SDK_CONTAINER_IMAGE='gcr.io/weather-tools-prod/weather-tools:0.0.0'

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)

@@ -48,10 +50,16 @@ if __name__ == '__main__':
from download_pipeline import cli
except ImportError as e:
raise ImportError('please re-install package in a clean python environment.') from e

if '-h' in sys.argv or '--help' in sys.argv or len(sys.argv) == 1:
cli()
else:

args = []

if "DataflowRunner" in sys.argv and "--sdk_container_image" not in sys.argv:
args.extend(['--sdk_container_image',
os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE),
'--experiments',
'use_runner_v2'])

if "--use-local-code" in sys.argv:
with tempfile.TemporaryDirectory() as tmpdir:
original_dir = os.getcwd()

@@ -72,5 +80,7 @@ if __name__ == '__main__':
# cleanup memory to prevent pickling error.
tar = None
weather_dl = None

cli(['--extra_package', pkg_archive])
args.extend(['--extra_package', pkg_archive])
cli(args)
else:
cli(args)
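Pieced together, the wrapper's new flow is: when the Dataflow runner is requested without an explicit container image, inject the prebuilt SDK container and Runner v2 experiment; when `--use-local-code` is passed, archive the local source tree and hand it to Beam via `--extra_package`. A simplified sketch of that flow is below; the helper names and the archiving details are illustrative, not the wrapper's exact code.

```python
import os
import sys
import tarfile
import tempfile

SDK_CONTAINER_IMAGE = 'gcr.io/weather-tools-prod/weather-tools:0.0.0'


def build_cli_args(argv, make_local_package) -> list:
    """Assemble the extra args passed to the underlying cli() call (illustrative only)."""
    args = []
    # Pin the SDK container image (and Runner v2) when targeting Dataflow without an explicit image.
    if 'DataflowRunner' in argv and '--sdk_container_image' not in argv:
        args.extend(['--sdk_container_image',
                     os.getenv('SDK_CONTAINER_IMAGE', SDK_CONTAINER_IMAGE),
                     '--experiments', 'use_runner_v2'])
    # Ship the local source tree to the workers instead of the released package.
    if '--use-local-code' in argv:
        pkg_archive = make_local_package()  # hypothetical helper: returns a path to a .tar.gz source package
        args.extend(['--extra_package', pkg_archive])
    return args


def make_local_package_example() -> str:
    """Hypothetical stand-in: archive the current directory as a source package."""
    tmpdir = tempfile.mkdtemp()
    archive = os.path.join(tmpdir, 'download_pipeline.tar.gz')
    with tarfile.open(archive, 'w:gz') as tar:
        tar.add(os.getcwd(), arcname='download_pipeline')
    return archive


if __name__ == '__main__':
    print(build_cli_args(sys.argv, make_local_package_example))
```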
30 changes: 29 additions & 1 deletion weather_mv/README.md
@@ -49,6 +49,8 @@ _Common options_
* `--num_shards`: Number of shards to use when writing windowed elements to cloud storage. Only used with the `topic`
flag. Default: 5 shards.
* `-d, --dry-run`: Preview the load into BigQuery. Default: off.
* `--log-level`: An integer to configure log level. Default: 2(INFO).
* `--use-local-code`: Supply local code to the Runner. Default: False.

Invoke with `-h` or `--help` to see the full range of options.

@@ -60,7 +62,7 @@ usage: weather-mv bigquery [-h] -i URIS [--topic TOPIC] [--window_size WINDOW_SIZE]
[--import_time IMPORT_TIME] [--infer_schema]
[--xarray_open_dataset_kwargs XARRAY_OPEN_DATASET_KWARGS]
[--tif_metadata_for_datetime TIF_METADATA_FOR_DATETIME] [-s]
[--coordinate_chunk_size COORDINATE_CHUNK_SIZE]
[--coordinate_chunk_size COORDINATE_CHUNK_SIZE] [--skip_creating_polygon]
```

The `bigquery` subcommand loads weather data into BigQuery. In addition to the common options above, users may specify
@@ -81,6 +83,9 @@ _Command options_:
* `--tif_metadata_for_datetime` : Metadata that contains tif file's timestamp. Applicable only for tif files.
* `-s, --skip-region-validation` : Skip validation of regions for data migration. Default: off.
* `--disable_grib_schema_normalization` : To disable grib's schema normalization. Default: off.
* `--skip_creating_polygon` : Do not ingest grid points as polygons in BigQuery. Default: ingest grid points as polygons in
BigQuery. Note: this feature relies on the assumption that the provided grid has an equal distance between consecutive
points of latitude and longitude (a quick way to check that assumption is sketched after this list).
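The equal-spacing assumption can be verified up front. The snippet below is a small sketch, not part of the tool; the coordinate arrays stand in for whatever your dataset exposes (e.g. `ds['latitude'].values` from xarray).

```python
import numpy as np

# Hypothetical coordinate arrays read from your dataset (0.25-degree global grid).
latitudes = np.arange(90.0, -90.25, -0.25)
longitudes = np.arange(-180.0, 180.0, 0.25)

def is_evenly_spaced(values: np.ndarray, tol: float = 1e-6) -> bool:
    """True when consecutive points are an (approximately) equal distance apart."""
    steps = np.diff(values)
    return bool(np.all(np.abs(steps - steps[0]) < tol))

print(is_evenly_spaced(latitudes), is_evenly_spaced(longitudes))  # True True
```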

Invoke with `bq -h` or `bigquery --help` to see the full range of options.

@@ -117,6 +122,16 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \
--dry-run
```

Ingest grid points into BigQuery without creating polygons (`--temp_location` is needed for batch writes to BigQuery):

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--temp_location "gs://$BUCKET/tmp" \
--direct_num_workers 2 \
--skip_creating_polygon
```

Load COG's (.tif) files:

```bash
@@ -169,6 +184,19 @@ weather-mv bq --uris "gs://your-bucket/*.nc" \
--job_name $JOB_NAME
```

Using the DataflowRunner and supplying local code to the pipeline

```bash
weather-mv bq --uris "gs://your-bucket/*.nc" \
--output_table $PROJECT.$DATASET_ID.$TABLE_ID \
--runner DataflowRunner \
--project $PROJECT \
--region $REGION \
--temp_location "gs://$BUCKET/tmp" \
--job_name $JOB_NAME \
--use-local-code
```

For a full list of how to configure the Dataflow pipeline, please review
[this table](https://cloud.google.com/dataflow/docs/reference/pipeline-options).

116 changes: 114 additions & 2 deletions weather_mv/loader_pipeline/bq.py
@@ -45,7 +45,9 @@
DATA_URI_COLUMN = 'data_uri'
DATA_FIRST_STEP = 'data_first_step'
GEO_POINT_COLUMN = 'geo_point'
GEO_POLYGON_COLUMN = 'geo_polygon'
LATITUDE_RANGE = (-90, 90)
LONGITUDE_RANGE = (-180, 180)


@dataclasses.dataclass
@@ -94,6 +96,9 @@ class ToBigQuery(ToDataSink):
skip_region_validation: bool
disable_grib_schema_normalization: bool
coordinate_chunk_size: int = 10_000
skip_creating_polygon: bool = False
lat_grid_resolution: t.Optional[float] = None
lon_grid_resolution: t.Optional[float] = None

@classmethod
def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
@@ -105,6 +110,11 @@ def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
'all data variables as columns.')
subparser.add_argument('-a', '--area', metavar='area', type=float, nargs='+', default=list(),
help='Target area in [N, W, S, E]. Default: Will include all available area.')
subparser.add_argument('--skip_creating_polygon', action='store_true',
help='Do not ingest grid points as polygons in BigQuery. Default: ingest grid points as '
'polygons in BigQuery. Note: this feature relies on the assumption that the '
'provided grid has an equal distance between consecutive points of latitude and '
'longitude.')
subparser.add_argument('--import_time', type=str, default=datetime.datetime.utcnow().isoformat(),
help=("When writing data to BigQuery, record that data import occurred at this "
"time (format: YYYY-MM-DD HH:MM:SS.usec+offset). Default: now in UTC."))
@@ -158,6 +168,27 @@ def __post_init__(self):
with open_dataset(self.first_uri, self.xarray_open_dataset_kwargs,
self.disable_grib_schema_normalization, self.tif_metadata_for_datetime,
is_zarr=self.zarr) as open_ds:

if not self.skip_creating_polygon:
logger.warning("Assumes equal distance between consecutive points of latitude "
"and longitude for the entire grid.")
# Find the grid_resolution.
if open_ds['latitude'].size > 1 and open_ds['longitude'].size > 1:
latitude_length = len(open_ds['latitude'])
longitude_length = len(open_ds['longitude'])

latitude_range = np.ptp(open_ds["latitude"].values)
longitude_range = np.ptp(open_ds["longitude"].values)

self.lat_grid_resolution = abs(latitude_range / latitude_length) / 2
self.lon_grid_resolution = abs(longitude_range / longitude_length) / 2

else:
self.skip_creating_polygon = True
logger.warning("Polygon can't be generated as the provided dataset has only a single grid point.")
else:
logger.info("Polygon is not created as the '--skip_creating_polygon' flag was passed.")

# Define table from user input
if self.variables and not self.infer_schema and not open_ds.attrs['is_normalized']:
logger.info('Creating schema from input variables.')
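To make the resolution arithmetic above concrete, here is a hedged, standalone rendering of the same computation on a hypothetical 0.25° global grid (721 × 1440 points): `np.ptp` gives the span of each coordinate axis, and dividing by the number of points and by two yields roughly half a cell, which is the offset later used to place each polygon's corners.

```python
import numpy as np

# Hypothetical 0.25-degree global grid (e.g. ERA5-style coordinates).
latitude = np.arange(90.0, -90.25, -0.25)   # 721 points spanning 180 degrees
longitude = np.arange(0.0, 360.0, 0.25)     # 1440 points spanning 359.75 degrees

lat_grid_resolution = abs(np.ptp(latitude) / len(latitude)) / 2    # ~0.1248, about half a cell
lon_grid_resolution = abs(np.ptp(longitude) / len(longitude)) / 2  # ~0.1249

print(round(lat_grid_resolution, 4), round(lon_grid_resolution, 4))
```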
@@ -237,8 +268,14 @@ def extract_rows(self, uri: str, coordinates: t.List[t.Dict]) -> t.Iterator[t.Dict]:
row[DATA_IMPORT_TIME_COLUMN] = self.import_time
row[DATA_URI_COLUMN] = uri
row[DATA_FIRST_STEP] = first_time_step
row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], row['longitude'])

longitude = ((row['longitude'] + 180) % 360) - 180
row[GEO_POINT_COLUMN] = fetch_geo_point(row['latitude'], longitude)
row[GEO_POLYGON_COLUMN] = (
fetch_geo_polygon(row["latitude"], longitude, self.lat_grid_resolution, self.lon_grid_resolution)
if not self.skip_creating_polygon
else None
)
# 'row' ends up looking like:
# {'latitude': 88.0, 'longitude': 2.0, 'time': '2015-01-01 06:00:00', 'd': -2.0187, 'cc': 0.007812,
# 'z': 50049.8, 'data_import_time': '2020-12-05 00:12:02.424573 UTC', ...}
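The `((longitude + 180) % 360) - 180` expression used above remaps longitudes from any convention (for example 0–360 grids) into the [-180, 180) range expected by BigQuery geography values. A few worked values, as an illustrative sketch:

```python
def normalize_longitude(longitude: float) -> float:
    """Map longitudes from any convention (e.g. 0..360) into [-180, 180)."""
    return ((longitude + 180) % 360) - 180

for lon in (0.0, 90.0, 180.0, 350.0, 359.75, -190.0):
    print(lon, '->', normalize_longitude(lon))
# 0.0 -> 0.0, 90.0 -> 90.0, 180.0 -> -180.0, 350.0 -> -10.0, 359.75 -> -0.25, -190.0 -> 170.0
```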
@@ -305,6 +342,7 @@ def to_table_schema(columns: t.List[t.Tuple[str, str]]) -> t.List[bigquery.SchemaField]:
fields.append(bigquery.SchemaField(DATA_URI_COLUMN, 'STRING', mode='NULLABLE'))
fields.append(bigquery.SchemaField(DATA_FIRST_STEP, 'TIMESTAMP', mode='NULLABLE'))
fields.append(bigquery.SchemaField(GEO_POINT_COLUMN, 'GEOGRAPHY', mode='NULLABLE'))
fields.append(bigquery.SchemaField(GEO_POLYGON_COLUMN, 'STRING', mode='NULLABLE'))

return fields

@@ -313,6 +351,80 @@ def fetch_geo_point(lat: float, long: float) -> str:
"""Calculates a geography point from an input latitude and longitude."""
if lat > LATITUDE_RANGE[1] or lat < LATITUDE_RANGE[0]:
raise ValueError(f"Invalid latitude value '{lat}'")
long = ((long + 180) % 360) - 180
if long > LONGITUDE_RANGE[1] or long < LONGITUDE_RANGE[0]:
raise ValueError(f"Invalid longitude value '{long}'")
point = geojson.dumps(geojson.Point((long, lat)))
return point


def fetch_geo_polygon(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> str:
"""Create a Polygon based on latitude, longitude and resolution.
Example ::
* - . - *
| |
. • .
| |
* - . - *
In order to create the polygon, we require the `*` point as indicated in the above example.
To determine the position of the `*` point, we find the `.` point.
The `get_lat_lon_range` function gives the `.` point and `bound_point` gives the `*` point.
"""
lat_lon_bound = bound_point(latitude, longitude, lat_grid_resolution, lon_grid_resolution)
polygon = geojson.dumps(geojson.Polygon([
(lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left
(lat_lon_bound[1][0], lat_lon_bound[1][1]), # upper_left
(lat_lon_bound[2][0], lat_lon_bound[2][1]), # upper_right
(lat_lon_bound[3][0], lat_lon_bound[3][1]), # lower_right
(lat_lon_bound[0][0], lat_lon_bound[0][1]), # lower_left
]))
return polygon


def bound_point(latitude: float, longitude: float, lat_grid_resolution: float, lon_grid_resolution: float) -> t.List:
"""Calculate the bound point based on latitude, longitude and grid resolution.
Example ::
* - . - *
| |
. • .
| |
* - . - *
This function gives the `*` point in the above example.
"""
lat_in_bound = latitude in [90.0, -90.0]
lon_in_bound = longitude in [-180.0, 180.0]

lat_range = get_lat_lon_range(latitude, "latitude", lat_in_bound,
lat_grid_resolution, lon_grid_resolution)
lon_range = get_lat_lon_range(longitude, "longitude", lon_in_bound,
lat_grid_resolution, lon_grid_resolution)
lower_left = [lon_range[1], lat_range[1]]
upper_left = [lon_range[1], lat_range[0]]
upper_right = [lon_range[0], lat_range[0]]
lower_right = [lon_range[0], lat_range[1]]
return [lower_left, upper_left, upper_right, lower_right]


def get_lat_lon_range(value: float, lat_lon: str, is_point_out_of_bound: bool,
lat_grid_resolution: float, lon_grid_resolution: float) -> t.List:
"""Calculate the latitude, longitude point range point latitude, longitude and grid resolution.
Example ::
* - . - *
| |
. • .
| |
* - . - *
This function gives the `.` point in the above example.
"""
if lat_lon == 'latitude':
if is_point_out_of_bound:
return [-90 + lat_grid_resolution, 90 - lat_grid_resolution]
else:
return [value + lat_grid_resolution, value - lat_grid_resolution]
else:
if is_point_out_of_bound:
return [-180 + lon_grid_resolution, 180 - lon_grid_resolution]
else:
return [value + lon_grid_resolution, value - lon_grid_resolution]
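Putting the helpers together: for an interior grid point, `bound_point` offsets the coordinate by the half-cell resolutions (boundary points are clamped by `get_lat_lon_range`), and `fetch_geo_polygon` serializes the resulting square as GeoJSON. Below is a short, standalone sketch of the same corner arithmetic; it uses the standard `json` module rather than the module's `geojson` dependency, and the sample coordinates are hypothetical.

```python
import json

def cell_polygon(latitude: float, longitude: float,
                 lat_res: float, lon_res: float) -> str:
    """Illustrative only: build the grid cell around an interior point as a GeoJSON polygon string."""
    lower_left = (longitude - lon_res, latitude - lat_res)
    upper_left = (longitude - lon_res, latitude + lat_res)
    upper_right = (longitude + lon_res, latitude + lat_res)
    lower_right = (longitude + lon_res, latitude - lat_res)
    ring = [lower_left, upper_left, upper_right, lower_right, lower_left]  # closed ring
    return json.dumps({'type': 'Polygon', 'coordinates': [ring]})

# Half-cell resolutions roughly as computed in __post_init__ for a 0.25-degree grid.
print(cell_polygon(latitude=88.0, longitude=2.0, lat_res=0.125, lon_res=0.125))
```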