diff --git a/river_dl/postproc_utils.py b/river_dl/postproc_utils.py
index 96cd8db..eca8285 100755
--- a/river_dl/postproc_utils.py
+++ b/river_dl/postproc_utils.py
@@ -132,7 +132,7 @@ def plot_ts_obs_preds(pred_file, obs_file, index_start = 0, index_end=3, outfile
         ax.legend()
         ax.set_title(seg)
     plt.tight_layout()
-    if out_file:
+    if outfile:
         plt.savefig(outfile)
     else:
         plt.show()
diff --git a/river_dl/predict.py b/river_dl/predict.py
index 0ebc50f..521a915 100755
--- a/river_dl/predict.py
+++ b/river_dl/predict.py
@@ -59,6 +59,9 @@ def predict_from_io_data(
     tst_val_offset = 1.0,
     spatial_idx_name="seg_id_nat",
     time_idx_name="date",
+    trn_latest_time=None,
+    val_latest_time=None,
+    tst_latest_time=None
 ):
     """
     make predictions from trained model
@@ -70,14 +73,34 @@ def predict_from_io_data(
     prep
     :param trn_offset: [str] value for the training offset
    :param tst_val_offset: [str] value for the testing and validation offset
+    :param trn_latest_time: [str] when specified, the training partition preds
+    are trimmed so that trn_latest_time is the last date included
+    :param val_latest_time: [str] when specified, the validation partition preds
+    are trimmed so that val_latest_time is the last date included
+    :param tst_latest_time: [str] when specified, the test partition preds
+    are trimmed so that tst_latest_time is the last date included
     :return: [pd dataframe] predictions
     """
     io_data = get_data_if_file(io_data)
     if partition == "trn":
         keep_portion = trn_offset
-    else:
+        latest_time = trn_latest_time
+    elif partition == "val":
         keep_portion = tst_val_offset
-
+        latest_time = val_latest_time
+    elif partition == "tst":
+        keep_portion = tst_val_offset
+        latest_time = tst_latest_time
+    else:
+        raise ValueError(f"unrecognized partition: {partition}")
+
     preds = predict(
         model,
         io_data[f"x_{partition}"],
@@ -91,6 +114,8 @@
         log_vars=log_vars,
         spatial_idx_name=spatial_idx_name,
         time_idx_name=time_idx_name,
+        latest_time=latest_time,
+        pad_mask=io_data[f"padded_{partition}"]
     )
     return preds
@@ -108,6 +133,8 @@ def predict(
     log_vars=False,
     spatial_idx_name="seg_id_nat",
     time_idx_name="date",
+    latest_time=None,
+    pad_mask=None
 ):
     """
     use trained model to make predictions
@@ -126,7 +153,10 @@ def predict(
     :param y_vars:[np array] the variable names of the y_dataset data
     :param outfile: [str] the file where the output data should be stored
     :param log_vars: [list-like] which variables_to_log (if any) were logged in data
     prep
+    :param latest_time: [str] when provided, the latest time that should be included
+    in the returned dataframe
+    :param pad_mask: [np array] bool array with True for padded data and False
+    otherwise
     :return: out predictions
     """
     num_segs = len(np.unique(pred_ids))
@@ -143,6 +173,11 @@
         y_pred = model.predict(x_data, batch_size=num_segs)
     else:
         raise TypeError("Model must be a torch.nn.Module or tf.Keras.Model")
+
+    #set the data that were added to fill batches to NaN
+    if pad_mask is not None:
+        y_pred[pad_mask] = np.nan
+
     # keep only specified part of predictions
     if keep_last_portion>1:
         frac_seq_len = int(keep_last_portion)
@@ -154,8 +189,19 @@
     pred_dates = pred_dates[:, -frac_seq_len:,...]
     y_pred_pp = prepped_array_to_df(y_pred, pred_dates, pred_ids, y_vars, spatial_idx_name, time_idx_name)
-
-    y_pred_pp = unscale_output(y_pred_pp, y_stds, y_means, y_vars, log_vars,)
+
+    y_pred_pp = unscale_output(y_pred_pp, y_stds, y_means, y_vars, log_vars)
+
+    #remove the padded rows (set to NaN above) that were added to fill batches
+    y_pred_pp = y_pred_pp.dropna().reset_index(drop=True)
+
+    #trim any predictions after latest_time, if specified
+    if latest_time:
+        y_pred_pp = (y_pred_pp.drop(y_pred_pp[y_pred_pp[time_idx_name] > np.datetime64(latest_time)].index)
+                     .reset_index(drop=True)
+                     )

     if outfile:
         y_pred_pp.to_feather(outfile)
diff --git a/river_dl/preproc_utils.py b/river_dl/preproc_utils.py
index 55beefd..7176a18 100755
--- a/river_dl/preproc_utils.py
+++ b/river_dl/preproc_utils.py
@@ -180,15 +180,30 @@ def separate_trn_tst(
     return train, val, test

-def split_into_batches(data_array, seq_len=365, offset=1.0):
+def split_into_batches(data_array, seq_len=365, offset=1.0,
+                       fill_batch=True, fill_nan=False, fill_time=False,
+                       fill_pad=False):
     """
-    split training data into batches with size of batch_size
+    split training data into batches of length seq_len
     :param data_array: [numpy array] array of training data with dims [nseg,
     ndates, nfeat]
     :param seq_len: [int] length of sequences (e.g., 365)
     :param offset: [float] How to offset the batches. Values < 1 are taken as
     fractions, (e.g., 0.5 means that the first batch will be 0-365 and the
     second will be 182-547), values > 1 are used as a constant number of
     observations to offset by.
+    :param fill_batch: [bool] when True, batches are filled to match the seq_len.
+    This ensures that data are not dropped when the data_array length is not
+    a multiple of the seq_len. Data are added to the end of the sequence.
+    When False, data may be dropped.
+    :param fill_nan: [bool] When True, filled in data are np.nan (e.g., because
+    data_array is observation data that should not contribute to the loss).
+    When False, filled in data are replicates of the previous timesteps.
+    :param fill_time: [bool] When True, filled in data are time indices that
+    follow in sequence from the previous timesteps. When False, filled in data
+    are replicates of the previous timesteps.
+    :param fill_pad: [bool] When True, the returned array is a boolean mask that
+    is True for padded timesteps and False otherwise. When False, filled in data
+    are determined by the other fill_* arguments.
     :return: [numpy array] batched data with dims [nbatches, nseg, seq_len
     (batch_size), nfeat]
     """
@@ -196,11 +211,85 @@
         period = int(offset)
     else:
         period = int(offset*seq_len)
-    num_batches = data_array.shape[1]//period
+
+    nsteps = data_array.shape[1]
+    num_batches = nsteps//period
+    if fill_batch:
+        #Check if nsteps is an exact multiple of the period.
+        exact_batches = nsteps/period
+        if (num_batches != exact_batches):
+            #append timesteps to the data_array
+            #exact_batches will always be greater than num_batches.
+            #Determine how many timesteps to replicate to get a full batch
+            num_rep_steps = int(period - np.floor((exact_batches - num_batches) * seq_len))
+            if fill_nan:
+                #fill in with nan values
+                nan_array = np.empty((data_array.shape[0],
+                                      num_rep_steps,
+                                      data_array.shape[2]))
+                nan_array.fill(np.nan)
+                data_array = np.concatenate((data_array,
+                                             nan_array),
+                                            axis = 1)
+            elif fill_pad:
+                #fill in with True for padded values and False otherwise
+                False_array = np.empty((data_array.shape[0],
+                                        data_array.shape[1],
+                                        data_array.shape[2]),
+                                       dtype=bool)
+                True_array = np.empty((data_array.shape[0],
+                                       num_rep_steps,
+                                       data_array.shape[2]),
+                                      dtype=bool)
+                False_array.fill(False)
+                True_array.fill(True)
+                data_array = np.concatenate((False_array,
+                                             True_array),
+                                            axis = 1)
+            else:
+                #fill in by replicating the previous timesteps in the data_array
+                if fill_time:
+                    #data are an np.datetime64 object. These must be unique, so
+                    #cannot be replicated. Add timesteps sequentially
+                    fill_dates_array = data_array[:,(nsteps-num_rep_steps):nsteps,:].copy()
+                    #shift each replicated date forward by num_rep_steps timesteps
+                    # Sending the smallest possible sample of data_array to the function
+                    time_unit = get_time_unit(data_array[0:2,0:2,0:2])
+                    fill_dates_array = fill_dates_array[:,:,:] + np.timedelta64(num_rep_steps, time_unit)
+
+                    data_array = np.concatenate((data_array,
+                                                 fill_dates_array),
+                                                axis = 1)
+                else:
+                    data_array = np.concatenate((data_array,
+                                                 data_array[:,(nsteps-num_rep_steps):nsteps,:]),
+                                                axis = 1)
+
+            num_batches = num_batches+1
+
+        elif fill_pad:
+            #nsteps is an exact multiple of the period, so nothing is padded:
+            #return an array of False
+            False_array = np.empty((data_array.shape[0],
+                                    data_array.shape[1],
+                                    data_array.shape[2]),
+                                   dtype=bool)
+            False_array.fill(False)
+            data_array = False_array
+
+    elif fill_pad:
+        #fill_batch is False, so nothing is padded: return an array of False
+        False_array = np.empty((data_array.shape[0],
+                                data_array.shape[1],
+                                data_array.shape[2]),
+                               dtype=bool)
+        False_array.fill(False)
+        data_array = False_array
+
     combined=[]
     for i in range(num_batches+1):
         idx = int(period*i)
-        batch = data_array[:,idx:idx+seq_len,...]
+        batch = data_array[:,idx:idx+seq_len,...]
         combined.append(batch)
     combined = [b for b in combined if b.shape[1]==seq_len]
     combined = np.asarray(combined)
@@ -385,7 +474,11 @@ def convert_batch_reshape(
     spatial_idx_name="seg_id_nat",
     time_idx_name="date",
     seq_len=365,
-    offset=1.0
+    offset=1.0,
+    fill_batch=True,
+    fill_nan=False,
+    fill_time=False,
+    fill_pad=False
 ):
     """
     convert xarray dataset into numpy array, swap the axes, batch the array and
@@ -398,13 +491,57 @@
     :param seq_len: [int] length of sequences (e.g., 365)
     :param offset: [float] 0-1, how to offset the batches (e.g., 0.5 means that
     the first batch will be 0-365 and the second will be 182-547)
+    :param fill_batch: [bool] when True, batches are filled to match the seq_len.
+    This ensures that data are not dropped when the data_array length is not
+    a multiple of the seq_len. Data are added to the end of the sequence.
+    When False, data may be dropped.
+    :param fill_nan: [bool] When True, filled in data are np.nan (e.g., because
+    data_array is observation data that should not contribute to the loss).
+    :param fill_time: [bool] When True, filled in data are time indices that
+    follow in sequence from the previous timesteps. When False, filled in data
+    are replicates of the previous timesteps.
+    :param fill_pad: [bool] When True, the returned array is a boolean mask
+    that is True for padded timesteps and False otherwise. When False, filled
+    in data are determined by the other fill_* arguments.
     :return: [numpy array] batched and reshaped dataset
     """
     # If there is no dataset (like if a test or validation set is not supplied)
     # just return None
     if not dataset:
         return None
-
+
+    if fill_batch:
+        continuous_start_inds = []
+        #Check if there are gaps in the timeseries as a result of using a
+        # discontinuous partition.
+        #identify all time differences that do not match the 1st timestep
+        time_diff = np.diff(dataset[time_idx_name])
+        timestep_1 = time_diff[0]
+        if any(time_diff != timestep_1):
+            #find where each continuous period starts; the dataset is split
+            # at these indices before batching
+            gap_timesteps = np.delete(np.unique(time_diff),
+                                      np.where(np.unique(time_diff) == timestep_1))
+            for t in gap_timesteps:
+                #using [0] to return an array
+                gap_ind = np.where(time_diff == t)[0]
+                #there can be multiple gaps of the same length
+                for i in gap_ind:
+                    #determine if the gap is longer than the sequence length
+                    date_before_gap = dataset[time_idx_name][i].values
+                    next_date = dataset[time_idx_name][i+1].values
+                    #gap length in the same unit as the timestep
+                    gap_length = int((next_date - date_before_gap)/timestep_1)
+                    if gap_length < seq_len:
+                        #a warning rather than a sys.exit so that short gaps
+                        # do not halt preprocessing
+                        print("Warning: the gap between this partition's continuous time periods is less than the sequence length")

+                    #get the start date indices. These are used to split the
+                    # dataset before creating batches
+                    continuous_start_inds.append(i+1)
+
+            continuous_start_inds = np.sort(continuous_start_inds)
+
     # convert xr.dataset to numpy array
     dataset = dataset.transpose(spatial_idx_name, time_idx_name)
@@ -420,7 +557,36 @@
     # batch the data
     # after [nbatch, nseg, seq_len, nfeat]
-    batched = split_into_batches(arr, seq_len=seq_len, offset=offset)
+    if fill_batch:
+        if len(continuous_start_inds) != 0:
+            #using a discontinuous partition: create a set of batches for each
+            # continuous period in this partition and join them
+            for g in range(len(continuous_start_inds)+1):
+                if g == 0:
+                    arr_g = arr[:,0:continuous_start_inds[g],:].copy()
+
+                    batched = split_into_batches(arr_g, seq_len=seq_len, offset=offset,
+                                                 fill_batch=fill_batch, fill_nan=fill_nan,
+                                                 fill_time=fill_time, fill_pad=fill_pad)
+
+                else:
+                    if g == len(continuous_start_inds):
+                        arr_g = arr[:,continuous_start_inds[g-1]:arr.shape[1],:].copy()
+                    else:
+                        arr_g = arr[:,continuous_start_inds[g-1]:continuous_start_inds[g],:].copy()
+
+                    batched_g = split_into_batches(arr_g, seq_len=seq_len, offset=offset,
+                                                   fill_batch=fill_batch, fill_nan=fill_nan,
+                                                   fill_time=fill_time, fill_pad=fill_pad)
+                    batched = np.append(batched, batched_g, axis = 0)
+        else:
+            batched = split_into_batches(arr, seq_len=seq_len, offset=offset,
+                                         fill_batch=fill_batch, fill_nan=fill_nan,
+                                         fill_time=fill_time, fill_pad=fill_pad)
+    else:
+        batched = split_into_batches(arr, seq_len=seq_len, offset=offset,
+                                     fill_batch=fill_batch, fill_nan=fill_nan,
+                                     fill_time=fill_time, fill_pad=fill_pad)

     # reshape data
     # after [nbatch * nseg, seq_len, nfeat]
@@ -435,6 +601,10 @@
     time_idx_name="date",
     seq_len=365,
     offset=1.0,
+    fill_batch=True,
+    fill_nan=False,
+    fill_time=False,
+    fill_pad=False
 ):
     """
     convert an xarray coordinate to an xarray data array and reshape that array
@@ -447,6 +617,19 @@
     :param seq_len: [int] length of sequences (e.g., 365)
     :param offset: [float] 0-1, how to offset the batches (e.g., 0.5 means that
     the first batch will be 0-365 and the second will be 182-547)
+    :param fill_batch: [bool] when True, batches are filled to match the seq_len.
+    This ensures that data are not dropped when the data_array length is not
+    a multiple of the seq_len. Data are added to the end of the sequence.
+    When False, data may be dropped.
+    :param fill_nan: [bool] When True, filled in data are np.nan (e.g., because
+    data_array is observation data that should not contribute to the loss).
+    When False, filled in data are replicates of the previous timesteps.
+    :param fill_time: [bool] When True, filled in data are time indices that
+    follow in sequence from the previous timesteps. When False, filled in data
+    are replicates of the previous timesteps.
+    :param fill_pad: [bool] When True, the returned array is a boolean mask that
+    is True for padded timesteps and False otherwise. When False, filled in data
+    are determined by the other fill_* arguments.
     :return:
     """
     # If there is no dataset (like if a test or validation set is not supplied)
@@ -466,6 +649,10 @@
         time_idx_name,
         seq_len=seq_len,
         offset=offset,
+        fill_batch=fill_batch,
+        fill_nan=fill_nan,
+        fill_time=fill_time,
+        fill_pad=fill_pad
     )
     return reshaped_np_arr
@@ -511,7 +698,9 @@
     y_std=None,
     y_mean=None,
     trn_offset = 1.0,
-    tst_val_offset = 1.0
+    tst_val_offset = 1.0,
+    fill_batch=True,
+    fill_nan=True
 ):
     """
     prepare y_dataset data
@@ -552,6 +741,13 @@
     :param y_mean: [array-like] means of y_dataset variables_to_log
     :param trn_offset: [str] value for the training offset
     :param tst_val_offset: [str] value for the testing and validation offset
+    :param fill_batch: [bool] when True, batches are filled to match the seq_len.
+    This ensures that data are not dropped when the data_array length is not
+    a multiple of the seq_len. Data are added to the end of the sequence.
+    When False, data may be dropped.
+    :param fill_nan: [bool] When True, filled in data are np.nan (e.g., because
+    data_array is observation data that should not contribute to the loss).
+    When False, filled in data are replicates of the previous timesteps.
     :returns: training and testing data along with the means and standard
     deviations of the training input and output data
     """
@@ -635,13 +831,16 @@
     if y_type == 'obs':
         data = {
             "y_obs_trn": convert_batch_reshape(
-                y_trn, spatial_idx_name, time_idx_name, offset=trn_offset, seq_len=seq_len
+                y_trn, spatial_idx_name, time_idx_name, offset=trn_offset, seq_len=seq_len,
+                fill_batch=fill_batch, fill_nan=fill_nan, fill_time=False
             ),
             "y_obs_val": convert_batch_reshape(
-                y_val, spatial_idx_name, time_idx_name, offset=tst_val_offset, seq_len=seq_len
+                y_val, spatial_idx_name, time_idx_name, offset=tst_val_offset, seq_len=seq_len,
+                fill_batch=fill_batch, fill_nan=fill_nan, fill_time=False
             ),
             "y_obs_tst": convert_batch_reshape(
-                y_tst, spatial_idx_name, time_idx_name, offset=tst_val_offset, seq_len=seq_len
+                y_tst, spatial_idx_name, time_idx_name, offset=tst_val_offset, seq_len=seq_len,
+                fill_batch=fill_batch, fill_nan=fill_nan, fill_time=False
             ),
             "y_std": y_std.to_array().values,
             "y_mean": y_mean.to_array().values,
@@ -656,10 +855,12 @@
         data = {
             "y_pre_full": convert_batch_reshape(
-                y_data, spatial_idx_name, time_idx_name, offset=trn_offset, seq_len=seq_len
+                y_data, spatial_idx_name, time_idx_name, offset=trn_offset, seq_len=seq_len,
+                fill_batch=fill_batch, fill_nan=fill_nan, fill_time=False
             ),
             "y_pre_trn": convert_batch_reshape(
-                y_trn, spatial_idx_name, time_idx_name, offset=trn_offset, seq_len=seq_len
+                y_trn, spatial_idx_name, time_idx_name, offset=trn_offset, seq_len=seq_len,
+                fill_batch=fill_batch, fill_nan=fill_nan, fill_time=False
             ),
             "y_pre_vars": y_vars,
         }
@@ -699,7 +900,8 @@
     normalize_y=True,
     trn_offset = 1.0,
     tst_val_offset = 1.0,
-    check_pre_partitions=True
+    check_pre_partitions=True,
+    fill_batch=True
 ):
     """
     prepare input and output data for DL model training read in and process
@@ -765,6 +967,10 @@
     :param tst_val_offset: [str] value for the testing and validation offset
     :param check_pre_partitions: [bool] when True, pretraining partitions are
     checked for unique data in each partition.
+    :param fill_batch: [bool] when True, batches are filled to match the seq_len.
+    This ensures that data are not dropped when the data_array length is not
+    a multiple of the seq_len. Data are added to the end of the sequence.
+    When False, data may be dropped.
     :returns: training and testing data along with the means and standard
     deviations of the training input and output data
     "x_trn": x training data
     "x_val": x validation data
     "x_std": x standard deviations
     "x_mean": x means
     "x_cols": x column names
     "ids_trn": segment ids of the training data
     "times_trn": dates of the training data
+    "padded_trn": bool with True for padded times
     "ids_val": segment ids of the validation data
     "times_val": dates of the validation data
+    "padded_val": bool with True for padded times
     "ids_tst": segment ids of the test data
     "times_tst": dates of the test data
+    "padded_tst": bool with True for padded times
     'y_pre_trn': y_dataset pretrain data for train set
     'y_obs_trn': y_dataset observations for train set
     "y_pre_wgts": y_dataset weights for pretrain data
@@ -850,25 +1059,35 @@
     x_data_dict = {
         "x_pre_full": convert_batch_reshape(
             x_scl,spatial_idx_name, time_idx_name, seq_len=seq_len,
-            offset=trn_offset,
+            offset=trn_offset,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=False
         ),
         "x_trn": convert_batch_reshape(
             x_trn_scl, spatial_idx_name, time_idx_name, seq_len=seq_len,
-            offset=trn_offset,
+            offset=trn_offset,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=False
         ),
         "x_val": convert_batch_reshape(
             x_val_scl,
             spatial_idx_name,
             time_idx_name,
             offset=tst_val_offset,
-            seq_len=seq_len,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=False
         ),
         "x_tst": convert_batch_reshape(
             x_tst_scl,
             spatial_idx_name,
             time_idx_name,
             offset=tst_val_offset,
-            seq_len=seq_len,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=False
        ),
         "x_std": x_std.to_array().values,
         "x_mean": x_mean.to_array().values,
@@ -879,7 +1098,11 @@
             spatial_idx_name,
             time_idx_name,
             offset=trn_offset,
-            seq_len=seq_len,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=False
         ),
         "times_trn": coord_as_reshaped_array(
             x_trn,
@@ -887,7 +1110,23 @@
             spatial_idx_name,
             time_idx_name,
             offset=trn_offset,
-            seq_len=seq_len,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=True,
+            fill_pad=False
+        ),
+        "padded_trn": coord_as_reshaped_array(
+            x_trn,
+            spatial_idx_name,
+            spatial_idx_name,
+            time_idx_name,
+            offset=trn_offset,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=True
         ),
         "ids_val": coord_as_reshaped_array(
             x_val,
@@ -895,7 +1134,11 @@
             spatial_idx_name,
             time_idx_name,
             offset=tst_val_offset,
-            seq_len=seq_len,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=False
         ),
         "times_val": coord_as_reshaped_array(
             x_val,
@@ -903,7 +1146,23 @@
             spatial_idx_name,
             time_idx_name,
             offset=tst_val_offset,
-            seq_len=seq_len,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=True,
+            fill_pad=False
+        ),
+        "padded_val": coord_as_reshaped_array(
+            x_val,
+            spatial_idx_name,
+            spatial_idx_name,
+            time_idx_name,
+            offset=tst_val_offset,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=True
         ),
         "ids_tst": coord_as_reshaped_array(
             x_tst,
@@ -911,7 +1170,11 @@
             spatial_idx_name,
             time_idx_name,
             offset=tst_val_offset,
-            seq_len=seq_len,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=False
         ),
         "times_tst": coord_as_reshaped_array(
             x_tst,
@@ -920,7 +1183,23 @@
             time_idx_name,
             offset=tst_val_offset,
             seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=True,
+            fill_pad=False
         ),
+        "padded_tst": coord_as_reshaped_array(
+            x_tst,
+            spatial_idx_name,
+            spatial_idx_name,
+            time_idx_name,
+            offset=tst_val_offset,
+            seq_len=seq_len,
+            fill_batch=fill_batch,
+            fill_nan=False,
+            fill_time=False,
+            fill_pad=True
+        )
     }
     if distfile:
         x_data_dict["dist_matrix"] = prep_adj_matrix(
@@ -954,7 +1233,9 @@
             normalize_y=normalize_y,
             y_type="obs",
             trn_offset = trn_offset,
-            tst_val_offset = tst_val_offset
+            tst_val_offset = tst_val_offset,
+            fill_batch=fill_batch,
+            fill_nan=True
         )

         #check that the trn, val, and tst partitions have unique data
@@ -982,7 +1263,9 @@
             y_std=y_obs_data["y_std"],
             y_mean=y_obs_data["y_mean"],
             trn_offset = trn_offset,
-            tst_val_offset = tst_val_offset
+            tst_val_offset = tst_val_offset,
+            fill_batch=fill_batch,
+            fill_nan=True
         )
         if check_pre_partitions:
             #check that the trn, val, and tst partitions have unique data
@@ -1008,7 +1291,9 @@
             normalize_y=normalize_y,
             y_type="pre",
             trn_offset = trn_offset,
-            tst_val_offset = tst_val_offset
+            tst_val_offset = tst_val_offset,
+            fill_batch=fill_batch,
+            fill_nan=True
         )
         if check_pre_partitions:
             #check that the trn, val, and tst partitions have unique data
@@ -1124,3 +1409,26 @@ def check_partitions(data, y, pre=False):
             sys.exit('There are observations within multiple data partitions')
     else:
         return(0)
+
+def get_time_unit(data_array):
+    '''
+    Function to get the timestep unit from a numpy.datetime64 array column
+
+    :param data_array: [np 3D array] the array's second axis must be time
+    in np.datetime64 format with nanoseconds specified (default).
+
+    returns the unit of the timestep ('D' for day or 'h' for hour)
+    '''
+    time_delta = data_array[0,1,0] - data_array[0,0,0]
+    time_unit = np.datetime_data(time_delta)[0]
+    if time_unit != 'ns':
+        sys.exit('time unit must be provided as YYYY-MM-DDTHH:MM:SS.000000000 nanoseconds')
+
+    #86400000000000 ns = 1 day; 3600000000000 ns = 1 hour
+    if time_delta == 86400000000000:
+        time_unit = 'D'
+    elif time_delta == 3600000000000:
+        time_unit = 'h'
+    else:
+        sys.exit('time_delta does not correspond to 1 h or D')
+
+    return(time_unit)
\ No newline at end of file
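
Reviewer note: the two sketches below are illustrative only and are not part of the patch. The first is a minimal, self-contained sketch of the padding round trip the patch implements: the time axis is padded to a multiple of seq_len with NaN, a boolean pad mask is carried alongside (as the new `padded_*` arrays do), and `predict` uses that mask to NaN-out and then drop the padded rows. All sizes and values here are hypothetical toy data.

```python
import numpy as np
import pandas as pd

# Hypothetical toy sizes: 2 segments, 10 timesteps, 1 feature, seq_len of 4.
# 10 is not a multiple of 4, so two timesteps of padding are needed.
nseg, nsteps, nfeat, seq_len = 2, 10, 1, 4
x = np.random.rand(nseg, nsteps, nfeat)

# Pad the end of the time axis with NaN so its length is a multiple of
# seq_len (the fill_nan behavior), and build the matching boolean mask
# with True for padded steps (the fill_pad behavior).
n_pad = (-nsteps) % seq_len
x_padded = np.concatenate([x, np.full((nseg, n_pad, nfeat), np.nan)], axis=1)
pad_mask = np.concatenate(
    [np.zeros((nseg, nsteps, nfeat), dtype=bool),
     np.ones((nseg, n_pad, nfeat), dtype=bool)],
    axis=1,
)

# Downstream, predict() sets padded outputs to NaN and the dataframe step
# drops them, so padded timesteps never reach the returned predictions.
y_pred = np.random.rand(*x_padded.shape)
y_pred[pad_mask] = np.nan
df = pd.DataFrame({"pred": y_pred.ravel()}).dropna().reset_index(drop=True)
assert len(df) == nseg * nsteps * nfeat
```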
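
The second sketch shows the numpy datetime mechanics that `get_time_unit` and the `fill_time` branch rely on. The patched code operates on datetime64[ns] arrays; a daily axis is used here only for brevity, and the dates are made up.

```python
import numpy as np

# Hypothetical daily time axis with a step of one day.
dates = np.arange(np.datetime64("2020-01-01"), np.datetime64("2020-01-08"))
step = dates[1] - dates[0]

# np.datetime_data on the dtype returns the (unit, count) pair; here ('D', 1).
unit, count = np.datetime_data(step.dtype)

# The fill_time branch extends the axis by shifting the last n dates forward
# by n timesteps, which keeps every timestamp unique.
n = 3
extension = dates[-n:] + np.timedelta64(n, unit)
print(extension)  # ['2020-01-08' '2020-01-09' '2020-01-10']
```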