Commit 0931459: CI

JordanLaserGit committed Nov 21, 2023
1 parent 7fa0b0a commit 0931459
Showing 13 changed files with 352 additions and 29 deletions.
2 changes: 1 addition & 1 deletion forcingprocessor/configs/conf_dailyrun_template.json
@@ -2,7 +2,7 @@
"forcing" : {
"start_date" : "202311060000",
"end_date" : "202311060000",
"nwm_file" : "",
"nwm_file" : "/home/ec2-user/ngen-datastream/forcingprocessor/src/filenamelist.txt",
"weight_file" : "https://ngenresourcesdev.s3.us-east-2.amazonaws.com/01_weights.json"
},

6 changes: 6 additions & 0 deletions forcingprocessor/pyproject.toml
@@ -0,0 +1,6 @@
[build-system]
build-backend = "setuptools.build_meta"
requires = [
"setuptools>=42",
"wheel",
]
88 changes: 88 additions & 0 deletions forcingprocessor/requirements.txt
@@ -0,0 +1,88 @@
affine==2.4.0
asciitree==0.3.3
attrs==23.1.0
beautifulsoup4==4.12.2
bokeh==3.2.0
boto3==1.28.3
botocore==1.31.3
Bottleneck==1.3.7
certifi==2023.5.7
cftime==1.6.2
charset-normalizer==3.2.0
click==8.1.5
click-plugins==1.1.1
cligj==0.7.2
cloudpickle==2.2.1
contourpy==1.1.0
cycler==0.11.0
dask==2023.7.0
distributed==2023.7.0
docopt==0.6.2
entrypoints==0.4
fasteners==0.18
Fiona==1.9.4.post1
flox==0.7.2
fonttools==4.41.0
fsspec==2023.6.0
geopandas==0.13.2
h5netcdf==1.2.0
h5py==3.9.0
idna==3.4
importlib-metadata==6.8.0
importlib-resources==6.0.0
Jinja2==3.1.2
jmespath==1.0.1
kiwisolver==1.4.4
llvmlite==0.40.1
locket==1.0.0
lz4==4.3.2
MarkupSafe==2.1.3
matplotlib==3.7.2
msgpack==1.0.5
nc-time-axis==1.4.1
netCDF4==1.6.4
numba==0.57.1
numbagg==0.2.2
numcodecs==0.11.0
numpy==1.24.4
numpy-groupies==0.9.22
nwmurl==0.1.5
packaging==23.1
pandas==2.0.3
partd==1.4.0
pathlib==1.0.1
Pillow==10.0.0
platformdirs==3.8.1
pooch==1.7.0
psutil==5.9.5
pyarrow==12.0.1
pydap==3.4.1
pyparsing==3.0.9
pyproj==3.6.0
python-dateutil==2.8.2
pytz==2023.3
PyYAML==6.0
rasterio==1.3.8
requests==2.31.0
rioxarray==0.14.1
s3fs==0.4.2
s3transfer==0.6.1
scipy==1.11.1
seaborn==0.12.2
shapely==2.0.1
six==1.16.0
snuggs==1.4.7
sortedcontainers==2.4.0
soupsieve==2.4.1
tblib==2.0.0
toolz==0.12.0
tornado==6.3.2
tqdm==4.66.1
tzdata==2023.3
urllib3==1.26.16
WebOb==1.8.7
xarray==2023.6.0
xyzservices==2023.7.0
zarr==2.15.0
zict==3.0.0
zipp==3.16.1
127 changes: 127 additions & 0 deletions forcingprocessor/setup.cfg
@@ -0,0 +1,127 @@
[metadata]
name = forcingprocessor
version = attr: forcingprocessor._version.__version__
author = Jordan J. Laser
author_email = jlaser@lynker.com
description = Tool to convert nwm forcing netcdfs to ngen compatible files
long_description = file: README.md
long_description_content_type = text/markdown; charset=UTF-8
license = USDOC
license_files =
LICENSE
url = https://github.com/CIROH-UA/ngen-datastream
project_urls =
Source = https://github.com/CIROH-UA/ngen-datastream/forcingprocessor
Tracker = https://github.com/CIROH-UA/ngen-datastream/issues
classifiers =
Development Status :: 3 - Alpha
Intended Audience :: Education
Intended Audience :: Science/Research
License :: Free To Use But Restricted
Programming Language :: Python :: 3.9
Topic :: Scientific/Engineering :: Hydrology
Operating System :: OS Independent

[options]
packages = find_namespace:
package_dir =
=src
install_requires =
affine==2.4.0
asciitree==0.3.3
attrs==23.1.0
beautifulsoup4==4.12.2
bokeh==3.2.0
boto3==1.28.3
botocore==1.31.3
Bottleneck==1.3.7
certifi==2023.5.7
cftime==1.6.2
charset-normalizer==3.2.0
click==8.1.5
click-plugins==1.1.1
cligj==0.7.2
cloudpickle==2.2.1
contourpy==1.1.0
cycler==0.11.0
dask==2023.7.0
distributed==2023.7.0
docopt==0.6.2
entrypoints==0.4
fasteners==0.18
Fiona==1.9.4.post1
flox==0.7.2
fonttools==4.41.0
fsspec==2023.6.0
geopandas==0.13.2
h5netcdf==1.2.0
h5py==3.9.0
idna==3.4
importlib-metadata==6.8.0
importlib-resources==6.0.0
Jinja2==3.1.2
jmespath==1.0.1
kiwisolver==1.4.4
llvmlite==0.40.1
locket==1.0.0
lz4==4.3.2
MarkupSafe==2.1.3
matplotlib==3.7.2
msgpack==1.0.5
nc-time-axis==1.4.1
netCDF4==1.6.4
numba==0.57.1
numbagg==0.2.2
numcodecs==0.11.0
numpy==1.24.4
numpy-groupies==0.9.22
nwmurl==0.1.5
packaging==23.1
pandas==2.0.3
partd==1.4.0
pathlib==1.0.1
Pillow==10.0.0
platformdirs==3.8.1
pooch==1.7.0
psutil==5.9.5
pyarrow==12.0.1
pydap==3.4.1
pyparsing==3.0.9
pyproj==3.6.0
python-dateutil==2.8.2
pytz==2023.3
PyYAML==6.0
rasterio==1.3.8
requests==2.31.0
rioxarray==0.14.1
s3fs==0.4.2
s3transfer==0.6.1
scipy==1.11.1
seaborn==0.12.2
shapely==2.0.1
six==1.16.0
snuggs==1.4.7
sortedcontainers==2.4.0
soupsieve==2.4.1
tblib==2.0.0
toolz==0.12.0
tornado==6.3.2
tqdm==4.66.1
tzdata==2023.3
urllib3==1.26.16
WebOb==1.8.7
xarray==2023.6.0
xyzservices==2023.7.0
zarr==2.15.0
zict==3.0.0
zipp==3.16.1

python_requires = >=3.9
include_package_data = True

[options.packages.find]
where = src

[options.extras_require]
develop =
pytest
6 changes: 0 additions & 6 deletions forcingprocessor/src/filenamelist.txt

This file was deleted.

1 change: 1 addition & 0 deletions forcingprocessor/src/forcingprocessor/__init__.py
@@ -0,0 +1 @@
from ._version import __version__
1 change: 1 addition & 0 deletions forcingprocessor/src/forcingprocessor/_version.py
@@ -0,0 +1 @@
__version__ = '0.0.1'
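
With __init__.py re-exporting __version__ and setup.cfg reading it via "version = attr: forcingprocessor._version.__version__", the package now carries a single version source. A minimal check, assuming the package has been installed from this repo (for example with "pip install -e forcingprocessor/"):

import forcingprocessor
print(forcingprocessor.__version__)  # expected to print '0.0.1'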
(additional changed file; the file path is not rendered in this view)
@@ -177,10 +177,10 @@ def forcing_grid2catchment(crosswalk_dict: dict, nwm_files: list, var_list: list
if ii_verbose: print(f'\nTime for s3 open file: {topen:.2f}\nTime for xarray open dataset: {txrds:.2f}\nTime to fill array: {tfill:.2f}\nTime to calculate catchment values: {tdata:.2f}\nAverage time per file {ttotal/(j+1)}', end=None,flush=True)
report_usage()

-if ii_verbose: print(f'{id} comleted data extraction, returning data to primary process')
+if ii_verbose: print(f'{id} completed data extraction, returning data to primary process')
return [data_list, t_list]

-def threaded_write_fun(data,t_ax,catchments,nprocs,storage_type,output_bucket,output_file_type,out_path,vars,ii_append):
+def threaded_write_fun(data,t_ax,catchments,nprocs,storage_type,output_file_type,output_bucket,out_path,vars,ii_append):
"""
Sets up the thread pool for write_data
@@ -243,14 +243,14 @@ def threaded_write_fun(data,t_ax,catchments,nprocs,storage_type,output_bucket,ou

ids = []
with cf.ProcessPoolExecutor(max_workers=nprocs) as pool:
-for results in pool.map(
+for results in pool.map(
write_data,
worker_data_list,
worker_time_list,
worker_catchment_list,
-storage_type_list,
-bucket_list,
+storage_type_list,
output_file_type_list,
+bucket_list,
out_path_list,
var_list,
append_list,
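
ProcessPoolExecutor.map binds each iterable positionally to one parameter of the callee, so the iterable order here must mirror write_data's new parameter order; that is why bucket_list moves after output_file_type_list. A minimal sketch of that behavior, using a hypothetical write_row stand-in rather than the real write_data:

import concurrent.futures as cf

def write_row(storage_type, file_type, bucket):  # hypothetical stand-in for write_data
    return f"{storage_type}:{file_type}:{bucket}"

if __name__ == "__main__":
    storage = ["local", "s3"]
    ftypes = ["csv", "parquet"]
    buckets = ["bkt-a", "bkt-b"]
    with cf.ProcessPoolExecutor(max_workers=2) as pool:
        # Swapping ftypes and buckets here would silently bind the wrong
        # values to the wrong parameters -- the mismatch this commit fixes.
        for result in pool.map(write_row, storage, ftypes, buckets):
            print(result)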
@@ -309,7 +309,7 @@ def write_data(
t0 = time.perf_counter()

if ii_append:
-key = out_path + f"cat-{cat_id}.csv"
+key = (out_path/f"cat-{cat_id}.csv").resolve()
df_bucket = pd.read_csv(s3_client.get_object(Bucket = bucket, Key = key).get("Body"))
df = pd.concat([df_bucket,df])
del df_bucket
@@ -331,7 +331,7 @@
t_put += time.perf_counter() - t0

elif storage_type == 'local':
-filename = out_path + f"/cat-{cat_id}." + output_file_type
+filename = str((out_path/Path(f"cat-{cat_id}." + output_file_type)).resolve())
if output_file_type == "parquet":
df.to_parquet(filename, index=False)
elif output_file_type == "csv":
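
The local branch now builds filenames with pathlib instead of string concatenation: the "/" operator joins path segments and .resolve() produces an absolute path. A short sketch of the pattern, with placeholder names:

from pathlib import Path

out_path = Path("output/forcings")   # placeholder output directory
cat_id = "12345"                     # placeholder catchment id
filename = str((out_path / f"cat-{cat_id}.csv").resolve())
print(filename)  # absolute path such as /home/user/output/forcings/cat-12345.csv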
@@ -481,17 +481,17 @@ def prep_ngen_data(conf):
output_path = os.path.join(os.getcwd(),datentime)

# Prep output directory
-top_dir = Path(os.path.dirname(__file__)).parent
-bucket_path = Path(top_dir, output_path, output_bucket)
+bucket_path = Path(output_path, output_bucket)
forcing_path = Path(bucket_path, 'forcings')
meta_path = Path(bucket_path, 'metadata')
metaf_path = Path(bucket_path, 'metadata','forcings_metadata')
if not os.path.exists(bucket_path): os.system(f"mkdir {bucket_path}")
if not os.path.exists(forcing_path): os.system(f"mkdir {forcing_path}")
if not os.path.exists(meta_path): os.system(f"mkdir {meta_path}")
if not os.path.exists(metaf_path): os.system(f"mkdir {metaf_path}")
output_path = bucket_path

with open(f"{output_path}/metadata/forcings_metadata/conf.json", 'w') as f:
with open(f"{bucket_path}/metadata/forcings_metadata/conf.json", 'w') as f:
json.dump(conf, f)

elif storage_type == "S3":
@@ -571,7 +571,7 @@ def prep_ngen_data(conf):
if ii_verbose: print(f'Data extract threads: {proc_threads:.2f}\nExtract time: {t_extract:.2f}\nComplexity: {complexity:.2f}\nScore: {score:.2f}\n', end=None)

t0 = time.perf_counter()
-out_path = output_path + '/forcings/'
+out_path = (output_path/'forcings/').resolve()
if ii_verbose: print(f'Writing catchment forcings to {output_bucket} at {out_path}!', end=None)
forcing_cat_ids = threaded_write_fun(data_array,t_ax,crosswalk_dict.keys(),write_threads,storage_type,output_file_type,output_bucket,out_path,var_list_out,ii_append)

@@ -608,7 +608,7 @@ def prep_ngen_data(conf):
for j, jcatch in enumerate(forcing_cat_ids):
# Check forcing size
if j > 10: break
filename = f"{output_path}/forcings/" + f"cat-{jcatch}." + output_file_type
filename = str(output_path) + "/forcings/" + f"cat-{jcatch}." + output_file_type
if storage_type.lower() == 's3':
response = s3.head_object(
Bucket=output_bucket,
@@ -622,7 +622,7 @@

# get zipped size
zipname = f"cat-{jcatch}." + 'zip'
zip_dir = f"{output_path}/metadata/forcings_metadata/zipped_forcing/"
zip_dir = f"{str(output_path)}/metadata/forcings_metadata/zipped_forcing/"
key_name = zip_dir + zipname
if storage_type.lower() == 's3':
buf = BytesIO()
@@ -686,7 +686,7 @@ def prep_ngen_data(conf):
if storage_type == 'S3':

# Write files to s3 bucket
meta_path = f"{output_path}/metadata/forcings_metadata/"
meta_path = f"{str(output_path)}/metadata/forcings_metadata/"
buf = BytesIO()
filename = f"metadata." + output_file_type
if output_file_type == "csv": metadata_df.to_csv(buf, index=False)
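For S3 output, the metadata table is serialized into an in-memory BytesIO buffer and then uploaded, rather than written to disk first. A sketch of that pattern under standard boto3 and pandas, with placeholder data, bucket, and key names:

from io import BytesIO

import boto3
import pandas as pd

metadata_df = pd.DataFrame({"filename": ["cat-12345.csv"], "size_bytes": [1024]})  # placeholder data
buf = BytesIO()
metadata_df.to_csv(buf, index=False)
s3 = boto3.client("s3")
s3.put_object(
    Bucket="my-output-bucket",                      # placeholder bucket
    Key="metadata/forcings_metadata/metadata.csv",  # placeholder key
    Body=buf.getvalue(),
)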
(additional changed file; the file path is not rendered in this view)
@@ -5,7 +5,13 @@
import json, argparse, os
from pathlib import Path

-def generate_weights_file(geopackage,grid,weights_filepath):
+def generate_weights_file(geopackage,grid_file,weights_filepath):
+
+    try:
+        ds = xr.open_dataset(grid_file,engine='h5netcdf')
+        grid = ds['RAINRATE']
+    except:
+        raise Exception(f'\n\nThere\'s a problem with {example_grid_filepath}!\n')

g_df = gpd.read_file(geopackage, layer='divides')
gdf_proj = g_df.to_crs('PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DATUM["D_Sphere",SPHEROID["Sphere",6370000.0,0.0]], \
@@ -52,10 +58,4 @@ def generate_weights_file(geopackage,grid,weights_filepath):
args = parser.parse_args()
if 'example_grid_filepath' in args: example_grid_filepath = args.example_grid_filepath

-try:
-    ds = xr.open_dataset(example_grid_filepath,engine='h5netcdf')
-    grid = ds['RAINRATE']
-except:
-    raise Exception(f'\n\nThere\'s a problem with {example_grid_filepath}!\n')
-
-generate_weights_file(args.geopackage, grid, args.weights_filename)
+generate_weights_file(args.geopackage, example_grid_filepath, args.weights_filename)
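
After this refactor, callers hand generate_weights_file the path to an example NWM grid file and the function opens the RAINRATE variable itself, instead of receiving an already-opened xarray variable. A hedged usage sketch; the module name and file paths below are placeholders:

from weight_generator import generate_weights_file  # hypothetical module name

generate_weights_file(
    "nextgen_01.gpkg",      # hydrofabric geopackage with a 'divides' layer
    "nwm_example_grid.nc",  # example NWM forcing netcdf containing RAINRATE
    "01_weights.json",      # output weights file
)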
1 change: 1 addition & 0 deletions forcingprocessor/tests/data/small_weights.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions forcingprocessor/tests/data/weights_test.json

Large diffs are not rendered by default.

(remaining file diffs not rendered)
