Skip to content

Commit

Permalink
Ready for pull request
Browse files Browse the repository at this point in the history
  • Loading branch information
sidekock committed Dec 6, 2024
1 parent 701e726 commit b9de511
Showing 1 changed file with 84 additions and 108 deletions.
192 changes: 84 additions & 108 deletions pysteps/blending/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, Callable, Union

# TODO: compare old and new version of the code
# TODO: look for better typing in state and params
# TODO: GO over all other todos and check if they can be removed
# TODO: compare old and new version of the code, run a benchmark to compare the two
# TODO: look at the documentation and try to improve it, lots of things are now combined together


Expand Down Expand Up @@ -111,7 +109,6 @@ class StepsBlendingConfig:
return_output: bool = True


# TODO: typing could be improved here
@dataclass
class StepsBlendingParams:
noise_std_coeffs: Optional[np.ndarray] = (
Expand All @@ -127,7 +124,6 @@ class StepsBlendingParams:
extrapolation_method: Optional[Callable[..., Any]] = None
decomposition_method: Optional[Callable[..., dict]] = None
recomposition_method: Optional[Callable[..., np.ndarray]] = None
# TODO: check of the following two are relevant or can be replaced vel_pert_... and noise_generator
velocity_perturbations: Optional[Any] = None
generate_velocity_noise: Optional[Callable[[Any, float], np.ndarray]] = None
velocity_perturbations_parallel: Optional[np.ndarray] = None
Expand All @@ -148,7 +144,6 @@ class StepsBlendingParams:
domain_mask: Optional[np.ndarray] = None


# TODO: typing could be improved here
@dataclass
class StepsBlendingState:
# Radar and noise states
Expand Down Expand Up @@ -213,6 +208,9 @@ class StepsBlendingState:
weights: Optional[np.ndarray] = None
weights_model_only: Optional[np.ndarray] = None

# Is stores here aswell becasue this is changed during the forecast loop and thus not part of the config
extrapolation_kwargs: Dict[str, Any] = field(default_factory=dict)


class StepsBlendingNowcaster:
def __init__(
Expand Down Expand Up @@ -312,9 +310,8 @@ def __blended_nowcast_main_loop(self):

if self.__config.measure_time:
starttime_mainloop = time.time()
# TODO: problem with the config here! This variable changes over time...
# extrap_kwargs is in config but by adding info to it, the next run of a blended forecast will have issues!
self.__config.extrapolation_kwargs["return_displacement"] = True
self.__state.extrapolation_kwargs = deepcopy(self.__config.extrapolation_kwargs)
self.__state.extrapolation_kwargs["return_displacement"] = True

self.__state.precip_cascades_prev_subtimestep = deepcopy(
self.__state.precip_cascades
Expand Down Expand Up @@ -697,15 +694,48 @@ def __prepare_radar_and_NWP_fields(self):

# 1. Start with the radar rainfall fields. We want the fields in a
# Lagrangian space
self.__precip = _transform_to_lagrangian(
self.__precip,
self.__velocity,
self.__config.ar_order,
self.__params.xy_coordinates,
self.__params.extrapolation_method,
self.__config.extrapolation_kwargs,
self.__config.num_workers,
)

"""Advect the previous precipitation fields to the same position with the
most recent one (i.e. transform them into the Lagrangian coordinates).
"""
self.__config.extrapolation_kwargs["xy_coords"] = self.__params.xy_coordinates
res = []

# TODO: create beter names here fore this part, adapted from previous code which is now inlined (old function was called _transform_to_lagrangian)
def f(precip, i):
return self.__params.extrapolation_method(
precip[i, :, :],
self.__velocity,
self.__config.ar_order - i,
"min",
allow_nonfinite_values=True,
**self.__config.extrapolation_kwargs.copy(),
)[-1]

if not DASK_IMPORTED:
# Process each earlier precipitation field directly
for i in range(self.__config.ar_order):
self.__precip[i, :, :] = f(self.__precip, i)
else:
# Use Dask delayed for parallelization if DASK_IMPORTED is True
for i in range(self.__config.ar_order):
res.append(dask.delayed(f)(self.__precip, i))
num_workers_ = (
len(res)
if self.__config.num_workers > len(res)
else self.__config.num_workers
)
self.__precip = np.stack(
list(dask.compute(*res, num_workers=num_workers_))
+ [self.__precip[-1, :, :]]
)

# Replace non-finite values with the minimum value for each field
self.__precip = self.__precip.copy()
for i in range(self.__precip.shape[0]):
self.__precip[i, ~np.isfinite(self.__precip[i, :])] = np.nanmin(
self.__precip[i, :]
)

# 2. Perform the cascade decomposition for the input precip fields and,
# if necessary, for the (NWP) model fields
Expand Down Expand Up @@ -740,9 +770,19 @@ def __prepare_radar_and_NWP_fields(self):

if self.__params.precip_models_provided_is_cascade:
self.__state.precip_models_cascades = self.__precip_models
self.__precip_models = _compute_cascade_recomposition_nwp(
self.__precip_models, self.__params.recomposition_method
)
# Inline logic of _compute_cascade_recomposition_nwp
temp_precip_models = []
for i in range(self.__precip_models.shape[0]):
precip_model = []
for time_step in range(self.__precip_models.shape[1]):
# Use the recomposition method to rebuild the rainfall fields
recomposed = self.__params.recomposition_method(
self.__precip_models[i, time_step]
)
precip_model.append(recomposed)
temp_precip_models.append(precip_model)

self.__precip_models = np.stack(temp_precip_models)

# 2.3 Check for zero input fields in the radar and NWP data.
self.__params.zero_precip_radar = blending.utils.check_norain(
Expand Down Expand Up @@ -1610,9 +1650,9 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep(
# (or subtimesteps if non-integer time steps are given)

# Settings and initialize the output
extrap_kwargs_ = self.__config.extrapolation_kwargs.copy()
extrap_kwargs_noise = self.__config.extrapolation_kwargs.copy()
extrap_kwargs_pb = self.__config.extrapolation_kwargs.copy()
extrap_kwargs_ = self.__state.extrapolation_kwargs.copy()
extrap_kwargs_noise = self.__state.extrapolation_kwargs.copy()
extrap_kwargs_pb = self.__state.extrapolation_kwargs.copy()
velocity_perturbations_extrapolation = self.__velocity
# The following should be accesseble after this function
worker_state.precip_extrapolated_decomp = []
Expand Down Expand Up @@ -1721,9 +1761,7 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep(
)
# Put back the mask
precip_forecast_recomp_subtimestep[self.__params.domain_mask] = np.nan
# TODO: problem with the config here! This variable changes over time...
# extrap_kwargs is in config but by adding info to it, the next run of a blended forecast will have issues!
self.__config.extrapolation_kwargs["displacement_prev"] = (
self.__state.extrapolation_kwargs["displacement_prev"] = (
worker_state.previous_displacement[j]
)
(
Expand All @@ -1734,7 +1772,7 @@ def __perturb_blend_and_advect_extrapolation_and_noise_to_current_timestep(
velocity_blended,
[t_diff_prev_subtimestep],
allow_nonfinite_values=True,
**self.__config.extrapolation_kwargs,
**self.__state.extrapolation_kwargs,
)
precip_extrapolated_recomp_subtimestep = (
precip_forecast_extrapolated_recomp_subtimestep_temp[0].copy()
Expand Down Expand Up @@ -2264,9 +2302,23 @@ def __post_process_output(
>= self.__config.precip_threshold
)
# Buffer the mask
precip_field_mask = _compute_incremental_mask(
precip_field_mask, self.__params.struct, self.__params.mask_rim
# buffer the observation mask Rbin using the kernel kr
# add a grayscale rim r (for smooth rain/no-rain transition)

# buffer observation mask
precip_field_mask_temp = np.ndarray.astype(
precip_field_mask.copy(), "uint8"
)
Rd = binary_dilation(precip_field_mask_temp, self.__params.struct)

# add grayscale rim
kr1 = generate_binary_structure(2, 1)
mask = Rd.astype(float)
for n in range(self.__params.mask_rim):
Rd = binary_dilation(Rd, kr1)
mask += Rd
# normalize between 0 and 1
precip_field_mask = mask / mask.max()
# Get the final mask
worker_state.final_blended_forecast_recomposed = (
precip_forecast_min_value
Expand Down Expand Up @@ -2853,7 +2905,7 @@ def forecast(
return forecast_steps_nowcast


# TODO: add the following code to the main body
# TODO: Where does this piece of code best fit: in utils or inside the class?
def calculate_weights_spn(correlations, covariance):
"""Calculate SPN blending weights for STEPS blending from correlation.
Expand Down Expand Up @@ -2935,6 +2987,7 @@ def calculate_weights_spn(correlations, covariance):
return weights


# TODO: Where does this piece of code best fit: in utils or inside the class?
def blend_means_sigmas(means, sigmas, weights):
"""Calculate the blended means and sigmas, the normalization parameters
needed to recompose the cascade. This procedure uses the weights of the
Expand Down Expand Up @@ -2999,80 +3052,3 @@ def blend_means_sigmas(means, sigmas, weights):
# TODO: substract covariances to weigthed sigmas - still necessary?

return combined_means, combined_sigmas


def _compute_incremental_mask(Rbin, kr, r):
# buffer the observation mask Rbin using the kernel kr
# add a grayscale rim r (for smooth rain/no-rain transition)

# buffer observation mask
Rbin = np.ndarray.astype(Rbin.copy(), "uint8")
Rd = binary_dilation(Rbin, kr)

# add grayscale rim
kr1 = generate_binary_structure(2, 1)
mask = Rd.astype(float)
for n in range(r):
Rd = binary_dilation(Rd, kr1)
mask += Rd
# normalize between 0 and 1
return mask / mask.max()


def _transform_to_lagrangian(
precip, velocity, ar_order, xy_coords, extrapolator, extrap_kwargs, num_workers
):
"""Advect the previous precipitation fields to the same position with the
most recent one (i.e. transform them into the Lagrangian coordinates).
"""
extrap_kwargs = extrap_kwargs.copy()
extrap_kwargs["xy_coords"] = xy_coords
res = list()

def f(precip, i):
return extrapolator(
precip[i, :, :],
velocity,
ar_order - i,
"min",
allow_nonfinite_values=True,
**extrap_kwargs,
)[-1]

for i in range(ar_order):
if not DASK_IMPORTED:
precip[i, :, :] = f(precip, i)
else:
res.append(dask.delayed(f)(precip, i))

if DASK_IMPORTED:
num_workers_ = len(res) if num_workers > len(res) else num_workers
precip = np.stack(
list(dask.compute(*res, num_workers=num_workers_)) + [precip[-1, :, :]]
)

# replace non-finite values with the minimum value
precip = precip.copy()
for i in range(precip.shape[0]):
precip[i, ~np.isfinite(precip[i, :])] = np.nanmin(precip[i, :])
return precip


def _compute_cascade_recomposition_nwp(precip_models_cascade, recompositor):
"""If necessary, recompose (NWP) model forecasts."""
precip_models = None

# Recompose the (NWP) model cascades to have rainfall fields per
# model and time step, which will be used in the probability matching steps.
# Recomposed cascade will have shape: [n_models, n_timesteps, m, n]
precip_models = []
for i in range(precip_models_cascade.shape[0]):
precip_model = []
for time_step in range(precip_models_cascade.shape[1]):
precip_model.append(recompositor(precip_models_cascade[i, time_step]))
precip_models.append(precip_model)

precip_models = np.stack(precip_models)
precip_model = None

return precip_models

0 comments on commit b9de511

Please sign in to comment.