diff --git a/concatenator/__init__.py b/concatenator/__init__.py
index e69de29..c55e95b 100644
--- a/concatenator/__init__.py
+++ b/concatenator/__init__.py
@@ -0,0 +1,4 @@
+"""Convenience variables used across the package."""
+
+GROUP_DELIM = '__'
+COORD_DELIM = " "
diff --git a/concatenator/attribute_handling.py b/concatenator/attribute_handling.py
new file mode 100644
index 0000000..b99fcab
--- /dev/null
+++ b/concatenator/attribute_handling.py
@@ -0,0 +1,87 @@
+"""
+attribute_handling.py
+
+Functions for converting "coordinates" in netCDF variable attributes
+    between paths that reference a group hierarchy and flattened paths.
+"""
+import re
+
+import netCDF4
+
+from concatenator import COORD_DELIM, GROUP_DELIM
+
+
+def regroup_coordinate_attribute(attribute_string: str) -> str:
+    """
+    Examples
+    --------
+    >>> coord_att = "__Time_and_Position__time __Time_and_Position__instrument_fov_latitude __Time_and_Position__instrument_fov_longitude"
+    >>> regroup_coordinate_attribute(coord_att)
+    'Time_and_Position/time Time_and_Position/instrument_fov_latitude Time_and_Position/instrument_fov_longitude'
+
+    Parameters
+    ----------
+    attribute_string : str
+
+    Returns
+    -------
+    str
+    """
+    # Use the separator that's in the attribute string only if all separators in the string are the same.
+    # Otherwise, we will use our own default separator.
+    whitespaces = re.findall(r'\s+', attribute_string)
+    if len(set(whitespaces)) == 1:
+        new_sep = whitespaces[0]
+    else:
+        new_sep = COORD_DELIM
+
+    return new_sep.join(
+        '/'.join(c.split(GROUP_DELIM))[1:]
+        for c
+        in attribute_string.split()  # split on any whitespace
+    )
+
+
+def flatten_coordinate_attribute_paths(dataset: netCDF4.Dataset,
+                                       var: netCDF4.Variable,
+                                       variable_name: str) -> None:
+    """Flatten the paths of variables referenced in the coordinates attribute."""
+    if 'coordinates' in var.ncattrs():
+        coord_att = var.getncattr('coordinates')
+
+        new_coord_att = _flatten_coordinate_attribute(coord_att)
+
+        dataset.variables[variable_name].setncattr('coordinates', new_coord_att)
+
+
+def _flatten_coordinate_attribute(attribute_string: str) -> str:
+    """Converts attributes that specify group membership via "/" to use the new group delimiter, even for the root level.
+
+    Examples
+    --------
+    >>> coord_att = "Time_and_Position/time Time_and_Position/instrument_fov_latitude Time_and_Position/instrument_fov_longitude"
+    >>> _flatten_coordinate_attribute(coord_att)
+    '__Time_and_Position__time __Time_and_Position__instrument_fov_latitude __Time_and_Position__instrument_fov_longitude'
+
+    Parameters
+    ----------
+    attribute_string : str
+
+    Returns
+    -------
+    str
+    """
+    # Use the separator that's in the attribute string only if all separators in the string are the same.
+    # Otherwise, we will use our own default separator.
+    whitespaces = re.findall(r'\s+', attribute_string)
+    if len(set(whitespaces)) == 1:
+        new_sep = whitespaces[0]
+    else:
+        new_sep = COORD_DELIM
+
+    # A new string is constructed.
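+    # For example, "Time_and_Position/time Time_and_Position/latitude" becomes
+    # "__Time_and_Position__time __Time_and_Position__latitude" (with GROUP_DELIM == '__').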
+ return new_sep.join( + f'{GROUP_DELIM}{c.replace("/", GROUP_DELIM)}' + for c + in attribute_string.split() # split on any whitespace + ) diff --git a/concatenator/bumblebee.py b/concatenator/bumblebee.py index 722afef..9fd8531 100644 --- a/concatenator/bumblebee.py +++ b/concatenator/bumblebee.py @@ -9,6 +9,8 @@ import netCDF4 as nc # type: ignore import xarray as xr +from concatenator import GROUP_DELIM +from concatenator.dimension_cleanup import remove_duplicate_dims from concatenator.file_ops import add_label_to_path from concatenator.group_handling import (flatten_grouped_dataset, regroup_flattened_dataset) @@ -20,6 +22,7 @@ def bumblebee(files_to_concat: list[str], output_file: str, write_tmp_flat_concatenated: bool = False, keep_tmp_files: bool = True, + concat_dim: str = "", logger: Logger = default_logger) -> str: """Concatenate netCDF data files along an existing dimension. @@ -28,6 +31,7 @@ def bumblebee(files_to_concat: list[str], files_to_concat : list[str] output_file : str keep_tmp_files : bool + concat_dim : str, optional logger : logging.Logger Returns @@ -46,30 +50,40 @@ def bumblebee(files_to_concat: list[str], return "" logger.debug("Flattening all input files...") + xrdataset_list = [] for i, filepath in enumerate(input_files): # The group structure is flattened. start_time = time.time() logger.debug(" ..file %03d/%03d <%s>..", i + 1, num_input_files, filepath) - flat_dataset, coord_vars, string_vars = flatten_grouped_dataset(nc.Dataset(filepath, 'r'), filepath, + flat_dataset, coord_vars, _ = flatten_grouped_dataset(nc.Dataset(filepath, 'r'), filepath, ensure_all_dims_are_coords=True) + + flat_dataset = remove_duplicate_dims(flat_dataset) + xrds = xr.open_dataset(xr.backends.NetCDF4DataStore(flat_dataset), decode_times=False, decode_coords=False, drop_variables=coord_vars) + benchmark_log['flattening'] = time.time() - start_time # The flattened file is written to disk. - flat_file_path = add_label_to_path(filepath, label="_flat_intermediate") - xrds.to_netcdf(flat_file_path, encoding={v_name: {'dtype': 'str'} for v_name in string_vars}) - intermediate_flat_filepaths.append(flat_file_path) + # flat_file_path = add_label_to_path(filepath, label="_flat_intermediate") + # xrds.to_netcdf(flat_file_path, encoding={v_name: {'dtype': 'str'} for v_name in string_vars}) + # intermediate_flat_filepaths.append(flat_file_path) + # xrdataset_list.append(xr.open_dataset(flat_file_path)) + xrdataset_list.append(xrds) # Flattened files are concatenated together (Using XARRAY). start_time = time.time() logger.debug("Concatenating flattened files...") - combined_ds = xr.open_mfdataset(intermediate_flat_filepaths, - decode_times=False, - decode_coords=False, - data_vars='minimal', - coords='minimal', - compat='override') + # combined_ds = xr.open_mfdataset(intermediate_flat_filepaths, + # decode_times=False, + # decode_coords=False, + # data_vars='minimal', + # coords='minimal', + # compat='override') + + combined_ds = xr.concat(xrdataset_list, dim=GROUP_DELIM + concat_dim, data_vars='minimal', coords='minimal') + benchmark_log['concatenating'] = time.time() - start_time if write_tmp_flat_concatenated: diff --git a/concatenator/dimension_cleanup.py b/concatenator/dimension_cleanup.py new file mode 100644 index 0000000..151c32d --- /dev/null +++ b/concatenator/dimension_cleanup.py @@ -0,0 +1,110 @@ +""" +dimension_cleanup.py + +Functions for renaming duplicated dimension names for netCDF variables, so that xarray can handle the dataset. 
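+
+For example, a variable declared with dimensions ('sounding', 'sounding') is
+rebuilt with dimensions ('sounding', 'sounding_1'), where 'sounding_1' is a new
+dimension copied from 'sounding'. (The dimension names here are illustrative.)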
+"""
+import collections
+
+import netCDF4 as nc
+
+
+def remove_duplicate_dims(nc_dataset: nc.Dataset) -> nc.Dataset:
+    """
+    Rename duplicated dimensions so that xarray can read the dataset.
+
+    xarray cannot read netCDF4 datasets with duplicate dimensions. For each
+    variable with a duplicated dimension, an exact copy of the duplicated
+    dimension is created under a new name, and the variable is recreated,
+    under its original name, with the de-duplicated dimension list.
+
+    Notes
+    -----
+    Requires the dataset to be 'flat', i.e., with no groups and every variable at the root level.
+    """
+    dup_vars = {}
+    dup_new_varnames = []
+
+    for var_name, var in nc_dataset.variables.items():
+        dim_list = list(var.dimensions)
+        if len(set(dim_list)) != len(dim_list):  # True if var.dimensions contains duplicates.
+            dup_vars[var_name] = var  # Collect the variables that have duplicated dimensions.
+
+    for dup_var_name, dup_var in dup_vars.items():
+        dim_list = list(dup_var.dimensions)  # Original dimensions of the variable with duplicated dims.
+
+        # Dimension(s) that are duplicated are retrieved.
+        #   Note: this is not yet tested for more than one duplicated dimension.
+        dim_dup = [item for item, count in collections.Counter(dim_list).items() if count > 1][0]
+        dim_dup_length = dup_var.shape[dup_var.dimensions.index(dim_dup)]  # Length of the duplicated dimension.
+
+        # New dimension and variable names are created.
+        dim_dup_new = dim_dup + '_1'
+        var_name_new = dup_var_name + '_1'
+        dup_new_varnames.append(var_name_new)
+
+        # The last dimension of the variable is replaced with the new name in a temporary list.
+        #   Note: this assumes the duplicated dimension is the final one.
+        new_dim_list = dim_list[:-1]
+        new_dim_list.extend([dim_dup_new])
+
+        new_dup_var = {}
+
+        # Attributes for the original variable are retrieved.
+        attrs_contents = get_attributes_minus_fillvalue_and_renamed_coords(original_var_name=dup_var_name,
+                                                                           new_var_name=dim_dup_new,
+                                                                           original_dataset=nc_dataset)
+
+        # None is used when the variable has no '_FillValue', instead of raising an AttributeError.
+        fill_value = getattr(dup_var, '_FillValue', None)
+
+        # Only create a new *Dimension* if it doesn't already exist.
+        if dim_dup_new not in nc_dataset.dimensions.keys():
+
+            # New dimension is created by copying from the duplicated dimension.
+            nc_dataset.createDimension(dim_dup_new, dim_dup_length)
+
+            # Only create a new dimension *Variable* if it existed originally in the NetCDF structure.
+            if dim_dup in nc_dataset.variables.keys():
+
+                # New variable object is created for the renamed, previously duplicated dimension.
+                new_dup_var[dim_dup_new] = nc_dataset.createVariable(dim_dup_new, nc_dataset.variables[dim_dup].dtype,
+                                                                     (dim_dup_new,), fill_value=fill_value)
+                dim_var_attr_contents = get_attributes_minus_fillvalue_and_renamed_coords(original_var_name=dim_dup,
+                                                                                          new_var_name=dim_dup_new,
+                                                                                          original_dataset=nc_dataset)
+                for attr_name, contents in dim_var_attr_contents.items():
+                    new_dup_var[dim_dup_new].setncattr(attr_name, contents)
+
+                new_dup_var[dim_dup_new][:] = nc_dataset.variables[dim_dup][:]
+
+        # Delete the existing variable.
+        del nc_dataset.variables[dup_var_name]
+
+        # Replace the original *Variable* with a new variable that has no duplicated dimensions.
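+        # The replacement keeps the original variable name and carries over the
+        # original data, fill value, and (renamed) attributes; only the trailing
+        # duplicated dimension is swapped for its renamed copy, e.g. 'sounding_1'.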
+        new_dup_var[dup_var_name] = nc_dataset.createVariable(dup_var_name, str(dup_var[:].dtype),
+                                                              tuple(new_dim_list), fill_value=fill_value)
+        for attr_name, contents in attrs_contents.items():
+            new_dup_var[dup_var_name].setncattr(attr_name, contents)
+
+        new_dup_var[dup_var_name][:] = dup_var[:]
+
+    return nc_dataset
+
+
+def get_attributes_minus_fillvalue_and_renamed_coords(original_var_name: str,
+                                                      new_var_name: str,
+                                                      original_dataset: nc.Dataset) -> dict:
+    """Return a variable's attributes, excluding '_FillValue' and renaming references in a 'coordinates' attribute."""
+    attrs_contents = {}
+
+    for ncattr in original_dataset.variables[original_var_name].ncattrs():
+        if ncattr != '_FillValue':
+            contents: str = original_dataset.variables[original_var_name].getncattr(ncattr)
+            if ncattr == 'coordinates':
+                contents = contents.replace(original_var_name, new_var_name)
+            attrs_contents[ncattr] = contents
+
+    return attrs_contents
diff --git a/concatenator/group_handling.py b/concatenator/group_handling.py
index 7aab79e..1fa82a5 100644
--- a/concatenator/group_handling.py
+++ b/concatenator/group_handling.py
@@ -11,13 +11,13 @@
 import numpy as np
 import xarray as xr
 
-GROUP_DELIM = '__'
-
-COORD_DELIM = " "
+from concatenator import GROUP_DELIM
+from concatenator.attribute_handling import (
+    flatten_coordinate_attribute_paths, regroup_coordinate_attribute)
 
+# Match dimension names such as "__char28" or "__char16". Used for CERES datasets.
 _string_dimension_name_pattern = re.compile(r"__char[0-9]+")
-
 
 def walk(group_node: nc.Group,
          path: str,
          new_dataset: nc.Dataset,
@@ -44,11 +44,8 @@
         var_group_name = f'{group_path}{GROUP_DELIM}{var_name}'
         new_dataset.variables[var_group_name] = var
 
-        # Flatten the paths of variables referenced in the coordinates attribute.
-        if 'coordinates' in var.ncattrs():
-            coord_att = var.getncattr('coordinates')
-            new_dataset.variables[var_group_name].setncattr('coordinates',
-                                                            _flatten_coordinate_attribute(coord_att))
+        # Flatten the paths of variables referenced in the coordinates attribute.
+        flatten_coordinate_attribute_paths(new_dataset, var, var_group_name)
 
         if (len(var.dimensions) == 1) and _string_dimension_name_pattern.fullmatch(var.dimensions[0]):
             list_of_character_string_vars.append(var_group_name)
@@ -118,9 +115,7 @@
         nc_dataset.variables[new_var_name] = var
 
         # Flatten the paths of variables referenced in the coordinates attribute.
-        if 'coordinates' in var.ncattrs():
-            coord_att = var.getncattr('coordinates')
-            nc_dataset.variables[new_var_name].setncattr('coordinates', _flatten_coordinate_attribute(coord_att))
+        flatten_coordinate_attribute_paths(nc_dataset, var, new_var_name)
 
         del nc_dataset.variables[var_name]  # Delete old variable
 
@@ -234,7 +229,7 @@ def regroup_flattened_dataset(dataset: xr.Dataset, output_file: str) -> None:
             # Reconstruct the grouped paths of variables referenced in the coordinates attribute.
             if 'coordinates' in var_group[new_var_name].ncattrs():
                 coord_att = var_group[new_var_name].getncattr('coordinates')
-                var_group[new_var_name].setncattr('coordinates', _regroup_coordinate_attribute(coord_att))
+                var_group[new_var_name].setncattr('coordinates', regroup_coordinate_attribute(coord_att))
 
         except Exception as err:
             raise err
@@ -247,70 +242,6 @@
     return nested_group
 
 
-def _flatten_coordinate_attribute(attribute_string: str) -> str:
-    """Converts attributes that specify group membership via "/" to use new group delimieter, even for the root level.
-
-    Examples
-    --------
-    >>> coord_att = "Time_and_Position/time Time_and_Position/instrument_fov_latitude Time_and_Position/instrument_fov_longitude"
-    >>> _flatten_coordinate_attribute(coord_att)
-        __Time_and_Position__time __Time_and_Position__instrument_fov_latitude __Time_and_Position__instrument_fov_longitude
-
-    Parameters
-    ----------
-    attribute_string : str
-
-    Returns
-    -------
-    str
-    """
-    # Use the separator that's in the attribute string only if all separators in the string are the same.
-    # Otherwise, we will use our own default separator.
-    whitespaces = re.findall(r'\s+', attribute_string)
-    if len(set(whitespaces)) <= 1:
-        new_sep = whitespaces[0]
-    else:
-        new_sep = COORD_DELIM
-
-    # A new string is constructed.
-    return new_sep.join(
-        f'{GROUP_DELIM}{c.replace("/", GROUP_DELIM)}'
-        for c
-        in attribute_string.split()  # split on any whitespace
-    )
-
-
-def _regroup_coordinate_attribute(attribute_string: str) -> str:
-    """
-    Examples
-    --------
-    >>> coord_att = "__Time_and_Position__time __Time_and_Position__instrument_fov_latitude __Time_and_Position__instrument_fov_longitude"
-    >>> _flatten_coordinate_attribute(coord_att)
-        Time_and_Position/time Time_and_Position/instrument_fov_latitude Time_and_Position/instrument_fov_longitude
-
-    Parameters
-    ----------
-    attribute_string : str
-
-    Returns
-    -------
-    str
-    """
-    # Use the separator that's in the attribute string only if all separators in the string are the same.
-    # Otherwise, we will use our own default separator.
-    whitespaces = re.findall(r'\s+', attribute_string)
-    if len(set(whitespaces)) <= 1:
-        new_sep = whitespaces[0]
-    else:
-        new_sep = COORD_DELIM
-
-    return new_sep.join(
-        '/'.join(c.split(GROUP_DELIM))[1:]
-        for c
-        in attribute_string.split()  # split on any whitespace
-    )
-
-
 def _calculate_chunks(dim_sizes: list, default_low_dim_chunksize=4000) -> tuple:
     """
     For the given dataset, calculate if the size on any dimension is
diff --git a/tests/test_concat.py b/tests/test_concat.py
index 07ef005..7f731ed 100644
--- a/tests/test_concat.py
+++ b/tests/test_concat.py
@@ -44,9 +44,11 @@ def run_verification_with_bumblebee(self,
             shutil.copyfile(filepath, copied_input_new_path)
             input_files.append(str(copied_input_new_path))
 
-        output_path = bumblebee(files_to_concat=input_files, output_file=output_path,
+        output_path = bumblebee(files_to_concat=input_files,
+                                output_file=output_path,
                                 write_tmp_flat_concatenated=True,
-                                keep_tmp_files=True)
+                                keep_tmp_files=True,
+                                concat_dim=record_dim_name)
 
         merged_dataset = nc.Dataset(output_path)
 
@@ -82,11 +84,18 @@ def run_verification_with_nco(self, data_dir, output_name, record_dim_name='mirr
         assert length_sum == len(merged_dataset.variables[record_dim_name])
 
     def test_tempo_no2_concat_with_bumblebee(self):
-        self.run_verification_with_bumblebee('tempo/tempo_no2',
+        self.run_verification_with_bumblebee('tempo/tempo_no2_two',
                                              'tempo_no2_concat_with_bumblebee.nc')
 
+    def test_tempo_no2_many_concat_with_bumblebee(self):
+        self.run_verification_with_bumblebee('tempo/tempo_no2_many',
+                                             'tempo_no2_concat_many_with_bumblebee.nc')
+
     def test_tempo_hcho_concat_with_bumblebee(self):
-        self.run_verification_with_bumblebee('tempo/tempo_hcho', 'tempo_hcho_concat_with_bumblebee.nc')
+        self.run_verification_with_bumblebee('tempo/tempo_hcho_two', 'tempo_hcho_concat_with_bumblebee.nc')
+
+    def test_tempo_hcho_many_concat_with_bumblebee(self):
+        self.run_verification_with_bumblebee('tempo/tempo_hcho_many', 'tempo_hcho_concat_many_with_bumblebee.nc')
 
     def 
test_tempo_cld04_concat_with_bumblebee(self): self.run_verification_with_bumblebee('tempo/tempo_cld04', 'tempo_cld04_concat_with_bumblebee.nc') diff --git a/tests/test_group_handling.py b/tests/test_group_handling.py index bc5e72b..0515910 100644 --- a/tests/test_group_handling.py +++ b/tests/test_group_handling.py @@ -1,9 +1,9 @@ """Tests for manipulating netCDF groups.""" -# pylint: disable=C0116 +# pylint: disable=C0116, C0301 -from concatenator.group_handling import (_flatten_coordinate_attribute, - _regroup_coordinate_attribute) +from concatenator.attribute_handling import (_flatten_coordinate_attribute, + regroup_coordinate_attribute) def test_coordinate_attribute_flattening(): @@ -20,11 +20,9 @@ def test_coordinate_attribute_flattening(): def test_coordinate_attribute_regrouping(): # Case with groups present and double spaces. - assert _regroup_coordinate_attribute( - '__Time_and_Position__time __Time_and_Position__instrument_fov_latitude __Time_and_Position__instrument_fov_longitude' - ) == "Time_and_Position/time Time_and_Position/instrument_fov_latitude Time_and_Position/instrument_fov_longitude" + assert regroup_coordinate_attribute( + '__Time_and_Position__time __Time_and_Position__instrument_fov_latitude __Time_and_Position__instrument_fov_longitude') == "Time_and_Position/time Time_and_Position/instrument_fov_latitude Time_and_Position/instrument_fov_longitude" # Case with NO groups present and single spaces. - assert _regroup_coordinate_attribute( - "__time __longitude __latitude __ozone_profile_pressure __ozone_profile_altitude" - ) == "time longitude latitude ozone_profile_pressure ozone_profile_altitude" + assert regroup_coordinate_attribute( + "__time __longitude __latitude __ozone_profile_pressure __ozone_profile_altitude") == "time longitude latitude ozone_profile_pressure ozone_profile_altitude"