Skip to content

Commit

Permalink
update configuration
Browse files (browse the repository at this point in the history)
  • Loading branch information
JordanLaserGit committed Sep 26, 2024
1 parent e927923 commit 561ec97
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 74 deletions.
133 changes: 66 additions & 67 deletions python_tools/src/python_tools/configure_datastream.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import argparse, json, os
import argparse, json, os, copy, re
from datetime import datetime, timedelta
from pathlib import Path
import pytz as tz
import platform
import psutil

PATTERN_VPU = r'\$VPU'

def bytes2human(n):
# http://code.activestate.com/recipes/578019
# >>> bytes2human(10000)
Expand Down Expand Up @@ -56,47 +58,30 @@ def write_json(conf, out_dir, name):
json.dump(conf, fp, indent=2)
return conf_path

def create_conf_fp(start,end,nprocs,docker_mount,forcing_split_vpu,retro_or_op,geo_base):
def create_conf_fp(start,end,nprocs,docker_mount,forcing_split_vpu,retro_or_op,geo_base,hf_version):
if retro_or_op == "retrospective":
filename = "retro_filenamelist.txt"
else:
filename = "filenamelist.txt"

if forcing_split_vpu:
weights = [
"s3://ngen-datastream/resources/v20.1/VPU_01/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_02/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_03N/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_03S/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_03W/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_04/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_05/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_06/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_07/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_08/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_09/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_10L/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_10U/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_11/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_12/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_13/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_14/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_15/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_16/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_17/weights.json",
"s3://ngen-datastream/resources/v20.1/VPU_18/weights.json"
]
output_path = f"s3://ngen-datastream/forcings/v20.1/{start}-{end}"
output_file_type = ["tar"]
if len(forcing_split_vpu) > 0:
template = "https://lynker-spatial.s3-us-west-2.amazonaws.com/hydrofabric/{hf_version}/nextgen/conus_forcing-weights/vpuid%3D$VPU/part-0.parquet"
gpkg_file = []
for jvpu in forcing_split_vpu:
tmpl_cpy = copy.deepcopy(template)
gpkg_file.append(re.sub(PATTERN_VPU, jvpu, tmpl_cpy))

output_path = f"s3://ngen-datastream/forcings/{hf_version}/{start}-{end}"
output_file_type = ["netcdf"]
else:
weights = [f"{docker_mount}/datastream-resources/config/{geo_base}"]
gpkg_file = [f"{docker_mount}/datastream-resources/config/{geo_base}"]
output_path = f"{docker_mount}/ngen-run"
output_file_type = ["netcdf"]

fp_conf = {
"forcing" : {
"nwm_file" : f"{docker_mount}/datastream-metadata/{filename}",
"gpkg_file" : weights,
"gpkg_file" : gpkg_file,
},
"storage" : {
"output_path" : output_path,
Expand All @@ -111,7 +96,7 @@ def create_conf_fp(start,end,nprocs,docker_mount,forcing_split_vpu,retro_or_op,g

return fp_conf

def create_conf_nwm(start, end, retro_or_op,urlbaseinput):
def create_conf_nwm(start, end, retro_or_op,runinput,urlbaseinput):

if retro_or_op == "retrospective":
nwm_conf = {
Expand All @@ -136,7 +121,7 @@ def create_conf_nwm(start, end, retro_or_op,urlbaseinput):
"forcing_type" : "operational_archive",
"start_date" : start,
"end_date" : end,
"runinput" : 2,
"runinput" : runinput,
"varinput" : 5,
"geoinput" : 1,
"meminput" : 0,
Expand All @@ -148,44 +133,58 @@ def create_conf_nwm(start, end, retro_or_op,urlbaseinput):
return nwm_conf

def create_confs(args):
forcing_split_vpu = args.forcing_split_vpu.split(',')
conf = config_class2dict(args)
realization = args.realization_file
geo_base = args.gpkg.split('/')[-1]

if "OPERATIONAL" in args.forcing_source:
retro_or_op = "operational"
if "V3" in args.forcing_source:
urlbaseinput = 7
if "NOMADS" in args.forcing_source:
urlbaseinput = 1
elif "RETRO" in args.forcing_source:
retro_or_op = "retrospective"
if "V2" in args.forcing_source:
urlbaseinput = 1
if "V3" in args.forcing_source:
urlbaseinput = 4
geo_base = args.gpkg.split('/')[-1]

if conf['globals']['start_date'] == "DAILY":
if conf['globals']['end_date'] != "":
if "DAILY" in conf['globals']['start_date']:
retro_or_op = "operational"
urlbaseinput = 7
duration = 24
runinput = 2
if conf['globals']['end_date'] == "":
start_date = datetime.now(tz.timezone('US/Eastern'))
else:
# allows for a "daily" run that is not the current date
start_date = datetime.strptime(conf['globals']['end_date'],'%Y%m%d%H%M')
else:
start_date = datetime.now(tz.timezone('US/Eastern'))
retro_or_op="operational"
urlbaseinput=7
today = start_date.replace(hour=1, minute=0, second=0, microsecond=0)
tomorrow = today + timedelta(hours=23)
retro_or_op="operational"

if "SHORT_RANGE" in conf['globals']['start_date']:
duration = 18
runinput = 1
if "MEDIUM_RANGE" in conf['globals']['start_date']:
duration = 240
runinput = 2

if conf['globals'].get('data_path',"") == "":
conf['globals']['data_path'] = today.strftime('%Y%m%d')
start_datetime0 = start_date.replace(hour=1, minute=0, second=0, microsecond=0)
end_datetime = start_datetime0 + timedelta(hours=duration-1)
start_str = start_datetime0.strftime('%Y%m%d%H%M')
start_realization = start_datetime0.strftime('%Y-%m-%d %H:%M:%S')
end_str = end_datetime.strftime('%Y%m%d%H%M')
end_realization = end_datetime.strftime('%Y-%m-%d %H:%M:%S')

nwm_conf = create_conf_nwm(start_str, end_str, retro_or_op, runinput, urlbaseinput)
fp_conf = create_conf_fp(start_str, end_str, conf['globals']['nprocs'],args.docker_mount,forcing_split_vpu,retro_or_op,geo_base,args.hydrofabric_version)

start = today.strftime('%Y%m%d%H%M')
end = tomorrow.strftime('%Y%m%d%H%M')
start_realization = today.strftime('%Y-%m-%d %H:%M:%S')
end_realization = tomorrow.strftime('%Y-%m-%d %H:%M:%S')
nwm_conf = create_conf_nwm(start, end, retro_or_op, urlbaseinput)
fp_conf = create_conf_fp(start, end, conf['globals']['nprocs'],args.docker_mount,args.forcing_split_vpu,retro_or_op,geo_base)
else:
if "OPERATIONAL" in args.forcing_source:
retro_or_op = "operational"
runinput = 2
if "V3" in args.forcing_source:
urlbaseinput = 7
if "NOMADS" in args.forcing_source:
urlbaseinput = 1
elif "RETRO" in args.forcing_source:
retro_or_op = "retrospective"
runinput = None
if "V2" in args.forcing_source:
urlbaseinput = 1
if "V3" in args.forcing_source:
urlbaseinput = 4
else:
raise Exception(f'forcing_source not understood, lacks either OPERATIONAL or RETRO')

start = conf['globals']['start_date']
end = conf['globals']['end_date']
start_realization_dt = datetime.strptime(start,'%Y%m%d%H%M')
Expand All @@ -198,10 +197,10 @@ def create_confs(args):
fp_conf['forcing'] = args.forcings
elif os.path.exists(os.path.join(args.resource_path,"nwm-forcings")):
nwm_conf = {}
fp_conf = create_conf_fp(start, end, conf['globals']['nprocs'], args.docker_mount, args.forcing_split_vpu,retro_or_op,geo_base)
fp_conf = create_conf_fp(start, end, conf['globals']['nprocs'], args.docker_mount, forcing_split_vpu,retro_or_op,geo_base,args.hydrofabric_version)
else:
nwm_conf = create_conf_nwm(start,end, retro_or_op,urlbaseinput)
fp_conf = create_conf_fp(start, end, conf['globals']['nprocs'], args.docker_mount, args.forcing_split_vpu,retro_or_op,geo_base)
nwm_conf = create_conf_nwm(start,end, retro_or_op,runinput,urlbaseinput)
fp_conf = create_conf_fp(start, end, conf['globals']['nprocs'], args.docker_mount, forcing_split_vpu,retro_or_op,geo_base,args.hydrofabric_version)

conf['nwmurl'] = nwm_conf
conf['forcingprocessor'] = nwm_conf
Expand Down Expand Up @@ -257,8 +256,8 @@ def create_confs(args):
parser.add_argument("--host_platform", type=str,help="Type of host",default="")
parser.add_argument("--host_os", type=str,help="Operating system of host",default="")
parser.add_argument("--domain_name", type=str,help="Name of spatial domain",default="Not Specified")
parser.add_argument("--forcing_split_vpu", type=bool,help="true for forcingprocessor split",default=False)
parser.add_argument("--forcing_split_vpu", type=str,help="true for forcingprocessor split",default="")
parser.add_argument("--realization_file", type=str,help="ngen realization file",required=True)

args = parser.parse_args()
args = parser.parse_args()
create_confs(args)
78 changes: 71 additions & 7 deletions python_tools/tests/test_configurer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(self,
nprocs=None,
host_os="",
domain_name="Not Specified",
forcing_split_vpu=False,
forcing_split_vpu="",
realization_file=""):
self.docker_mount = docker_mount
self.start_date = start_date
Expand Down Expand Up @@ -67,7 +67,7 @@ def __init__(self,
nprocs = 2,
host_os = "Linux",
domain_name = "",
forcing_split_vpu = False,
forcing_split_vpu = "",
realization_file = str(REALIZATION_ORIG)
)

Expand Down Expand Up @@ -99,16 +99,13 @@ def test_conf_daily():

with open(REALIZATION_RUN,'r') as fp:
data = json.load(fp)

start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
assert start.day == datetime.now(tz.timezone('US/Eastern')).day

with open(CONF_NWM,'r') as fp:
data = json.load(fp)

assert data['urlbaseinput'] == 7


def test_conf_daily_pick():
inputs.start_date = "DAILY"
inputs.end_date = "202006200000"
Expand All @@ -122,14 +119,81 @@ def test_conf_daily_pick():

with open(REALIZATION_RUN,'r') as fp:
data = json.load(fp)

start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
assert start.day == datetime.strptime(inputs.end_date,"%Y%m%d%H%M%S").day

with open(CONF_NWM,'r') as fp:
data = json.load(fp)
assert data['urlbaseinput'] == 7
assert data['runinput'] == 2

def test_conf_daily_short_range():
    """DAILY_SHORT_RANGE with an empty end date configures a run for today.

    Checks that every expected output file exists after ``create_confs``
    runs, that the realization start time falls on the current US/Eastern
    day of the month, and that the NWM configuration carries
    ``urlbaseinput`` 7 and ``runinput`` 1.
    """
    inputs.start_date = "DAILY_SHORT_RANGE"
    inputs.end_date = ""
    create_confs(inputs)

    # Same files, same order as the individual existence checks.
    expected_outputs = (
        CONF_NWM,
        CONF_FP,
        CONF_DATASTREAM,
        REALIZATION_META_USER,
        REALIZATION_META_DS,
        REALIZATION_RUN,
    )
    for out_file in expected_outputs:
        assert os.path.exists(out_file)

    with open(REALIZATION_RUN,'r') as realization_fp:
        realization = json.load(realization_fp)
    run_start = datetime.strptime(realization['time']['start_time'],"%Y-%m-%d %H:%M:%S")
    eastern_now = datetime.now(tz.timezone('US/Eastern'))
    assert run_start.day == eastern_now.day

    with open(CONF_NWM,'r') as nwm_fp:
        nwm_conf = json.load(nwm_fp)
    assert nwm_conf['urlbaseinput'] == 7
    assert nwm_conf['runinput'] == 1

def test_conf_daily_medium_range():
    """DAILY_MEDIUM_RANGE with an empty end date configures a run for today.

    Checks that every expected output file exists after ``create_confs``
    runs, that the realization start time falls on the current US/Eastern
    day of the month, and that the NWM configuration carries
    ``urlbaseinput`` 7 and ``runinput`` 2.
    """
    inputs.start_date = "DAILY_MEDIUM_RANGE"
    inputs.end_date = ""
    create_confs(inputs)

    # Same files, same order as the individual existence checks.
    expected_outputs = (
        CONF_NWM,
        CONF_FP,
        CONF_DATASTREAM,
        REALIZATION_META_USER,
        REALIZATION_META_DS,
        REALIZATION_RUN,
    )
    for out_file in expected_outputs:
        assert os.path.exists(out_file)

    with open(REALIZATION_RUN,'r') as realization_fp:
        realization = json.load(realization_fp)
    run_start = datetime.strptime(realization['time']['start_time'],"%Y-%m-%d %H:%M:%S")
    eastern_now = datetime.now(tz.timezone('US/Eastern'))
    assert run_start.day == eastern_now.day

    with open(CONF_NWM,'r') as nwm_fp:
        nwm_conf = json.load(nwm_fp)
    assert nwm_conf['urlbaseinput'] == 7
    assert nwm_conf['runinput'] == 2


def test_conf_daily_short_range_split_vpu():
    """DAILY_SHORT_RANGE with a comma-separated VPU list splits the forcing config.

    Checks that every expected output file exists after ``create_confs``
    runs, that the realization start time falls on the current US/Eastern
    day of the month, that the NWM configuration carries ``urlbaseinput`` 7
    and ``runinput`` 1, and that the forcingprocessor configuration lists
    one ``gpkg_file`` entry per requested VPU (four here).
    """
    # `inputs` is shared by the tests in this module; remember the current
    # value so the VPU list set here does not leak into tests that run later.
    previous_split = getattr(inputs, "forcing_split_vpu", "")
    inputs.start_date = "DAILY_SHORT_RANGE"
    inputs.end_date = ""
    inputs.forcing_split_vpu = "01,02,03W,16"
    try:
        create_confs(inputs)
        assert os.path.exists(CONF_NWM)
        assert os.path.exists(CONF_FP)
        assert os.path.exists(CONF_DATASTREAM)
        assert os.path.exists(REALIZATION_META_USER)
        assert os.path.exists(REALIZATION_META_DS)
        assert os.path.exists(REALIZATION_RUN)

        with open(REALIZATION_RUN,'r') as fp:
            data = json.load(fp)
        start = datetime.strptime(data['time']['start_time'],"%Y-%m-%d %H:%M:%S")
        assert start.day == datetime.now(tz.timezone('US/Eastern')).day

        # The stray `urlbaseinput == 4` check that sat here ran against the
        # realization data, not the NWM config, and contradicted the value
        # of 7 asserted below, so it has been dropped.
        with open(CONF_NWM,'r') as fp:
            data = json.load(fp)
        assert data['urlbaseinput'] == 7
        assert data['runinput'] == 1

        with open(CONF_FP,'r') as fp:
            data = json.load(fp)
        assert len(data['forcing']['gpkg_file']) == 4
    finally:
        inputs.forcing_split_vpu = previous_split


0 comments on commit 561ec97

Please sign in to comment.