This repository has been archived by the owner on Jun 2, 2023. It is now read-only.

pull entire time series of USGS NWIS data
amsnyder committed Jan 7, 2022
1 parent 3651eb7 commit e9e5055
Showing 3 changed files with 72 additions and 35 deletions.
5 changes: 0 additions & 5 deletions 01_fetch/fetch_config.yaml
@@ -20,11 +20,6 @@ fetch_usgs.py:
- '01482695' # C AND D CANAL NR DELAWARE CITY, DE
- '01482800' # Delaware River at Reedy Island Jetty, DE

# start and end dates for data fetch
# should be in the format 'YYYY-MM-DD'
start_dt: '2019-01-01'
end_dt: '2019-12-31'

fetch_noaa_nos.py:
#choose where you want to write your data outputs: local or S3
write_location: 'local'
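
With start_dt and end_dt removed from the fetch_usgs.py section, the only per-site input the fetch step still takes from this config is the site list; the date range is now derived at run time (see fetch_usgs.py below). A minimal sketch of the remaining config read, with the top-level key layout assumed from the hunk header above:

import yaml

# sketch only: 'fetch_usgs.py' as the top-level config key is assumed from the hunk header
with open('01_fetch/fetch_config.yaml') as f:
    config = yaml.safe_load(f)['fetch_usgs.py']

site_ids = config['site_ids']  # e.g. ['01482695', '01482800', ...]
# start_dt / end_dt are no longer read here; they come from the NWIS site info and today's date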
77 changes: 67 additions & 10 deletions 01_fetch/src/fetch_usgs.py
@@ -1,25 +1,75 @@
import os
import urllib
import yaml
import datetime
import pandas as pd
import utils

def fetch_params(outfile, bucket, write_location, s3_client):
def fetch_site_info(site_num, outfile):
'''fetch USGS NWIS site info (series catalog of available parameters and periods of record) for a single site'''
site_url = f'https://waterservices.usgs.gov/nwis/site/?format=rdb&sites={site_num}&seriesCatalogOutput=true'
print(f'fetching site info for site {site_num} and saving locally')
urllib.request.urlretrieve(site_url, outfile)

def process_site_info_to_df(raw_site_info_txt, site_info_outfile_csv, s3_bucket, write_location, s3_client):
'''
process raw site info text file into a csv file,
return minimum date of measured data (for any parameter) as start date for site
'''
print('processing raw site info to get start date for site')
site_info_df = pd.read_csv(raw_site_info_txt, comment='#', sep='\t', lineterminator='\n')
site_info_df.drop(index=0, inplace=True)

# get subset of unit values, other codes listed below:
# “dv” (daily values)
# “uv” or “iv” (unit values)
# “qw” (water-quality)
# “sv” (site visits)
# “pk” (peak measurements)
# “gw” (groundwater levels)
# “ad” (sites included in USGS Annual Water Data Reports)
# “aw” (sites monitored by the USGS Active Groundwater Level Network)
# “id” (historical instantaneous values)
site_info_df_subset = site_info_df.loc[(site_info_df['data_type_cd']=='uv') | (site_info_df['data_type_cd']=='iv'), :]
# subset to only columns of interest
site_info_df_subset = site_info_df_subset[['parm_cd', 'begin_date', 'end_date', 'count_nu']]
# save this table of info
site_info_df_subset.to_csv(site_info_outfile_csv, index=False)
if write_location == 'S3':
print('uploading to s3')
s3_client.upload_file(site_info_outfile_csv, s3_bucket, '01_fetch/out/metadata/'+os.path.basename(site_info_outfile_csv))

# get minimum date to pull for this site
start_dt = site_info_df_subset[['begin_date']].min().values[0]

return start_dt
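
# For reference (values below are hypothetical): the tab-separated RDB file parsed
# above looks roughly like
#     # ... USGS disclaimer and column descriptions ...
#     agency_cd  site_no   ...  data_type_cd  parm_cd  begin_date  end_date    count_nu
#     5s         15s       ...  5s            5s       19d         19d         8n
#     USGS       01482800  ...  uv            00065    2007-10-01  2022-01-06  123456
# The '#' lines are skipped via comment='#', and drop(index=0) discards the
# field-width row ('5s', '15s', ...) that sits between the header and the data.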

def fetch_params(outfile):
'''get table of all possible USGS site parameters'''
params_url = 'https://help.waterdata.usgs.gov/code/parameter_cd_query?fmt=rdb&group_cd=%'
print('fetching parameter file and saving locally')
urllib.request.urlretrieve(params_url, outfile)

def process_params_to_csv(raw_params_txt, params_outfile_csv, write_location, bucket, s3_client):
'''process raw parameter text file into a csv file'''
print('reading raw parameter data from local')
params_df = pd.read_csv(raw_params_txt, comment='#', sep='\t', lineterminator='\n')
print('processing parameter file and saving locally')
params_df.drop(index=0, inplace=True)
params_df.to_csv(params_outfile_csv, index=False)
if write_location == 'S3':
print('uploading to s3')
s3_client.upload_file(outfile, bucket, '01_fetch/out/'+os.path.basename(outfile))
s3_client.upload_file(params_outfile_csv, bucket, '01_fetch/out/metadata/'+os.path.basename(params_outfile_csv))
return params_df

def fetch_data(site_num, start_dt, end_dt, outfile, bucket, write_location, s3_client):
def fetch_data(site_num, start_dt, end_dt, outfile, s3_bucket, write_location, s3_client):
'''fetch USGS NWIS data for locations in site_list (gets all parameters available)'''
data_url = f'https://waterservices.usgs.gov/nwis/iv?format=rdb&sites={site_num}&startDT={start_dt}&endDT={end_dt}'
print(f'fetching data for site {site_num} and saving locally')
urllib.request.urlretrieve(data_url, outfile)
if write_location == 'S3':
print('uploading to s3')
s3_client.upload_file(outfile, bucket, '01_fetch/out/'+os.path.basename(outfile))
s3_client.upload_file(outfile, s3_bucket, '01_fetch/out/'+os.path.basename(outfile))

def main():
# import config
@@ -34,18 +84,25 @@ def main():
# usgs nwis sites we want to fetch data for
site_ids = config['site_ids']

# start and end dates for data fetch
start_dt = config['start_dt']
end_dt = config['end_dt']

# fetch raw data files
for site_num in site_ids:
site_info_outfile_txt = os.path.join('.', '01_fetch', 'out', 'metadata', f'usgs_nwis_site_info_{site_num}.txt')
fetch_site_info(site_num, site_info_outfile_txt)
site_info_outfile_csv = os.path.join('.', '01_fetch', 'out', 'metadata', f'usgs_nwis_site_info_{site_num}.csv')
start_dt = process_site_info_to_df(site_info_outfile_txt, site_info_outfile_csv, s3_bucket, write_location, s3_client)
end_dt = datetime.datetime.today().strftime("%Y-%m-%d")

# fetch raw data for this site's full period of record
data_outfile_txt = os.path.join('.', '01_fetch', 'out', f'usgs_nwis_{site_num}.txt')
fetch_data(site_num, start_dt, end_dt, data_outfile_txt, s3_bucket, write_location, s3_client)

# fetch parameter file
params_outfile_txt = os.path.join('.', '01_fetch', 'out', 'usgs_nwis_params.txt')
fetch_params(params_outfile_txt, s3_bucket, write_location, s3_client)
params_outfile_txt = os.path.join('.', '01_fetch', 'out', 'metadata', 'usgs_nwis_params.txt')
fetch_params(params_outfile_txt)
# process raw parameter data into csv
params_outfile_csv = os.path.join('.', '01_fetch', 'out', 'metadata', 'usgs_nwis_params.csv')
params_df = process_params_to_csv(params_outfile_txt, params_outfile_csv, write_location, s3_bucket, s3_client)


if __name__ == '__main__':
main()
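
Taken together, a single-site run in local mode now reduces to roughly the sketch below (illustrative only; the import path is assumed, and the S3 arguments are passed as None since the upload branch only runs when write_location == 'S3'):

import datetime
import os
from fetch_usgs import fetch_site_info, process_site_info_to_df, fetch_data  # assumes 01_fetch/src is importable

site_num = '01482800'  # one of the configured sites
out_dir = os.path.join('.', '01_fetch', 'out')
os.makedirs(os.path.join(out_dir, 'metadata'), exist_ok=True)

# 1) series catalog -> earliest begin_date among uv/iv records becomes the start date
info_txt = os.path.join(out_dir, 'metadata', f'usgs_nwis_site_info_{site_num}.txt')
info_csv = os.path.join(out_dir, 'metadata', f'usgs_nwis_site_info_{site_num}.csv')
fetch_site_info(site_num, info_txt)
start_dt = process_site_info_to_df(info_txt, info_csv, s3_bucket=None,
                                   write_location='local', s3_client=None)

# 2) pull the full record, from the start of record through today
end_dt = datetime.datetime.today().strftime('%Y-%m-%d')
data_txt = os.path.join(out_dir, f'usgs_nwis_{site_num}.txt')
fetch_data(site_num, start_dt, end_dt, data_txt, s3_bucket=None,
           write_location='local', s3_client=None)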
25 changes: 5 additions & 20 deletions 02_munge/src/munge_usgs.py
@@ -5,19 +5,6 @@
import yaml
import utils

def process_params_to_csv(raw_params_txt, params_outfile_csv, write_location, bucket, s3_client):
'''process raw parameter text file into a csv file'''
print('reading raw parameter data from s3')
obj = s3_client.get_object(Bucket=bucket, Key=raw_params_txt)
params_df = pd.read_csv(obj.get("Body"), comment='#', sep='\t', lineterminator='\n')
print('processing parameter file and saving locally')
params_df.drop(index=0, inplace=True)
params_df.to_csv(params_outfile_csv)
if write_location == 'S3':
print('uploading to s3')
s3_client.upload_file(params_outfile_csv, bucket, '02_munge/out/'+os.path.basename(params_outfile_csv))
return params_df

def process_to_timestep(df, cols, agg_level, prop_obs_required):
# aggregate data to specified timestep
if agg_level == 'daily':
@@ -43,7 +30,7 @@ def param_code_to_name(df, params_df):
df.rename(columns={col: name}, inplace=True)
return df

def process_data_to_csv(raw_datafile, params_to_process, params_df, flags_to_drop, agg_level, prop_obs_required, write_location, bucket, s3_client):
def process_data_to_csv(raw_datafile, params_to_process, params_df, flags_to_drop, agg_level, prop_obs_required, write_location, s3_bucket, s3_client):
'''
process raw data text files into clean csvs, including:
dropping unwanted flags
@@ -52,7 +39,7 @@ def process_data_to_csv(raw_datafile, params_to_process, params_df, flags_to_dro
removing metadata columns so that only datetime and data columns remain
'''
print(f'reading data from s3: {raw_datafile}')
obj = s3_client.get_object(Bucket=bucket, Key=raw_datafile)
obj = s3_client.get_object(Bucket=s3_bucket, Key=raw_datafile)
# read in raw data as pandas df
df = pd.read_csv(obj.get("Body"), comment='#', sep='\t', lineterminator='\n', low_memory=False)

@@ -93,7 +80,7 @@ def process_data_to_csv(raw_datafile, params_to_process, params_df, flags_to_dro

if write_location == 'S3':
print('uploading to s3')
s3_client.upload_file(data_outfile_csv, bucket, '02_munge/out/'+os.path.basename(data_outfile_csv))
s3_client.upload_file(data_outfile_csv, s3_bucket, '02_munge/out/'+os.path.basename(data_outfile_csv))

def main():
# import config
@@ -105,10 +92,8 @@ def main():
s3_client = utils.prep_write_location(write_location, config['aws_profile'])
s3_bucket = config['s3_bucket']

# process raw parameter data into csv
raw_params_txt = '01_fetch/out/usgs_nwis_params.txt'
params_outfile_csv = os.path.join('.', '02_munge', 'out', 'usgs_nwis_params.csv')
params_df = process_params_to_csv(raw_params_txt, params_outfile_csv, write_location, s3_bucket, s3_client)
# read parameter data into df
params_df = pd.read_csv(os.path.join('.', '01_fetch', 'out', 'metadata', 'usgs_nwis_params.csv'), dtype={"parm_cd":"string"})

# get list of raw data files to process
raw_datafiles = [obj['Key'] for obj in s3_client.list_objects_v2(Bucket=s3_bucket, Prefix='01_fetch/out/usgs_nwis_0')['Contents']]
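
One detail worth noting above: parm_cd is read as a string because USGS parameter codes are five-digit codes with leading zeros, and letting pandas infer the dtype would silently turn them into integers. A small illustration with hypothetical file contents:

import io
import pandas as pd

csv_text = "parm_cd,parameter_name\n00060,Discharge (cubic feet per second)\n00010,Temperature (degrees Celsius)\n"

inferred = pd.read_csv(io.StringIO(csv_text))
print(inferred['parm_cd'].tolist())    # [60, 10] -- leading zeros lost

as_string = pd.read_csv(io.StringIO(csv_text), dtype={'parm_cd': 'string'})
print(as_string['parm_cd'].tolist())   # ['00060', '00010'] -- codes preserved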
