#115 check and update hazard model #151

Merged: 11 commits, Oct 16, 2023
160 changes: 65 additions & 95 deletions hydromt_fiat/fiat.py
@@ -18,12 +18,11 @@
from hydromt_fiat import DATADIR
from hydromt_fiat.config import Config
from hydromt_fiat.workflows.exposure_vector import ExposureVector
from hydromt_fiat.workflows.hazard import * # TODO: when the hazard module is updated, explicitly mention the functions that are imported
from hydromt_fiat.workflows.hazard import create_lists, check_lists_size, read_maps, check_maps_metadata, check_maps_rp, check_map_uniqueness, create_risk_dataset
from hydromt_fiat.workflows.social_vulnerability_index import SocialVulnerabilityIndex
from hydromt_fiat.workflows.vulnerability import Vulnerability
from hydromt_fiat.workflows.aggregation_areas import join_exposure_aggregation_areas


__all__ = ["FiatModel"]

_logger = logging.getLogger(__name__)
@@ -335,7 +334,8 @@ def setup_hazard(
risk_output: bool = False,
unit_conversion_factor: float = 1.0,
) -> None:
"""Set up hazard maps. This component integrates multiple checks for the maps
"""Set up hazard maps. This component integrates multiple checks for the hazard
maps.

Parameters
----------
@@ -372,58 +372,24 @@ def setup_hazard(
risk_output : bool, optional
The parameter that defines if a risk analysis is required, by default False
"""
# check parameter types and sizes, and the existence of the provided map files

params = check_parameters_type(map_fn, map_type, rp, crs, nodata, var, chunks)
check_parameters_size(params)
check_files(params, self.root)

# create lists of maps and their parameters to be able to iterate over them
Collaborator: Nice that the function setup_hazard is shorter now!
params = create_lists(map_fn, map_type, rp, crs, nodata, var, chunks)
check_lists_size(params)

rp_list = []
map_name_lst = []

# retrieve map information from the parameters and the data catalog,
# load the maps into memory, check them, and save them with the set_maps function
for idx, da_map_fn in enumerate(params["map_fn_lst"]):
da_map_fn, da_name, da_type = read_floodmaps(params, da_map_fn, idx)

# load flood maps to memory
# da = load_floodmaps(self.data_catalog, self.region,da_map_fn,da_name,name_catalog)
# reading from path
if isinstance(da_map_fn, Path):
if da_map_fn.stem == "sfincs_map":
sfincs_root = os.path.dirname(da_map_fn)
sfincs_model = SfincsModel(
sfincs_root, mode="r", logger=self.logger
)
sfincs_model.read_results()
# save sfincs map as GeoTIFF
# result_list = list(sfincs_model.results.keys())
# sfincs_model.write_raster("results.zsmax", compress="LZW")
da = sfincs_model.results["zsmax"]
# da = da.squeeze('timemax').drop('timemax')
da = da.isel(timemax=0).drop("timemax")


else:
if not self.region.empty:
# da = self.data_catalog.get_rasterdataset(
# da_map_fn, geom=self.region
# )
da = self.data_catalog.get_rasterdataset(da_map_fn)
else:
da = self.data_catalog.get_rasterdataset(da_map_fn)
# Convert to units of the exposure data if required
# reading from the datacatalog
else:
if not self.region.empty:
# da = self.data_catalog.get_rasterdataset(
# name_catalog, variables=da_name, geom=self.region
# )
da = self.data_catalog.get_rasterdataset(map_fn, variables=da_name)
else:
da = self.data_catalog.get_rasterdataset(map_fn, variables=da_name)
if self.exposure.unit != da.units:
da = da * unit_conversion_factor
# read maps and retrieve their attributes
da_map_fn, da_name, da_type = read_maps(params, da_map_fn, idx)

da = self.data_catalog.get_rasterdataset(da_map_fn, geom=self.region)

# Convert to units of the exposure data if required
if getattr(self, "exposure", None) is not None:  # make sure unit information from the exposure dataset is available
if self.exposure.unit != da.units:
da = da * unit_conversion_factor

da.encoding["_FillValue"] = None
da = da.raster.gdal_compliant()

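As a side note on the two lines above, a minimal sketch of this export-preparation step, assuming hydromt registers its raster accessor on import (the helper name below is hypothetical):

# Minimal sketch; prepare_for_export is a hypothetical helper name.
import hydromt  # noqa: F401  (registers the .raster accessor on xarray objects)
import xarray as xr

def prepare_for_export(da: xr.DataArray) -> xr.DataArray:
    # Clearing _FillValue keeps xarray from writing a fill value that
    # conflicts with the raster nodata when the map is saved to netcdf.
    da.encoding["_FillValue"] = None
    # gdal_compliant() normalizes dimension order and grid metadata so
    # GDAL-based tools read the output correctly.
    return da.raster.gdal_compliant()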
@@ -433,84 +399,86 @@ def setup_hazard(
# check maps return periods
da_rp = check_maps_rp(params, da, da_name, idx, risk_output)

# check if maps are unique
# TODO: check if providing methods like self.get_config can be used
# TODO: create a new function to check uniqueness through file names
# check_maps_uniquenes(self.get_config,self.staticmaps,params,da,da_map_fn,da_name,da_type,da_rp,idx)
if risk_output and da_map_fn.stem == "sfincs_map":
da_name = da_name + f"_{str(da_rp)}"

post = f"(rp {da_rp})" if risk_output else ""
self.logger.info(f"Added {hazard_type} hazard map: {da_name} {post}")

rp_list.append(da_rp)

# If a risk calculation is required and the map comes from sfincs, they
# have the same name so give another name
if risk_output and da_map_fn.stem == "sfincs_map":
da_name = da_name + f"_{str(da_rp)}"
map_name_lst.append(da_name)

da.attrs = {
"returnperiod": str(da_rp),
"type": da_type,
"name": da_name,
"analysis": "event",
Collaborator: What is this attribute doing? And this one also has a return period? Did you ask Brendan to test with the netcdf that was produced? I think the name of the return period attribute was 'rp', but I'm not sure.

Contributor (author): I will coordinate with Brandon. The attributes could be skipped in this dataset, as the return period is written in the band names of the dataset. I changed it to 'rp' in the meantime.
}
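To make the attribute discussion above concrete, a small round-trip sketch; the "rp" key follows the review suggestion, not a confirmed schema:

# Hedged sketch of the attribute round-trip discussed in the review.
import numpy as np
import xarray as xr

da = xr.DataArray(np.zeros((2, 2)), dims=("y", "x"), name="flood_rp100")
da.attrs = {"rp": "100", "type": "water_depth", "analysis": "event"}

da.to_dataset().to_netcdf("flood_rp100.nc")

# Attributes written on the variable survive the netcdf round trip.
reopened = xr.open_dataset("flood_rp100.nc")
assert reopened["flood_rp100"].attrs["rp"] == "100"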

da = da.to_dataset(name=da_name)

self.set_maps(da, da_name)

check_map_uniqueness(map_name_lst)
# if risk_output is required, the maps are put in a netcdf raster with
# an extra dimension 'rp' accounting for the return period
# select the first risk map

# in case of a risk analysis, create a single netcdf with multibands per rp
if risk_output:
list_keys = list(self.maps.keys())
first_map = self.maps[list_keys[0]].rename("risk_datarray")
list_keys.pop(0)

# add additional risk maps
for idx, x in enumerate(list_keys):
key_name = list_keys[idx]
layer = self.maps[key_name]
first_map = xr.concat([first_map, layer], dim="rp")

# convert to a dataset to be able to write attributes when writing the maps
# in the output folders. If a DataArray is provided, attributes will not be
# shown in the output netcdf dataset
da = first_map.to_dataset(name="risk_maps")
da.attrs = {
"returnperiod": list(rp_list),
"type": params["map_type_lst"],
"name": map_name_lst,
"Analysis": "risk",

da, sorted_rp, sorted_names = create_risk_dataset(params, rp_list, map_name_lst, self.maps)

self.set_grid(da)

self.grid.attrs = {
"rp": sorted_rp,
"type": params["map_type_lst"], #TODO: This parameter has to be changed in case that a list with different hazard types per map is provided
"name": sorted_names,
"analysis": "risk",
}
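create_risk_dataset is new in this PR and its body is not part of this diff; a plausible sketch, assuming it stacks the event maps along a new 'rp' dimension sorted by return period:

# Hedged sketch; the real implementation lives in
# hydromt_fiat/workflows/hazard.py and may differ.
import xarray as xr

def create_risk_dataset(params, rp_list, map_name_lst, maps):
    # Sort by return period so the 'rp' dimension is monotonic.
    order = sorted(range(len(rp_list)), key=lambda i: rp_list[i])
    sorted_rp = [rp_list[i] for i in order]
    sorted_names = [map_name_lst[i] for i in order]

    # Stack the individual hazard maps along a new 'rp' dimension.
    layers = [maps[name] for name in sorted_names]
    da = xr.concat(layers, dim="rp").assign_coords(rp=sorted_rp)
    return da, sorted_rp, sorted_names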
# load merged map into self.maps
self.set_maps(da)

list_maps = list(self.maps.keys())

# erase individual maps from self.maps keeping the merged map
for item in list_maps[:-1]:
for item in list_maps[:]:
self.maps.pop(item)

self.set_config("hazard.return_periods", rp_list)
# set configuration .toml file
self.set_config("hazard.return_periods",
str(da_rp) if not risk_output else sorted_rp
)

# the metadata of the hazard maps is saved in the configuration toml files
# this component was modified to provide element [0] of the list
# in case multiple maps are required, remove the [0]
self.set_config(
"hazard.file",
[
str(Path("hazard") / (hazard_map + ".nc"))
for hazard_map in self.maps.keys()
for hazard_map in self.maps.keys()
][0] if not risk_output else
[
str(Path("hazard") / ("risk_map" + ".nc"))
][0],
)
self.set_config(
"hazard.crs",
[
"EPSG:" + str((self.maps[hazard_map].rio.crs.to_epsg()))
"EPSG:" + str((self.maps[hazard_map].raster.crs.to_epsg()))
for hazard_map in self.maps.keys()
][0],
][0] if not risk_output else
[
"EPSG:" + str((self.crs.to_epsg()))
][0]
,
)
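Note the switch above from the rioxarray accessor (rio.crs) to hydromt's own raster accessor (raster.crs). A one-line sketch of the equivalent lookup, assuming the accessor is available once hydromt is imported:

# Sketch of the CRS lookup through hydromt's raster accessor.
import hydromt  # noqa: F401
import xarray as xr

def epsg_string(da: xr.DataArray) -> str:
    # Mirrors the "EPSG:" + str(...) pattern used in set_config above.
    return "EPSG:" + str(da.raster.crs.to_epsg())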

self.set_config(
"hazard.elevation_reference", "dem" if da_type == "water_depth" else "datum"
"hazard.elevation_reference",
"dem" if da_type == "water_depth" else "datum"
)

# Set the configurations for a multiband netcdf
self.set_config(
"hazard.settings.subset",
[(self.maps[hazard_map].name) for hazard_map in self.maps.keys()][0],
[
(self.maps[hazard_map].name)
for hazard_map in self.maps.keys()
][0] if not risk_output else sorted_rp,
)

self.set_config(
@@ -742,6 +710,8 @@ def write(self):
self.write_config()
if self.maps:
self.write_maps(fn="hazard/{name}.nc")
if self.grid:
self.write_grid(fn="hazard/risk_map.nc")
if self.geoms:
self.write_geoms(fn="exposure/{name}.gpkg", driver="GPKG")
if self._tables:
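Taken together, the new flow writes event maps through write_maps and the merged risk grid through write_grid. A hedged end-to-end usage sketch; the file paths are placeholders and the constructor arguments follow hydromt's generic Model conventions:

# Hedged usage sketch; paths and constructor arguments are assumptions.
from hydromt_fiat.fiat import FiatModel

model = FiatModel(root="fiat_model", mode="w+")

# Event analysis: each map ends up in self.maps and is written
# by write_maps to hazard/{name}.nc
model.setup_hazard(
    map_fn="data/flood_rp100.tif",
    map_type="water_depth",
    rp=100,
    risk_output=False,
)

# Risk analysis: the maps are merged into self.grid and written
# by write_grid to hazard/risk_map.nc
model.setup_hazard(
    map_fn=["data/flood_rp010.tif", "data/flood_rp100.tif"],
    map_type="water_depth",
    rp=[10, 100],
    risk_output=True,
)

model.write()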
136 changes: 0 additions & 136 deletions hydromt_fiat/validation.py
@@ -9,97 +9,6 @@ def check_dir_exist(dir, name=None):
f"The directory indicated by the '{name}' parameter does not exist."
)


def check_file_exist(root, param_lst, name=None):
root = Path(root)
param_lst = [Path(p) for p in param_lst]
for param_idx, param in enumerate(param_lst):
if isinstance(param, dict):
fn_lst = list(param.values())
else:
fn_lst = [param]
for fn_idx, fn in enumerate(fn_lst):
if not Path(fn).is_file():
if root.joinpath(fn).is_file():
if isinstance(param, dict):
param_lst[param_idx][
list(param.keys())[fn_idx]
] = root.joinpath(fn)
else:
param_lst[param_idx] = root.joinpath(fn)
else:
if isinstance(param, dict):
param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn)
else:
param_lst[param_idx] = Path(fn)
try:
if isinstance(param, dict):
assert isinstance(
param_lst[param_idx][list(param.keys())[fn_idx]], Path
)
else:
assert isinstance(param_lst[param_idx], Path)
except AssertionError:
raise TypeError(
f"The file indicated by the '{name}' parameter does not"
f" exist in the directory '{root}'."
)


# TODO: Improve this tool without calling model.get_config(input_dir)
# def check_file_exist(get_config, root, param_lst, name=None, input_dir=None):
# root = Path(root)
# param_lst = [Path(p) for p in param_lst]
# for param_idx, param in enumerate(param_lst):
# if isinstance(param, dict):
# fn_lst = list(param.values())
# else:
# fn_lst = [param]
# for fn_idx, fn in enumerate(fn_lst):
# if not Path(fn).is_file():
# if root.joinpath(fn).is_file():
# if isinstance(param, dict):
# param_lst[param_idx][
# list(param.keys())[fn_idx]
# ] = root.joinpath(fn)
# else:
# param_lst[param_idx] = root.joinpath(fn)
# if input_dir is not None:
# if get_config(input_dir).joinpath(fn).is_file():
# if isinstance(param, dict):
# param_lst[param_idx][
# list(param.keys())[fn_idx]
# ] = get_config(input_dir).joinpath(fn)
# else:
# param_lst[param_idx] = get_config(
# input_dir
# ).joinpath(fn)
# else:
# if isinstance(param, dict):
# param_lst[param_idx][list(param.keys())[fn_idx]] = Path(fn)
# else:
# param_lst[param_idx] = Path(fn)
# try:
# if isinstance(param, dict):
# assert isinstance(
# param_lst[param_idx][list(param.keys())[fn_idx]], Path
# )
# else:
# assert isinstance(param_lst[param_idx], Path)
# except AssertionError:
# if input_dir is None:
# raise TypeError(
# f"The file indicated by the '{name}' parameter does not"
# f" exist in the directory '{root}'."
# )
# else:
# raise TypeError(
# f"The file indicated by the '{name}' parameter does not"
# f" exist in either of the directories '{root}' or "
# f"'{get_config(input_dir)}'."
# )


def check_uniqueness(map_name_lst):
def check_duplicates(lst):
unique_elements = set()
@@ -114,51 +23,6 @@ def check_duplicates(lst):
if check:
raise ValueError(f"The filenames of the hazard maps should be unique.")

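For reference, the replacement for this deleted check lives in the hazard workflow as check_map_uniqueness(map_name_lst), called earlier in this diff. Its body is not shown, so the following is only a minimal sketch of an equivalent check:

# Minimal sketch of an equivalent uniqueness check; the real
# check_map_uniqueness in workflows/hazard.py may differ.
from typing import List

def check_map_uniqueness(map_name_lst: List[str]) -> None:
    seen = set()
    for name in map_name_lst:
        if name in seen:
            raise ValueError("The filenames of the hazard maps should be unique.")
        seen.add(name)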

# TODO: Improve this tool without calling the model; just check the map names
# def check_uniqueness(model, *args, file_type=None, filename=None):
# """ """

# args = list(args)
# if len(args) == 1 and "." in args[0]:
# args = args[0].split(".") + args[1:]
# branch = args.pop(-1)
# for key in args[::-1]:
# branch = {key: branch}

# if model.get_config(args[0], args[1]):
# for key in model.staticmaps.data_vars:
# if filename == key:
# raise ValueError(
# f"The filenames of the {file_type} maps should be unique."
# )
# if (
# model.get_config(args[0], args[1], key)
# == list(branch[args[0]][args[1]].values())[0]
# ):
# raise ValueError(f"Each model input layers must be unique.")


def check_param_type(param, name=None, types=None):
""" """

if not isinstance(param, list):
raise TypeError(
f"The '{name}_lst' parameter should be a of {list}, received a "
f"{type(param)} instead."
)
for i in param:
if not isinstance(i, types):
if isinstance(types, tuple):
types = " or ".join([str(j) for j in types])
else:
types = types
raise TypeError(
f"The '{name}' parameter should be a of {types}, received a "
f"{type(i)} instead."
)


def get_param(param_lst, map_fn_lst, file_type, filename, i, param_name):
""" """
