From 098927d2c42df38a00f22520c2d6457925307a62 Mon Sep 17 00:00:00 2001 From: mpvginde <81358394+mpvginde@users.noreply.github.com> Date: Tue, 5 Mar 2024 20:42:16 +0100 Subject: [PATCH] Bugfix reproducibility & ensemble member order with dask (#347) * Bugfix: fix random placement of ensemble members in numpy array due to dask multi-threading (#337) * Bugfix: make STEPS (blending) nowcast reproducable when the seed argument is given (#346) * Bugfix: make STEPS (blending) nowcast reproducable, independent of number of workers (#346) * Formatting with black --------- Co-authored-by: ned --- pysteps/blending/steps.py | 5 ++++- pysteps/noise/utils.py | 44 +++++++++++++++++++-------------------- pysteps/nowcasts/steps.py | 7 ++++--- 3 files changed, 29 insertions(+), 27 deletions(-) diff --git a/pysteps/blending/steps.py b/pysteps/blending/steps.py index 30d87349a..ca51c1017 100644 --- a/pysteps/blending/steps.py +++ b/pysteps/blending/steps.py @@ -514,6 +514,7 @@ def forecast( ) # 2. Initialize the noise method + np.random.seed(seed) pp, generate_noise, noise_std_coeffs = _init_noise( precip, precip_thr, @@ -526,6 +527,7 @@ def forecast( noise_stddev_adj, measure_time, num_workers, + seed, ) # 3. Perform the cascade decomposition for the input precip fields and @@ -1662,6 +1664,7 @@ def _init_noise( noise_stddev_adj, measure_time, num_workers, + seed, ): """Initialize the noise method.""" if noise_method is None: @@ -1690,6 +1693,7 @@ def _init_noise( 20, conditional=True, num_workers=num_workers, + seed=seed, ) if measure_time: @@ -1944,7 +1948,6 @@ def _init_random_generators( if noise_method is not None: randgen_prec = [] randgen_motion = [] - np.random.seed(seed) for j in range(n_ens_members): rs = np.random.RandomState(seed) randgen_prec.append(rs) diff --git a/pysteps/noise/utils.py b/pysteps/noise/utils.py index 51aad3fd4..58495b8ad 100644 --- a/pysteps/noise/utils.py +++ b/pysteps/noise/utils.py @@ -96,39 +96,37 @@ def compute_noise_stddev_adjs( if dask_imported and num_workers > 1: res = [] - else: - N_stds = [] + N_stds = [None] * num_iter randstates = [] - seed = None + for k in range(num_iter): randstates.append(np.random.RandomState(seed=seed)) seed = np.random.randint(0, high=1e9) - for k in range(num_iter): - - def worker(): - # generate Gaussian white noise field, filter it using the chosen - # method, multiply it with the standard deviation of the observed - # field and apply the precipitation mask - N = noise_generator(noise_filter, randstate=randstates[k], seed=seed) - N = N / np.std(N) * sigma + mu - N[~MASK] = R_thr_2 + def worker(k): + # generate Gaussian white noise field, filter it using the chosen + # method, multiply it with the standard deviation of the observed + # field and apply the precipitation mask + N = noise_generator(noise_filter, randstate=randstates[k]) + N = N / np.std(N) * sigma + mu + N[~MASK] = R_thr_2 - # subtract the mean and decompose the masked noise field into a - # cascade - N -= mu - decomp_N = decomp_method(N, F, mask=MASK_) + # subtract the mean and decompose the masked noise field into a + # cascade + N -= mu + decomp_N = decomp_method(N, F, mask=MASK_) - return decomp_N["stds"] - - if dask_imported and num_workers > 1: - res.append(dask.delayed(worker)()) - else: - N_stds.append(worker()) + N_stds[k] = decomp_N["stds"] if dask_imported and num_workers > 1: - N_stds = dask.compute(*res, num_workers=num_workers) + for k in range(num_iter): + res.append(dask.delayed(worker)(k)) + dask.compute(*res, num_workers=num_workers) + + else: + for k in range(num_iter): + worker(k) # for each cascade level, compare the standard deviations between the # observed field and the masked noise field, which gives the correction diff --git a/pysteps/nowcasts/steps.py b/pysteps/nowcasts/steps.py index b419fb1e3..1fd3f6d85 100644 --- a/pysteps/nowcasts/steps.py +++ b/pysteps/nowcasts/steps.py @@ -443,6 +443,7 @@ def f(precip, i): precip[i, ~np.isfinite(precip[i, :])] = np.nanmin(precip[i, :]) if noise_method is not None: + np.random.seed(seed) # get methods for perturbations init_noise, generate_noise = noise.get_method(noise_method) @@ -466,6 +467,7 @@ def f(precip, i): 20, conditional=True, num_workers=num_workers, + seed=seed, ) if measure_time: @@ -543,7 +545,6 @@ def f(precip, i): if noise_method is not None: randgen_prec = [] randgen_motion = [] - np.random.seed(seed) for _ in range(n_ens_members): rs = np.random.RandomState(seed) randgen_prec.append(rs) @@ -706,7 +707,7 @@ def _check_inputs(precip, velocity, timesteps, ar_order): def _update(state, params): - precip_forecast_out = [] + precip_forecast_out = [None] * params["n_ens_members"] if params["noise_method"] is None or params["mask_method"] == "sprog": for i in range(params["n_cascade_levels"]): @@ -828,7 +829,7 @@ def worker(j): precip_forecast[params["domain_mask"]] = np.nan - precip_forecast_out.append(precip_forecast) + precip_forecast_out[j] = precip_forecast if ( DASK_IMPORTED