
Commit

parallel computing for storing output (NOAA-OWP#716)
* parallel computing

* Add LOG and option to do or avoid multiprocessing

* deleting pdb

* Addressing comments
AminTorabi-NOAA authored Dec 22, 2023
1 parent 4cd3e49 commit e149ba5
Showing 2 changed files with 115 additions and 81 deletions.
193 changes: 113 additions & 80 deletions src/troute-network/troute/nhd_io.py
@@ -1948,14 +1948,103 @@ def write_waterbody_netcdf(
}
)

def helper_write_flowveldepth(flowveldepth, subset_df, current_time_step,
                              stream_output_directory, stream_output_type, gage,
                              nudge_timesteps, time_dim, stream_output_internal_frequency, time_steps,
                              counter, t0, stream_output_timediff):
    '''
    Helper for 'write_flowveldepth_netcdf' that writes a single output file, so each
    time-step file can be dispatched as an independent job for parallel computing.
    '''

    if stream_output_directory:
        if stream_output_type == '.nc':
            file_name = f"{current_time_step}.flowveldepth.nc"

        elif stream_output_type == '.csv':
            file_name = f"{current_time_step}.flowveldepth.csv"
            # Save the data to a CSV file
            subset_df.to_csv(f"{stream_output_directory}/{file_name}", index=True)
            LOG.debug(f"Flowveldepth data saved as CSV files in {stream_output_directory}")

        elif stream_output_type == '.pkl':
            file_name = f"{current_time_step}.flowveldepth.pkl"
            # Save the data to a pickle file
            subset_df.to_pickle(f"{stream_output_directory}/{file_name}")
            LOG.debug(f"Flowveldepth data saved as PICKLE files in {stream_output_directory}")

        else:
            print('WRONG FORMAT')

    if stream_output_directory:
        if stream_output_type == '.nc':
            # Open netCDF4 Dataset in write mode
            with netCDF4.Dataset(
                filename=f"{stream_output_directory}/{file_name}",
                mode='w',
                format='NETCDF4'
            ) as ncfile:

                # ============ DIMENSIONS ===================
                _ = ncfile.createDimension('feature_id', None)
                _ = ncfile.createDimension('time_step (sec)', subset_df.iloc[:, 0::4].shape[1])
                _ = ncfile.createDimension('gage', gage)
                _ = ncfile.createDimension('nudge_timestep', nudge_timesteps)  # Add dimension for nudge time steps

                # =========== q,v,d,ndg VARIABLES ===============
                for counters, var in enumerate(['flowrate', 'velocity', 'depth', 'nudge']):
                    QVD = ncfile.createVariable(
                        varname=var,
                        datatype=np.float32,
                        dimensions=('feature_id', 'time_step (sec)',),
                    )

                    QVD.units = 'm3/s m/s m m3/s'
                    QVD.description = f'Data for {var}'

                    # Prepare data for writing
                    data_array = subset_df.iloc[:, counters::4].to_numpy(dtype=np.float32)

                    # Set data for each feature_id and time_step
                    ncfile.variables[var][:] = data_array

                feature_id = ncfile.createVariable(
                    varname='feature_id',
                    datatype=np.int32,
                    dimensions=('feature_id',),
                )
                feature_id[:] = flowveldepth.index.to_numpy(dtype=np.int32)
                feature_id.units = 'None'
                feature_id.description = 'Feature IDs'

                time_step = ncfile.createVariable(
                    varname='time_step (sec)',
                    datatype=np.int32,
                    dimensions=('time_step (sec)',),
                )
                time_step[:] = np.array(time_dim[:subset_df.iloc[:, 0::4].shape[1]], dtype=np.int32)
                time_step.units = 'sec'
                time_step.description = 'time stamp'

                # =========== GLOBAL ATTRIBUTES ===============
                ncfile.setncatts(
                    {
                        'TITLE': 'OUTPUT FROM T-ROUTE',
                        'Time step (sec)': f'{stream_output_internal_frequency}',
                        'model_initialization_time': t0.strftime('%Y-%m-%d_%H:%M:%S'),
                        'model_reference_time': time_steps[counter].strftime('%Y-%m-%d_%H:%M:%S'),
                        'comment': f'The file includes {stream_output_timediff} hour data which includes {len(time_dim)} timesteps',
                        'code_version': '',
                    }
                )
            LOG.debug(f"Flowveldepth data saved as NetCDF files in {stream_output_directory}")

def write_flowveldepth_netcdf(stream_output_directory,
                              flowveldepth,
                              nudge,
                              usgs_positions_id,
                              t0,
                              stream_output_timediff,
                              stream_output_type,
                              stream_output_internal_frequency = 5):
                              stream_output_internal_frequency = 5,
                              cpu_pool = 6):
    '''
    Write the results of flowveldepth and nudge to netcdf- break.
    Arguments
@@ -2006,8 +2095,11 @@ def write_flowveldepth_netcdf(stream_output_directory,
    # Create time step values based on t0
    time_steps = [t0 + timedelta(hours= (i * stream_output_timediff)) for i in range(nsteps//nstep_nc)]
    time_dim = [t * stream_output_internal_frequency*60 for t in range(1, int(stream_output_timediff * 60 / stream_output_internal_frequency) + 1)]
    jobs = []

    for counter, i in enumerate(range(0, nsteps, nstep_nc)):

        # Define the range of columns for this file
        start_col = i * 4
        end_col = min((i + nstep_nc) * 4 , nsteps * 4)
@@ -2021,83 +2113,24 @@

        # Create the file name based on the current time step
        current_time_step = time_steps[counter].strftime('%Y%m%d%H%M')
        if stream_output_directory:
            if stream_output_type == '.nc':
                file_name = f"{current_time_step}.flowveldepth.nc"

            elif stream_output_type == '.csv':
                file_name = f"{current_time_step}.flowveldepth.csv"
                # Save the data to a CSV file
                subset_df.to_csv(f"{stream_output_directory}/{file_name}", index=True)
                LOG.debug(f"Flowveldepth data saved as CSV files in {stream_output_directory}")

            elif stream_output_type == '.pkl':
                file_name = f"{current_time_step}.flowveldepth.pkl"
                # Save the data to a pickle file
                subset_df.to_pickle(f"{stream_output_directory}/{file_name}")
                LOG.debug(f"Flowveldepth data saved as PICKLE files in {stream_output_directory}")

            else:
                print('WRONG FORMAT')

        if stream_output_directory:
            if stream_output_type == '.nc':
                # Open netCDF4 Dataset in write mode
                with netCDF4.Dataset(
                    filename=f"{stream_output_directory}/{file_name}",
                    mode='w',
                    format='NETCDF4'
                ) as ncfile:

                    # ============ DIMENSIONS ===================
                    _ = ncfile.createDimension('feature_id', None)
                    _ = ncfile.createDimension('time_step (sec)', subset_df.iloc[:, 0::4].shape[1])
                    _ = ncfile.createDimension('gage', gage)
                    _ = ncfile.createDimension('nudge_timestep', nudge_timesteps)  # Add dimension for nudge time steps

                    # =========== q,v,d,ndg VARIABLES ===============
                    for counters, var in enumerate(['flowrate', 'velocity', 'depth', 'nudge']):
                        QVD = ncfile.createVariable(
                            varname=var,
                            datatype=np.float32,
                            dimensions=('feature_id', 'time_step (sec)',),
                        )

                        QVD.units = 'm3/s m/s m m3/s'
                        QVD.description = f'Data for {var}'

                        # Prepare data for writing
                        data_array = subset_df.iloc[:, counters::4].to_numpy(dtype=np.float32)

                        # Set data for each feature_id and time_step
                        ncfile.variables[var][:] = data_array

                    feature_id = ncfile.createVariable(
                        varname='feature_id',
                        datatype=np.int32,
                        dimensions=('feature_id',),
                    )
                    feature_id[:] = flowveldepth.index.to_numpy(dtype=np.int32)
                    feature_id.units = 'None'
                    feature_id.description = 'Feature IDs'

                    time_step = ncfile.createVariable(
                        varname='time_step (sec)',
                        datatype=np.int32,
                        dimensions=('time_step (sec)',),
                    )
                    time_step[:] = np.array(time_dim[:subset_df.iloc[:, 0::4].shape[1]], dtype=np.int32)
                    time_step.units = 'sec'
                    time_step.description = 'time stamp'

                    # =========== GLOBAL ATTRIBUTES ===============
                    ncfile.setncatts(
                        {
                            'TITLE': 'OUTPUT FROM T-ROUTE',
                            'Time step (sec)': f'{stream_output_internal_frequency}',
                            'model_initialization_time': t0.strftime('%Y-%m-%d_%H:%M:%S'),
                            'model_reference_time': time_steps[counter].strftime('%Y-%m-%d_%H:%M:%S'),
                            'comment': f'The file includes {stream_output_timediff} hour data which includes {len(time_dim)} timesteps',
                            'code_version': '',
                        }
                    )
                LOG.debug(f"Flowveldepth data saved as NetCDF files in {stream_output_directory}")

        args = (flowveldepth, subset_df, current_time_step,
                stream_output_directory, stream_output_type, gage,
                nudge_timesteps, time_dim, stream_output_internal_frequency, time_steps,
                counter, t0, stream_output_timediff)

        if cpu_pool > 1:
            jobs.append(delayed(helper_write_flowveldepth)(*args))
            LOG.debug(f"Job for step {counter} added for parallel processing.")
        else:
            helper_write_flowveldepth(*args)

    if cpu_pool > 1:
        try:
            # Execute all jobs in parallel
            with Parallel(n_jobs=cpu_pool) as parallel:
                parallel(jobs)
        except Exception as e:
            LOG.error("Error during parallel processing: %s", e)

    LOG.debug("Completed the write_flowveldepth_netcdf function")
3 changes: 2 additions & 1 deletion src/troute-nwm/src/nwm_routing/output.py
@@ -209,7 +209,8 @@ def nwm_output_generator(
                t0,
                int(stream_output_timediff),
                stream_output_type,
                stream_output_internal_frequency)
                stream_output_internal_frequency,
                cpu_pool = cpu_pool)

        if test:
            flowveldepth.to_pickle(Path(test))
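Aside (not part of the diff): one way to sanity-check a file written by the new NetCDF path is to open it and confirm the flowrate, velocity, depth, and nudge variables over the feature_id and 'time_step (sec)' dimensions defined above. A hedged readback sketch; the file name is a placeholder that merely follows the f"{current_time_step}.flowveldepth.nc" pattern from the diff:

import netCDF4

with netCDF4.Dataset('202312220100.flowveldepth.nc', mode='r') as ncfile:
    print(list(ncfile.dimensions))              # feature_id, time_step (sec), gage, nudge_timestep
    print(ncfile.variables['flowrate'].shape)   # (n feature_ids, timesteps per file)
    print(ncfile.getncattr('model_reference_time'))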

