Added support for source data with multiple time dimensions #340

Open · wants to merge 16 commits into base: main
2 changes: 2 additions & 0 deletions weather_mv/README.md
@@ -398,6 +398,8 @@ _Command options_:
* `--initialization_time_regex`: A Regex string to get the initialization time from the filename.
* `--forecast_time_regex`: A Regex string to get the forecast/end time from the filename.
* `--group_common_hypercubes`: A flag that allows to split up large grib files into multiple level-wise ImageCollections / COGS.
* `--tiff_config`: Configuration for handling source data with more than two dimensions, passed as a JSON string whose `dims` key holds an array of dimension names. A separate TIFF file is created per value of each dimension listed in `dims`. By default, multiple TIFF files are generated whenever the source data contains multiple time or step values. For GRIB files with multiple datasets (not every dataset necessarily contains the same dimensions), a dataset that lacks a dimension listed in `dims` is stored as bands instead.
e.g. `{"dims": ["isobaricInhPa"]}`
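For example, to additionally split assets by pressure level during an Earth Engine ingestion (a hypothetical invocation; the bucket, project, and asset paths are placeholders):
`weather-mv ee --uris "gs://your-bucket/*.grib2" --asset_location "gs://your-bucket/assets" --ee_asset "projects/your-project/assets/weather" --tiff_config '{"dims": ["isobaricInhPa"]}'`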

Invoke with `ee -h` or `earthengine --help` to see the full range of options.

3 changes: 3 additions & 0 deletions weather_mv/loader_pipeline/bq.py
@@ -150,6 +150,9 @@ def validate_arguments(cls, known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None:
if known_args.area:
assert len(known_args.area) == 4, 'Must specify exactly 4 lat/long values for area: N, W, S, E boundaries.'

# Add a check for tiff_config.
if pipeline_options_dict.get('tiff_config'):
raise RuntimeError('--tiff_config can be specified only for earth engine ingestions.')
# Add a check for group_common_hypercubes.
if pipeline_options_dict.get('group_common_hypercubes'):
raise RuntimeError('--group_common_hypercubes can be specified only for earth engine ingestions.')
52 changes: 36 additions & 16 deletions weather_mv/loader_pipeline/ee.py
@@ -243,6 +243,7 @@ class ToEarthEngine(ToDataSink):
band_names_mapping: str
initialization_time_regex: str
forecast_time_regex: str
tiff_config: t.Dict

@classmethod
def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
@@ -283,6 +284,8 @@ def add_parser_arguments(cls, subparser: argparse.ArgumentParser):
help='A Regex string to get the initialization time from the filename.')
subparser.add_argument('--forecast_time_regex', type=str, default=None,
help='A Regex string to get the forecast/end time from the filename.')
subparser.add_argument('--tiff_config', type=json.loads, default={"dims": []},
help='Config to create assets split by the given dimensions.')
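A short aside: `type=json.loads` means the flag is parsed into a Python dict before the pipeline ever sees it. A minimal, self-contained sketch (the flag value is illustrative):

```python
import argparse
import json

parser = argparse.ArgumentParser()
parser.add_argument('--tiff_config', type=json.loads, default={"dims": []})

# The JSON string on the command line becomes a dict.
args = parser.parse_args(['--tiff_config', '{"dims": ["isobaricInhPa"]}'])
assert args.tiff_config == {"dims": ["isobaricInhPa"]}

# When the flag is omitted, the default keeps `dims` empty, which means
# assets are split only on multiple time/step values.
assert parser.parse_args([]).tiff_config == {"dims": []}
```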

@classmethod
def validate_arguments(cls, known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None:
@@ -398,8 +401,10 @@ def process(self, uri: str) -> t.Iterator[str]:

# Checks if the asset is already present in the GCS bucket or not.
target_path = os.path.join(
self.asset_location, f'{asset_name}{ASSET_TYPE_TO_EXTENSION_MAPPING[self.ee_asset_type]}')
if not self.force_overwrite and FileSystems.exists(target_path):
self.asset_location, f'{asset_name}*{ASSET_TYPE_TO_EXTENSION_MAPPING[self.ee_asset_type]}')
files = FileSystems.match([target_path])

if not self.force_overwrite and files[0].metadata_list:
logger.info(f'Asset file {target_path} already exists in GCS bucket. Skipping...')
return
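Unlike the old `FileSystems.exists` call on a single path, the new check matches a glob, so one lookup covers every per-timestep asset this URI may have produced. A minimal sketch of the same pattern (the bucket and asset name are placeholders):

```python
from apache_beam.io.filesystems import FileSystems

pattern = 'gs://my-bucket/assets/my_asset*.tiff'  # hypothetical target path
match_result = FileSystems.match([pattern])[0]  # one MatchResult per pattern
already_uploaded = bool(match_result.metadata_list)  # empty list means no match
```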

@@ -430,6 +435,7 @@ class ConvertToAsset(beam.DoFn, beam.PTransform, KwargsFactoryMixin):
band_names_dict: t.Optional[t.Dict] = None
initialization_time_regex: t.Optional[str] = None
forecast_time_regex: t.Optional[str] = None
tiff_config: t.Optional[t.Dict] = None

def add_to_queue(self, queue: Queue, item: t.Any):
"""Adds a new item to the queue.
@@ -451,7 +457,8 @@ def convert_to_asset(self, queue: Queue, uri: str):
band_names_dict=self.band_names_dict,
initialization_time_regex=self.initialization_time_regex,
forecast_time_regex=self.forecast_time_regex,
group_common_hypercubes=self.group_common_hypercubes) as ds_list:
group_common_hypercubes=self.group_common_hypercubes,
tiff_config=self.tiff_config) as ds_list:
if not isinstance(ds_list, list):
ds_list = [ds_list]

@@ -460,34 +467,48 @@ def convert_to_asset(self, queue: Queue, uri: str):
data = list(ds.values())
asset_name = get_ee_safe_name(uri)
channel_names = [da.name for da in data]
start_time, end_time, is_normalized = (attrs.get(key) for key in
('start_time', 'end_time', 'is_normalized'))
start_time, end_time, is_normalized, forecast_hour = (attrs.get(key) for key in
('start_time', 'end_time', 'is_normalized', 'forecast_hour'))
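The `attrs.get` unpacking above degrades gracefully: keys that are absent (e.g. `forecast_hour` for non-forecast data) come back as `None`. A minimal illustration with made-up values:

```python
attrs = {'start_time': '2021-10-18T06:00:00', 'is_normalized': True}
start_time, end_time, is_normalized, forecast_hour = (
    attrs.get(key) for key in ('start_time', 'end_time', 'is_normalized', 'forecast_hour'))

# Missing keys are simply None, so the `if forecast_hour:` guard below
# skips the FH suffix when the source data has no forecast hour.
assert end_time is None and forecast_hour is None
```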

dtype, crs, transform = (attrs.pop(key) for key in ['dtype', 'crs', 'transform'])
attrs.update({'is_normalized': str(is_normalized)})  # EE properties do not support bool.
# Add job_start_time to properties.
attrs["job_start_time"] = job_start_time
# Make attrs EE ingestable.
attrs = make_attrs_ee_compatible(attrs)

if start_time:
st = re.sub("[^0-9]", "", start_time)
asset_name = f"{asset_name}_{st}"
if forecast_hour:
asset_name = f"{asset_name}_FH-{forecast_hour}"
if self.tiff_config:
for var in set(self.tiff_config["dims"]).difference(['time', 'step']):
var_val = ds.get(var)
if var_val is not None:
if var_val >= 10:
asset_name = f"{asset_name}_{var}_{var_val.values:.0f}"
else:
asset_name = f"{asset_name}_{var}_{var_val.values:.2f}".replace('.', '_')
if self.group_common_hypercubes:
level, height = (attrs.pop(key) for key in ['level', 'height'])
safe_level_name = get_ee_safe_name(level)
asset_name = f'{asset_name}_{safe_level_name}'

asset_name = get_ee_safe_name(asset_name)
# For tiff ingestions.
if self.ee_asset_type == 'IMAGE':
file_name = f'{asset_name}.tiff'
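Taken together, the branches above assemble asset names of the form `<uri>_<time>_FH-<hour>_<dim>_<value>`. A self-contained sketch with illustrative values; `dim_suffix` is a hypothetical helper that mirrors the inline `>= 10` formatting branch:

```python
import re

asset_name = 'test_data_grib_single_timestep'
start_time = '2021-10-18T06:00:00'

# Timestamps are reduced to their digits: '20211018060000'.
asset_name = f"{asset_name}_{re.sub('[^0-9]', '', start_time)}"
assert asset_name == 'test_data_grib_single_timestep_20211018060000'

def dim_suffix(var: str, val: float) -> str:
    # Values >= 10 drop decimals; smaller values keep two decimals,
    # with '.' replaced by '_' so the name stays EE-safe.
    if val >= 10:
        return f'{var}_{val:.0f}'
    return f'{var}_{val:.2f}'.replace('.', '_')

assert dim_suffix('isobaricInhPa', 500.0) == 'isobaricInhPa_500'
assert dim_suffix('depthBelowLandLayer', 0.1) == 'depthBelowLandLayer_0_10'
```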

with MemoryFile() as memfile:
with memfile.open(driver='COG',
dtype=dtype,
width=data[0].data.shape[1],
height=data[0].data.shape[0],
count=len(data),
nodata=np.nan,
crs=crs,
transform=transform,
compress='lzw') as f:
dtype=dtype,
width=data[0].data.shape[1],
height=data[0].data.shape[0],
count=len(data),
nodata=np.nan,
crs=crs,
transform=transform,
compress='lzw') as f:
for i, da in enumerate(data):
f.write(da, i+1)
# Making the channel name EE-safe before adding it as a band name.
@@ -539,11 +560,10 @@ def get_dims_data(index: int) -> t.List[t.Any]:
writer.writerows(
[get_dims_data(i) + list(row) for row in zip(
*[d[i:i + ROWS_PER_WRITE] for d in data]
)]
)]
)

upload(temp.name, target_path)

asset_data = AssetData(
name=asset_name,
target_path=target_path,
89 changes: 83 additions & 6 deletions weather_mv/loader_pipeline/ee_test.py
@@ -65,24 +65,98 @@ class ConvertToAssetTests(TestDataBase):
def setUp(self) -> None:
super().setUp()
self.tmpdir = tempfile.TemporaryDirectory()
self.convert_to_image_asset = ConvertToAsset(asset_location=self.tmpdir.name)
self.convert_to_table_asset = ConvertToAsset(asset_location=self.tmpdir.name, ee_asset_type='TABLE')
self.convert_to_image_asset = ConvertToAsset(
asset_location=self.tmpdir.name,
tiff_config={"dims": []},
)
self.convert_to_table_asset = ConvertToAsset(
asset_location=self.tmpdir.name,
ee_asset_type='TABLE'
)

def tearDown(self):
self.tmpdir.cleanup()

def test_convert_to_image_asset(self):
data_path = f'{self.test_data_folder}/test_data_grib_single_timestep'
asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_single_timestep.tiff')
asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_single_timestep_20211018060000.tiff')

next(self.convert_to_image_asset.process(data_path))

# The TIFF is expected to be larger than the source GRIB.
self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))

def test_convert_to_multiple_image_assets(self):
data_path = f'{self.test_data_folder}/test_data_20180101.nc'
# A file with multiple time values generates a separate TIFF per timestep.
time_arr = ["20180102060000", "20180102070000", "20180102080000", "20180102090000",
"20180102100000", "20180102110000", "20180102120000", "20180102130000",
"20180102140000", "20180102150000", "20180102160000", "20180102170000",
"20180102180000", "20180102190000", "20180102200000", "20180102210000",
"20180102220000", "20180102230000", "20180103000000", "20180103010000",
"20180103020000", "20180103030000", "20180103040000", "20180103050000",
"20180103060000"]
it = self.convert_to_image_asset.process(data_path)
total_assets_size = 0
for time in time_arr:
next(it)
asset_path = os.path.join(self.tmpdir.name, f'test_data_20180101_{time}.tiff')
self.assertTrue(os.path.lexists(asset_path))
total_assets_size += os.path.getsize(asset_path)

# The combined size of all TIFFs is expected to exceed that of the source file.
self.assertTrue(total_assets_size > os.path.getsize(data_path))

def test_convert_to_multiple_image_assets_with_grib_multiple_edition_default_behaviour(self):
# Default behaviour: if the user does not provide any dimension in tiff_config, a grib
# file with multiple time values will generate separate tiff files.

data_path = f'{self.test_data_folder}/test_data_grib_multiple_edition_multiple_timestep.grib2'
time_arr = ["20230614000000","20230614060000","20230614120000"]
it = self.convert_to_image_asset.process(data_path)
total_assets_size = 0
for time in time_arr:
next(it)
asset_path = os.path.join(self.tmpdir.name,
f'test_data_grib_multiple_edition_multiple_timestep_{time}_FH-6.tiff')
self.assertTrue(os.path.lexists(asset_path))
total_assets_size += os.path.getsize(asset_path)

self.assertTrue(total_assets_size > os.path.getsize(data_path))

def test_convert_to_multiple_image_assets_with_grib_multiple_edition(self):

data_path = f'{self.test_data_folder}/test_data_grib_multiple_edition_multiple_timestep.grib2'
expected = [
{"time": "20230614000000", "depthBelowLandLayer": "0_00"},
{"time": "20230614000000", "depthBelowLandLayer": "0_10"},
{"time": "20230614060000", "depthBelowLandLayer": "0_00"},
{"time": "20230614060000", "depthBelowLandLayer": "0_10"},
{"time": "20230614120000", "depthBelowLandLayer": "0_00"},
{"time": "20230614120000", "depthBelowLandLayer": "0_10"}
]
convert_to_image_asset = ConvertToAsset(
asset_location=self.tmpdir.name,
tiff_config={"dims": ['depthBelowLandLayer']},
)
it = convert_to_image_asset.process(data_path)
total_assets_size = 0
for obj in expected:
next(it)
time, dbl = obj.values()
asset_name = f'test_data_grib_multiple_edition_multiple_timestep_{time}_FH-6_depthBelowLandLayer_{dbl}.tiff'
asset_path = os.path.join(self.tmpdir.name, asset_name)
self.assertTrue(os.path.lexists(asset_path))
total_assets_size += os.path.getsize(asset_path)

self.assertTrue(total_assets_size > os.path.getsize(data_path))

def test_convert_to_image_asset__with_multiple_grib_edition(self):
data_path = f'{self.test_data_folder}/test_data_grib_multiple_edition_single_timestep.bz2'
asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_multiple_edition_single_timestep.tiff')
asset_path = os.path.join(
self.tmpdir.name,
'test_data_grib_multiple_edition_single_timestep_20211210120000_FH-8.tiff',
)

next(self.convert_to_image_asset.process(data_path))

@@ -91,7 +165,7 @@ def test_convert_to_image_asset__with_multiple_grib_edition(self):

def test_convert_to_table_asset(self):
data_path = f'{self.test_data_folder}/test_data_grib_single_timestep'
asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_single_timestep.csv')
asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_single_timestep_20211018060000.csv')

next(self.convert_to_table_asset.process(data_path))

@@ -100,7 +174,10 @@ def test_convert_to_table_asset(self):

def test_convert_to_table_asset__with_multiple_grib_edition(self):
data_path = f'{self.test_data_folder}/test_data_grib_multiple_edition_single_timestep.bz2'
asset_path = os.path.join(self.tmpdir.name, 'test_data_grib_multiple_edition_single_timestep.csv')
asset_path = os.path.join(
self.tmpdir.name,
'test_data_grib_multiple_edition_single_timestep_20211210120000_FH-8.csv',
)

next(self.convert_to_table_asset.process(data_path))
