Commit 52d65a7

fix -F, multiprocess hf2df, fix s3 checker
JordanLaserGit committed Oct 30, 2024
1 parent eea39f4 commit 52d65a7
Showing 17 changed files with 211 additions and 81 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/forcingprocessor.yml
@@ -43,7 +43,7 @@ jobs:
- name: Test with pytest
run: |
cd forcingprocessor
python -m pytest -vv --deselect="tests/test_forcingprocessor.py::test_google_cloud_storage" --deselect="tests/test_forcingprocessor.py::test_gcs" --deselect="tests/test_forcingprocessor.py::test_gs" --deselect="tests/test_forcingprocessor.py::test_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_nomads_post_processed" --deselect="tests/test_forcingprocessor.py::test_retro_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_noaa_nwm_pds_https_analysis_assim" --deselect="tests/test_hf2ds.py" --deselect="tests/test_plotter.py"
python -m pytest -vv --deselect="tests/test_forcingprocessor.py::test_google_cloud_storage" --deselect="tests/test_forcingprocessor.py::test_gcs" --deselect="tests/test_forcingprocessor.py::test_gs" --deselect="tests/test_forcingprocessor.py::test_ciroh_zarr" --deselect="tests/test_forcingprocessor.py::test_nomads_post_processed" --deselect="tests/test_forcingprocessor.py::test_retro_ciroh_zarr" --deselect="tests/test_hf2ds.py" --deselect="tests/test_plotter.py"
python -m pytest -vv tests/test_hf2ds.py
python -m pytest -vv tests/test_plotter.py
python -m pytest -vv -k test_google_cloud_storage
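The only change here removes test_noaa_nwm_pds_https_analysis_assim from the deselect list, so it now runs with the main suite. For a quick local check of just that test, a minimal sketch using pytest's Python entry point (the path assumes the working directory is the forcingprocessor package root):

import pytest

# Run only the re-enabled analysis_assim test from the forcingprocessor suite.
exit_code = pytest.main([
    "-vv",
    "-k", "test_noaa_nwm_pds_https_analysis_assim",
    "tests/test_forcingprocessor.py",
])
print("pytest exit code:", exit_code)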
36 changes: 20 additions & 16 deletions .github/workflows/test_datastream_options.yml
@@ -42,14 +42,12 @@ jobs:
- name: Get geopackage from hfsubset
run: |
# HFSUBSET IS BROKEN
# hfsubset -w medium_range -s nextgen -v 2.1.1 -l divides,flowlines,network,nexus,forcing-weights,flowpath-attributes,model-attributes -o palisade.gpkg -t hl "Gages-09106150"
curl -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg
hfsubset -w medium_range -s nextgen -v 2.1.1 -l divides,flowlines,network,nexus,forcing-weights,flowpath-attributes,model-attributes -o palisade.gpkg -t hl "Gages-09106150"
# curl -L -O https://ngen-datastream.s3.us-east-2.amazonaws.com/palisade.gpkg
- name: Base test and NWM_RETRO_V3
run: |
sudo rm -rf $(pwd)/data/datastream_test
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
- name: Cache resource directory
@@ -60,6 +58,12 @@ jobs:
cp -r ./data/cache/datastream-resources ./data/cache/datastream-resources-missing
sudo rm -rf ./data/cache/datastream-resources-no-forcings/ngen-forcings
- name: NextGen forcings CLI option test
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -g $(pwd)/palisade.gpkg -R $(pwd)/data/cache/datastream-resources/config/realization_sloth_nom_cfe_pet.json -F $(pwd)/data/cache/datastream-resources/ngen-forcings/1_forcings.nc -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test
- name: Resource directory test missing all
if: always()
run: |
@@ -101,11 +105,11 @@ jobs:
TODAY=$(env TZ=US/Eastern date +'%Y%m%d')
./scripts/stream.sh -r ./data/cache/datastream-resources-no-forcings -s $TODAY"0100" -e $TODAY"0200" -C NOMADS -d $(pwd)/data/datastream_test
# - name: Test hfsubset options
# if: always()
# run: |
# sudo rm -rf $(pwd)/data/datastream_test
# ./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -I "Gages-09106150" -i hl -v 2.1.1 -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
- name: Test hfsubset options
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s 202006200100 -e 202006200200 -C NWM_RETRO_V3 -d $(pwd)/data/datastream_test -I "Gages-09106150" -i hl -v 2.1.1 -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
- name: S3 write out test
if: always()
@@ -140,10 +144,10 @@ jobs:
./scripts/stream.sh -s DAILY -C NWM_MEDIUM_RANGE -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
# - name: DAILY analysis assim extend today test
# if: always()
# run: |
# sudo rm -rf $(pwd)/data/datastream_test
# ./scripts/stream.sh -s DAILY -C NWM_ANALYSIS_ASSIM_EXTEND -e $(date -d '-2 day' '+%Y%m%d0000') -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json
- name: DAILY analysis assim extend today test
if: always()
run: |
sudo rm -rf $(pwd)/data/datastream_test
./scripts/stream.sh -s DAILY -C NWM_ANALYSIS_ASSIM_EXTEND -e $(date -d '-2 day' '+%Y%m%d0000') -d $(pwd)/data/datastream_test -g $(pwd)/palisade.gpkg -R $(pwd)/configs/ngen/realization_sloth_nom_cfe_pet.json

10 changes: 8 additions & 2 deletions forcingprocessor/src/forcingprocessor/processor.py
Expand Up @@ -13,7 +13,7 @@
from datetime import datetime
import gzip
import tarfile, tempfile
from forcingprocessor.weights_hf2ds import hf2ds
from forcingprocessor.weights_hf2ds import multiprocess_hf2ds
from forcingprocessor.plot_forcings import plot_ngen_forcings
from forcingprocessor.utils import get_window, log_time, convert_url2key, report_usage, nwm_variables, ngen_variables

@@ -784,7 +784,7 @@ def prep_ngen_data(conf):
log_time("READWEIGHTS_START", log_file)
if ii_verbose: print(f'Obtaining weights from geopackage(s)\n',flush=True)
global weights_json
weights_json, jcatchment_dict = hf2ds(gpkg_files)
weights_json, jcatchment_dict = multiprocess_hf2ds(gpkg_files)
ncatchments = len(weights_json)
global x_min, x_max, y_min, y_max
x_min, x_max, y_min, y_max = get_window(weights_json)
@@ -833,6 +833,12 @@ def prep_ngen_data(conf):
# t_ax = t_ax
# nwm_data=nwm_data[0][None,:]
data_array, t_ax, nwm_data = multiprocess_data_extract(jnwm_files,nprocs,weights_json,fs)

if datetime.strptime(t_ax[0],'%Y-%m-%d %H:%M:%S') > datetime.strptime(t_ax[-1],'%Y-%m-%d %H:%M:%S'):
# Hack to ensure data is always written out with time moving forward.
t_ax=list(reversed(t_ax))
data_array = np.flip(data_array,axis=0)

t_extract = time.perf_counter() - t0
complexity = (nfiles_tot * ncatchments) / 10000
score = complexity / t_extract
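For context, the new guard added above keeps forcing output in chronological order when the extracted time axis comes back newest-first. A minimal standalone sketch of the same idea, using hypothetical sample arrays rather than the processor's real inputs:

from datetime import datetime
import numpy as np

# Hypothetical time axis returned newest-first, plus a matching data array
# with time as the leading dimension.
t_ax = ["2020-06-20 03:00:00", "2020-06-20 02:00:00", "2020-06-20 01:00:00"]
data_array = np.arange(3 * 4).reshape(3, 4)

fmt = "%Y-%m-%d %H:%M:%S"
if datetime.strptime(t_ax[0], fmt) > datetime.strptime(t_ax[-1], fmt):
    # Reverse the time axis and flip the data along axis 0 so the two stay aligned.
    t_ax = list(reversed(t_ax))
    data_array = np.flip(data_array, axis=0)

print(t_ax[0], data_array[0])  # the earliest timestamp now leads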
33 changes: 32 additions & 1 deletion forcingprocessor/src/forcingprocessor/weights_hf2ds.py
@@ -1,12 +1,43 @@
import json
import argparse, time
import geopandas as gpd
import boto3
import re, os
import concurrent.futures as cf
import pandas as pd
gpd.options.io_engine = "pyogrio"

def multiprocess_hf2ds(files : list):
nprocs = min(len(files),os.cpu_count())
i = 0
k = 0
nfiles = len(files)
files_list = []
nper = nfiles // nprocs
nleft = nfiles - (nper * nprocs)
for i in range(nprocs):
k = nper + i + nleft
files_list.append(files[i:k])
i=k

weight_jsons = []
jcatchment_dicts = []
with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
for results in pool.map(
hf2ds,
files_list,
):
weight_jsons.append(results[0])
jcatchment_dicts.append(results[1])

print(f'Processes have returned')
weight_json = {}
[weight_json.update(x) for x in weight_jsons]
jcatchment_dict = {}
[jcatchment_dict.update(x) for x in jcatchment_dicts]

return weight_json, jcatchment_dict


def hf2ds(files : list):
"""
Extracts the weights from a list of files
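The new multiprocess_hf2ds above fans the geopackage list out across a process pool and merges each worker's weights and catchment dicts. As a general reference for that fan-out/merge pattern, here is a self-contained sketch; fake_hf2ds, split_evenly, and multiprocess_worker_demo are placeholder names standing in for the package's real hf2ds, not the committed implementation:

import os
import concurrent.futures as cf

def fake_hf2ds(files):
    # Placeholder worker: the real hf2ds extracts weights from each geopackage.
    weights = {f"cat-{i}-{os.path.basename(f)}": [i] for i, f in enumerate(files)}
    jcatchments = {os.path.basename(f): list(weights.keys()) for f in files}
    return weights, jcatchments

def split_evenly(items, nchunks):
    # Distribute items so chunk sizes differ by at most one.
    base, extra = divmod(len(items), nchunks)
    chunks, start = [], 0
    for j in range(nchunks):
        end = start + base + (1 if j < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks

def multiprocess_worker_demo(files):
    nprocs = min(len(files), os.cpu_count())
    weight_json, jcatchment_dict = {}, {}
    with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
        for weights, jcatchments in pool.map(fake_hf2ds, split_evenly(files, nprocs)):
            weight_json.update(weights)
            jcatchment_dict.update(jcatchments)
    return weight_json, jcatchment_dict

if __name__ == "__main__":
    print(multiprocess_worker_demo(["a.gpkg", "b.gpkg", "c.gpkg"]))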
18 changes: 9 additions & 9 deletions forcingprocessor/tests/test_forcingprocessor.py
@@ -1,16 +1,16 @@
import os
from pathlib import Path
from datetime import datetime
from datetime import datetime
from datetime import datetime, timedelta, timezone
from forcingprocessor.processor import prep_ngen_data
from forcingprocessor.nwm_filenames_generator import generate_nwmfiles
import pytz as tz
import pytest

HF_VERSION="v2.1.1"
date = datetime.now(tz.timezone('US/Eastern'))
date = datetime.now(timezone.utc)
date = date.strftime('%Y%m%d')
hourminute = '0000'
yesterday = datetime.now(timezone.utc) - timedelta(hours=24)
yesterday = yesterday.strftime('%Y%m%d')
test_dir = Path(__file__).parent
data_dir = (test_dir/'data').resolve()
forcings_dir = (data_dir/'forcings').resolve()
@@ -163,7 +163,6 @@ def test_noaa_nwm_pds_https_medium_range():
os.remove(assert_file)

def test_noaa_nwm_pds_https_analysis_assim():
assert False, f'test_nomads_post_processed() is BROKEN - https://github.com/CIROH-UA/nwmurl/issues/36'
nwmurl_conf['start_date'] = date + hourminute
nwmurl_conf['end_date'] = date + hourminute
nwmurl_conf["urlbaseinput"] = 7
@@ -174,11 +173,11 @@ def test_noaa_nwm_pds_https_analysis_assim():
os.remove(assert_file)

def test_noaa_nwm_pds_https_analysis_assim_extend():
assert False, f'test_nomads_post_processed() is BROKEN - https://github.com/CIROH-UA/nwmurl/issues/36'
nwmurl_conf['start_date'] = date + hourminute
nwmurl_conf['end_date'] = date + hourminute
nwmurl_conf['start_date'] = yesterday + hourminute
nwmurl_conf['end_date'] = yesterday + hourminute
nwmurl_conf["urlbaseinput"] = 7
nwmurl_conf["runinput"] = 7
nwmurl_conf["runinput"] = 6
nwmurl_conf["fcst_cycle"] = [16]
generate_nwmfiles(nwmurl_conf)
prep_ngen_data(conf)
assert assert_file.exists()
@@ -189,6 +188,7 @@ def test_noaa_nwm_pds_s3():
nwmurl_conf['end_date'] = date + hourminute
nwmurl_conf["runinput"] = 1
nwmurl_conf["urlbaseinput"] = 8
nwmurl_conf["fcst_cycle"] = [0]
generate_nwmfiles(nwmurl_conf)
prep_ngen_data(conf)
assert assert_file.exists()
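The test module now builds its date strings in UTC rather than US/Eastern and adds a yesterday string for the extended analysis-and-assimilation cycle. A small standalone sketch of that date handling, using only the standard library:

from datetime import datetime, timedelta, timezone

# Build the same YYYYMMDD strings the tests use, pinned to UTC so results
# do not depend on the runner's local timezone.
now_utc = datetime.now(timezone.utc)
date = now_utc.strftime("%Y%m%d")
yesterday = (now_utc - timedelta(hours=24)).strftime("%Y%m%d")
hourminute = "0000"

start_date = date + hourminute        # e.g. "202410300000"
end_date = yesterday + hourminute     # used for the analysis_assim_extend run

print(start_date, end_date)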
37 changes: 25 additions & 12 deletions python_tools/src/python_tools/configure_datastream.py
@@ -1,7 +1,6 @@
import argparse, json, os, copy, re
from datetime import datetime, timedelta
from datetime import datetime, timezone, timedelta
from pathlib import Path
import pytz as tz
import platform
import psutil

@@ -83,22 +82,25 @@ def create_conf_nwm(args):
start = args.start_date
end = args.end_date

fcst_cycle = 0

if "DAILY" in start:
if end == "":
start_dt = datetime.now(tz.timezone('US/Eastern'))
start_dt = datetime.now(timezone.utc)
else:
start_dt = datetime.strptime(end,'%Y%m%d%H%M')
end_dt = start_dt
num_hrs= 24
else:
start_dt = datetime.strptime(start,'%Y%m%d%H%M')
end_dt = datetime.strptime(end,'%Y%m%d%H%M')
num_hrs = (end_dt - start_dt).seconds // 3600

start_dt = start_dt.replace(hour=1,minute=0,second=0,microsecond=0)
end_dt = end_dt.replace(hour=1,minute=0,second=0,microsecond=0)
end_dt = end_dt.replace(hour=1,minute=0,second=0,microsecond=0)
start_str_real = start_dt.strftime('%Y-%m-%d %H:%M:%S')
end_str_real = end_dt.strftime('%Y-%m-%d %H:%M:%S')
start_str_nwm = start_dt.strftime('%Y%m%d%H%M')
end_str_nwm = start_dt.strftime('%Y%m%d%H%M')
end_str_nwm = start_dt.strftime('%Y%m%d%H%M')

if "RETRO" in args.forcing_source:
if "V2" in args.forcing_source:
@@ -115,8 +117,7 @@
"write_to_file" : True
}
else:
varinput = 5
num_hrs = (end_dt - start_dt).seconds // 3600
varinput = 5

if "HAWAII" in args.forcing_source:
geoinput=2
@@ -146,12 +147,17 @@
runinput=6
num_hrs=28
dt=0
fcst_cycle = 16
start_dt = start_dt - timedelta(hours=12)
start_str_real = start_dt.strftime('%Y-%m-%d %H:%M:%S')
else:
start_dt = start_dt - timedelta(hours=3)
start_str_real = start_dt.strftime('%Y-%m-%d %H:%M:%S')
runinput=5
num_hrs=3
else:
runinput=2
num_hrs = 24
num_hrs=24

nwm_conf = {
"forcing_type" : "operational_archive",
@@ -162,10 +168,13 @@
"geoinput" : geoinput,
"meminput" : 0,
"urlbaseinput" : urlbaseinput,
"fcst_cycle" : [0],
"fcst_cycle" : [fcst_cycle],
"lead_time" : [x+dt for x in range(num_hrs)]
}

end_str_real = start_dt + timedelta(hours=num_hrs-1)
end_str_real = end_str_real.strftime('%Y-%m-%d %H:%M:%S')

return nwm_conf, start_str_real, end_str_real

def create_conf_fp(args):
@@ -178,7 +187,7 @@ def create_conf_fp(args):
output_file_type = ["netcdf"]
if len(args.s3_bucket) > 0:
if "DAILY" in args.start_date:
args.s3_prefix = re.sub(r"\$DAILY",datetime.now(tz.timezone('US/Eastern')).strftime('%Y%m%d'),args.s3_prefix)
args.s3_prefix = re.sub(r"\DAILY",datetime.now(timezone.utc).strftime('%Y%m%d'),args.s3_prefix)
output_path = f"s3://{args.s3_bucket}/{args.s3_prefix}"
elif len(args.docker_mount) > 0:
gpkg_file = [f"{args.docker_mount}/datastream-resources/config/{geo_base}"]
@@ -225,6 +234,10 @@ def create_confs(args):
nwm_conf = {}
fp_conf = {}
fp_conf['forcing'] = args.forcings
start_real = datetime.strptime(args.start_date,'%Y%m%d%H%M')
end_real = datetime.strptime(args.end_date,'%Y%m%d%H%M')
start_real = start_dt.strftime('%Y-%m-%d %H:%M:%S')
end_real = end_dt.strftime('%Y-%m-%d %H:%M:%S')
elif os.path.exists(os.path.join(args.resource_path,"nwm-forcings")):
nwm_conf = {}
fp_conf = create_conf_fp(args)
@@ -297,4 +310,4 @@ def create_confs(args):
if args.hydrofabric_version == "": args.hydrofabric_version="v2.1.1"
if args.nprocs == "": args.nprocs=os.cpu_count()

create_confs(args)
create_confs(args)
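The configuration changes above select a forecast cycle per forcing source (cycle 16 for NWM_ANALYSIS_ASSIM_EXTEND, cycle 0 otherwise), shift the start time back, and derive the realization end time from the number of forecast hours. A simplified standalone sketch of that window arithmetic; the runinput, num_hrs, and cycle values follow the diff, but the flattened branch structure and function name are illustrative only:

from datetime import datetime, timedelta

def nwm_window(start_str, forcing_source):
    # Parse the stream start time (YYYYMMDDHHMM) and normalize to the 01:00 cycle.
    start_dt = datetime.strptime(start_str, "%Y%m%d%H%M")
    start_dt = start_dt.replace(hour=1, minute=0, second=0, microsecond=0)

    if "ANALYSIS_ASSIM_EXTEND" in forcing_source:
        # Extended analysis/assimilation: 28 lead hours from cycle 16, starting 12 h earlier.
        runinput, num_hrs, fcst_cycle = 6, 28, 16
        start_dt = start_dt - timedelta(hours=12)
    elif "ANALYSIS_ASSIM" in forcing_source:
        runinput, num_hrs, fcst_cycle = 5, 3, 0
        start_dt = start_dt - timedelta(hours=3)
    else:
        # Default operational window of 24 hours from cycle 0.
        runinput, num_hrs, fcst_cycle = 2, 24, 0

    start_str_real = start_dt.strftime("%Y-%m-%d %H:%M:%S")
    end_str_real = (start_dt + timedelta(hours=num_hrs - 1)).strftime("%Y-%m-%d %H:%M:%S")
    return runinput, fcst_cycle, start_str_real, end_str_real

print(nwm_window("202410300100", "NWM_ANALYSIS_ASSIM_EXTEND"))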