Skip to content

Commit

Permalink
add LastObs reading and writing for command-line run of main_V04 with…
Browse files Browse the repository at this point in the history
… HYfeature hydrofabric (NOAA-OWP#701)

* add LastObs reading and writing by command-line run main_V04 with HYfeature

* add lastobs folder and files

* add wrf_hydro_lastobs_file parameter in test_AnA_V4_NHD.yaml and modified related files

* Delete src/troute-routing/troute/routing/compute.py

nothing changed.

* mistakenly removed compute.py

* refreshed compute.py

* no change for __main.py__

* add lite_restart_output_directory

* remove space in __main__.py

* deleted one lastobs file

* remove one lastobs file again

* remove lastobos_output_folder in compute_parameters and update yaml file

* remove commented lines

* remove unused lastobs file

* remove lastobs_output_folder parameter in config

* remove wrf_hydro_last_obs file in both config and DataAssimilation.py

* remove wrf_hydro_lastobs_file in compute_parameters.py and undo change in __main__.py
  • Loading branch information
kumdonoaa authored Dec 6, 2023
1 parent 7ebb747 commit 59f2b50
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 42 deletions.
3 changes: 0 additions & 3 deletions src/troute-config/troute/config/compute_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ class StreamflowDA(BaseModel, extra='forbid'):
# Need to update this in t-route as well.
lastobs_file: Optional[FilePath] = None

# NOTE: required if lastobs are to be written out during and after simulations
lastobs_output_folder: Optional[DirectoryPath] = None

# TODO: missing from `v3_doc.yaml`
# see troute/DataAssimilation.py :57
# see troute/nhd_network_utilities_v02.py :765
Expand Down
3 changes: 3 additions & 0 deletions src/troute-config/troute/config/output_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class OutputParameters(BaseModel, extra='forbid'):
# see nwm_routing/output.py :114
test_output: Optional[FilePath] = None
stream_output: Optional["StreamOutput"] = None
# NOTE: mandatory if writing results to lastobs
lastobs_output: Optional[DirectoryPath] = None

class ChanobsOutput(BaseModel, extra='forbid'):
# NOTE: required if writing chanobs files
Expand Down Expand Up @@ -89,6 +91,7 @@ def validate_stream_output_internal_frequency(cls, value, values):
if value / 60 > values['stream_output_time']:
raise ValueError("stream_output_internal_frequency should be less than or equal to stream_output_time in minutes.")
return value


OutputParameters.update_forward_refs()
WrfHydroParityCheck.update_forward_refs()
108 changes: 93 additions & 15 deletions src/troute-network/troute/DataAssimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def __init__(self, network, from_files, value_dict, da_run=[]):

self._last_obs_df = pd.DataFrame()
self._usgs_df = pd.DataFrame()

usgs_df = pd.DataFrame()

# If streamflow nudging is turned on, create lastobs_df and usgs_df:
Expand Down Expand Up @@ -172,21 +172,42 @@ def __init__(self, network, from_files, value_dict, da_run=[]):
self._last_obs_df = _reindex_link_to_lake_id(lastobs_df, network.link_lake_crosswalk)

else:

lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None)
#TODO: Is it a sustainable to figure out if using NHD or HYfeature based on lastobs_crosswalk_file?
lastobs_crosswalk_file = streamflow_da_parameters.get("gage_segID_crosswalk_file", None)
lastobs_start = streamflow_da_parameters.get("wrf_hydro_lastobs_lead_time_relative_to_simulation_start_time", 0)

if lastobs_file:
self._last_obs_df = build_lastobs_df(
lastobs_file,
lastobs_crosswalk_file,
lastobs_start,
)

if lastobs_crosswalk_file:
# lastobs Dataframe for NHD Hydrofabric
lastobs_file = streamflow_da_parameters.get("lastobs_file", None)
lastobs_crosswalk_file = streamflow_da_parameters.get("gage_segID_crosswalk_file", None)
lastobs_start = streamflow_da_parameters.get("wrf_hydro_lastobs_lead_time_relative_to_simulation_start_time", 0)

if lastobs_file:
self._last_obs_df = build_lastobs_df(
lastobs_file,
lastobs_crosswalk_file,
lastobs_start,
)
else:
# lastobs Dataframe for HYfeature HYdrofabric
lastobs_file = data_assimilation_parameters.get('streamflow_da', {}).get('lastobs_file', False)
if lastobs_file:
lastobs_df = _read_lastobs_file(lastobs_file)
lastobs_df = lastobs_df.set_index('gages')
link_gage_df = network.link_gage_df.reset_index().set_index('gages')
col_name = link_gage_df.columns[0]
link_gage_df[col_name] = link_gage_df[col_name].astype(int)
gages_dict = link_gage_df.to_dict().get(col_name)
lastobs_df = lastobs_df.rename(index=gages_dict)
# remove 'nan' values from index
temp_df = lastobs_df.reset_index()
temp_df = temp_df[temp_df['gages']!='nan']
temp_df['gages'] = temp_df['gages'].astype(int)
lastobs_df = temp_df.set_index('gages').dropna()
self._last_obs_df = lastobs_df

# replace link ids with lake ids, for gages at waterbody outlets,
# otherwise, gage data will not be assimilated at waterbody outlet
# segments.
# segments because connections dic has replaced all link ids within
# waterbodies with related lake ids.
if network.link_lake_crosswalk:
self._last_obs_df = _reindex_link_to_lake_id(self._last_obs_df, network.link_lake_crosswalk)

Expand Down Expand Up @@ -217,7 +238,7 @@ def update_after_compute(self, run_results, time_increment):
- lastobs_df (DataFrame): Last gage observations data for DA
'''
streamflow_da_parameters = self._data_assimilation_parameters.get('streamflow_da', None)

if streamflow_da_parameters:
if streamflow_da_parameters.get('streamflow_nudging', False):
self._last_obs_df = new_lastobs(run_results, time_increment)
Expand Down Expand Up @@ -936,7 +957,7 @@ def _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_
- usgs_df (DataFrame): dataframe of USGS gage observations
'''
usgs_timeslices_folder = data_assimilation_parameters.get("usgs_timeslices_folder", None)
lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None)
#lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None)
lastobs_start = data_assimilation_parameters.get("wrf_hydro_lastobs_lead_time_relative_to_simulation_start_time",0)
lastobs_type = data_assimilation_parameters.get("wrf_lastobs_type", "error-based")
crosswalk_file = streamflow_da_parameters.get("gage_segID_crosswalk_file", None)
Expand Down Expand Up @@ -1714,3 +1735,60 @@ def assemble_rfc_dataframes(rfc_timeseries_df, rfc_lake_gage_crosswalk, t0, rfc_
reservoir_rfc_param_df['rfc_persist_days'] = rfc_parameters.get('reservoir_rfc_forecast_persist_days', 11)

return reservoir_rfc_df, reservoir_rfc_param_df

def _read_lastobs_file(
lastobsfile,
station_id = "stationId",
ref_t_attr_id = "modelTimeAtOutput",
time_idx_id = "timeInd",
obs_discharge_id = "discharge",
discharge_nan = -9999.0,
time_shift = 0,
):
with xr.open_dataset(lastobsfile) as ds:
gages = ds[station_id].values

ref_time = datetime.strptime(ds.attrs[ref_t_attr_id], "%Y-%m-%d_%H:%M:%S")

last_ts = ds[time_idx_id].values[-1]

df_discharge = (
ds[obs_discharge_id].to_dataframe(). # discharge to MultiIndex DF
replace(to_replace = discharge_nan, value = np.nan). # replace null values with nan
unstack(level = 0) # unstack to single Index (timeInd)
)

last_obs_index = (
df_discharge.
apply(pd.Series.last_valid_index). # index of last non-nan value, each gage
to_numpy() # to numpy array
)
last_obs_index = np.nan_to_num(last_obs_index, nan = last_ts).astype(int)

last_observations = []
lastobs_times = []
for i, idx in enumerate(last_obs_index):
last_observations.append(df_discharge.iloc[idx,i])
lastobs_times.append(ds.time.values[i, idx].decode('utf-8'))

last_observations = np.array(last_observations)
lastobs_times = pd.to_datetime(
np.array(lastobs_times),
format="%Y-%m-%d_%H:%M:%S",
errors = 'coerce'
)

lastobs_times = (lastobs_times - ref_time).total_seconds()
lastobs_times = lastobs_times - time_shift

data_var_dict = {
'gages' : gages,
'time_since_lastobs' : lastobs_times,
'lastobs_discharge' : last_observations
}

lastobs_df = pd.DataFrame(data = data_var_dict)
lastobs_df['gages'] = lastobs_df['gages'].str.decode('utf-8')

return lastobs_df

2 changes: 1 addition & 1 deletion src/troute-nwm/src/nwm_routing/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def main_v04(argv):
network._waterbody_df,
t0 + timedelta(seconds = dt * nts),
output_parameters['lite_restart']
)
)

# Prepare input forcing for next time loop simulation when mutiple time loops are presented.
if run_set_iterator < len(run_sets) - 1:
Expand Down
11 changes: 7 additions & 4 deletions src/troute-nwm/src/nwm_routing/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def nwm_output_generator(
chano = output_parameters.get("chanobs_output", None)
wbdyo = output_parameters.get("lakeout_output", None)
stream_output = output_parameters.get("stream_output", None)
lastobso = output_parameters.get("lastobs_output", None)

if csv_output:
csv_output_folder = output_parameters["csv_output"].get(
Expand Down Expand Up @@ -360,16 +361,17 @@ def nwm_output_generator(

LOG.debug("writing flow data to CHANOBS took %s seconds." % (time.time() - start))

# Write out LastObs as netcdf.
if lastobso:
# Write out LastObs as netcdf when using main_v04 or troute_model with HYfeature.
# This is only needed if 1) streamflow nudging is ON and 2) a lastobs output
# folder is provided by the user.
start = time.time()
lastobs_output_folder = None
nudging_true = None
streamflow_da = data_assimilation_parameters.get('streamflow_da', None)
if streamflow_da:
lastobs_output_folder = streamflow_da.get(
"lastobs_output_folder", None
)
lastobs_output_folder = output_parameters.get("lastobs_output", None)

nudging_true = streamflow_da.get('streamflow_nudging', None)

if nudging_true and lastobs_output_folder:
Expand All @@ -388,6 +390,7 @@ def nwm_output_generator(

LOG.debug("writing lastobs files took %s seconds." % (time.time() - start))


if 'flowveldepth' in locals():
LOG.debug(flowveldepth)

Expand Down
2 changes: 1 addition & 1 deletion src/troute_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def preprocess_static_vars(self, values: dict):
compute_parameters=self._compute_parameters,
hybrid_parameters=self._hybrid_parameters,
preprocessing_parameters=self._preprocessing_parameters,
output_parameters = self._output_parameters,
output_parameters=self._output_parameters,
from_files=False, value_dict=values,
bmi_parameters=self._bmi_parameters,)

Expand Down
Binary file not shown.
3 changes: 1 addition & 2 deletions test/LowerColorado_TX/test_AnA.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ compute_parameters:
gage_segID_crosswalk_file : domain/RouteLink.nc
crosswalk_gage_field : 'gages'
crosswalk_segID_field : 'link'
wrf_hydro_lastobs_file : lastobs/nudgingLastObs.2021-08-23_12:00:00.nc
lastobs_output_folder : lastobs/
lastobs_file : #lastobs/nudgingLastObs.2021-08-23_12:00:00.nc
reservoir_da:
#----------
reservoir_persistence_usgs : True
Expand Down
3 changes: 1 addition & 2 deletions test/LowerColorado_TX/test_AnA_V4_NHD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ compute_parameters:
gage_segID_crosswalk_file : domain/RouteLink.nc
crosswalk_gage_field : 'gages'
crosswalk_segID_field : 'link'
lastobs_file : lastobs/nudgingLastObs.2021-08-23_12:00:00.nc
lastobs_output_folder : lastobs/
lastobs_file : #lastobs/nudgingLastObs.2021-08-23_12:00:00.nc
reservoir_da:
#----------
reservoir_parameter_file : domain/reservoir_index_AnA.nc
Expand Down
Binary file not shown.
26 changes: 14 additions & 12 deletions test/LowerColorado_TX_v4/test_AnA_V4_HYFeature.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ compute_parameters:
streamflow_da:
#----------
streamflow_nudging : True
diffusive_streamflow_nudging : False
lastobs_output_folder : #lastobs/
diffusive_streamflow_nudging : False
lastobs_file : #lastobs/nudgingLastObs.2023-04-02_00:00:00.nc
reservoir_da:
#----------
reservoir_persistence_da:
Expand All @@ -117,14 +117,16 @@ compute_parameters:
reservoir_rfc_forecasts_offset_hours : 28
#--------------------------------------------------------------------------------
output_parameters:
#----------
test_output : output/lcr_flowveldepth.pkl
lite_restart:
#----------
# #----------
test_output : output/lcr_flowveldepth.pkl
lite_restart:
# #----------
lite_restart_output_directory: restart/
lakeout_output: lakeout/
stream_output :
stream_output_directory: output/
stream_output_time: 1 #[hr]
stream_output_type: '.nc' #please select only between netcdf '.nc' or '.csv' or '.pkl'
stream_output_internal_frequency: 60 #[min] it should be order of 5 minutes. For instance if you want to output every hour put 60
lakeout_output: lakeout/
lastobs_output: lastobs/
# stream_output :
# stream_output_directory: output/
# stream_output_time: 1 #[hr]
# stream_output_type: '.nc' #please select only between netcdf '.nc' or '.csv' or '.pkl'
# stream_output_internal_frequency: 60 #[min] it should be order of 5 minutes. For instance if you want to output every hour put 60

4 changes: 2 additions & 2 deletions test/LowerColorado_TX_v4/test_AnA_V4_HYFeature_noDA.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ compute_parameters:
streamflow_da:
#----------
streamflow_nudging : False
diffusive_streamflow_nudging : False
lastobs_output_folder : lastobs/
diffusive_streamflow_nudging : False
lastobs_file : #lastobs/nudgingLastObs.2023-04-02_00:00:00.nc
reservoir_da:
#----------
reservoir_persistence_da:
Expand Down

0 comments on commit 59f2b50

Please sign in to comment.