diff --git a/01_fetch/fetch_config.yaml b/01_fetch/fetch_config.yaml
index 2e457f9..3d231a5 100644
--- a/01_fetch/fetch_config.yaml
+++ b/01_fetch/fetch_config.yaml
@@ -20,11 +20,6 @@ fetch_usgs.py:
     - '01482695' # C AND D CANAL NR DELAWARE CITY, DE
     - '01482800' # Delaware River at Reedy Island Jetty, DE
 
-  # start and end dates for data fetch
-  # should be in the format 'YYYY-MM-DD'
-  start_dt: '2019-01-01'
-  end_dt: '2019-12-31'
-
 fetch_noaa_nos.py:
   #choose where you want to write your data outputs: local or S3
   write_location: 'local'
diff --git a/01_fetch/src/fetch_usgs.py b/01_fetch/src/fetch_usgs.py
index ed7f2ae..841de4e 100644
--- a/01_fetch/src/fetch_usgs.py
+++ b/01_fetch/src/fetch_usgs.py
@@ -1,25 +1,75 @@
 import os
 import urllib
 import yaml
+import datetime
+import pandas as pd
 import utils
 
-def fetch_params(outfile, bucket, write_location, s3_client):
+def fetch_site_info(site_num, outfile):
+    '''fetch USGS NWIS site info (series catalog of available parameters) for the given site'''
+    site_url = f'https://waterservices.usgs.gov/nwis/site/?format=rdb&sites={site_num}&seriesCatalogOutput=true'
+    print(f'fetching site info for site {site_num} and saving locally')
+    urllib.request.urlretrieve(site_url, outfile)
+
+def process_site_info_to_df(raw_site_info_txt, site_info_outfile_csv, s3_bucket, write_location, s3_client):
+    '''
+    process raw site info text file into a csv file,
+    return minimum date of measured data (for any parameter) as start date for site
+    '''
+    print('processing raw site info to get start date for site')
+    site_info_df = pd.read_csv(raw_site_info_txt, comment='#', sep='\t', lineterminator='\n')
+    site_info_df.drop(index=0, inplace=True)
+
+    # get subset of unit values, other codes listed below:
+    # “dv” (daily values)
+    # “uv” or “iv” (unit values)
+    # “qw” (water-quality)
+    # “sv” (site visits)
+    # “pk” (peak measurements)
+    # “gw” (groundwater levels)
+    # “ad” (sites included in USGS Annual Water Data Reports External Link)
+    # “aw” (sites monitored by the USGS Active Groundwater Level Network External Link)
+    # “id” (historical instantaneous values)
+    site_info_df_subset = site_info_df.loc[(site_info_df['data_type_cd']=='uv') | (site_info_df['data_type_cd']=='iv'), :]
+    # subset to only columns of interest
+    site_info_df_subset = site_info_df_subset[['parm_cd', 'begin_date', 'end_date', 'count_nu']]
+    # save this table of info
+    site_info_df_subset.to_csv(site_info_outfile_csv, index=False)
+    if write_location == 'S3':
+        print('uploading to s3')
+        s3_client.upload_file(site_info_outfile_csv, s3_bucket, '01_fetch/out/metadata/'+os.path.basename(site_info_outfile_csv))
+
+    # get minimum date to pull for this site
+    start_dt = site_info_df_subset[['begin_date']].min().values[0]
+
+    return start_dt
+
+def fetch_params(outfile):
     '''get table of all possible USGS site parameters'''
     params_url = 'https://help.waterdata.usgs.gov/code/parameter_cd_query?fmt=rdb&group_cd=%'
     print('fetcing parameter file and saving locally')
     urllib.request.urlretrieve(params_url, outfile)
+
+def process_params_to_csv(raw_params_txt, params_outfile_csv, write_location, bucket, s3_client):
+    '''process raw parameter text file into a csv file'''
+    print('reading raw parameter data from local')
+    params_df = pd.read_csv(raw_params_txt, comment='#', sep='\t', lineterminator='\n')
+    print('processing parameter file and saving locally')
+    params_df.drop(index=0, inplace=True)
+    params_df.to_csv(params_outfile_csv, index=False)
     if write_location == 'S3':
         print('uploading to s3')
-        s3_client.upload_file(outfile, bucket, '01_fetch/out/'+os.path.basename(outfile))
+        s3_client.upload_file(params_outfile_csv, bucket, '01_fetch/out/metadata/'+os.path.basename(params_outfile_csv))
+    return params_df
 
-def fetch_data(site_num, start_dt, end_dt, outfile, bucket, write_location, s3_client):
+def fetch_data(site_num, start_dt, end_dt, outfile, s3_bucket, write_location, s3_client):
     '''fetch USGS NWIS data for locations in site_list (gets all parameters available)'''
     data_url = f'https://waterservices.usgs.gov/nwis/iv?format=rdb&sites={site_num}&startDT={start_dt}&endDT={end_dt}'
     print(f'fetcing data for site {site_num} and saving locally')
     urllib.request.urlretrieve(data_url, outfile)
     if write_location == 'S3':
         print('uploading to s3')
-        s3_client.upload_file(outfile, bucket, '01_fetch/out/'+os.path.basename(outfile))
+        s3_client.upload_file(outfile, s3_bucket, '01_fetch/out/'+os.path.basename(outfile))
 
 def main():
     # import config
@@ -34,18 +84,25 @@ def main():
     # usgs nwis sites we want to fetch data for
     site_ids = config['site_ids']
 
-    # start and end dates for data fetch
-    start_dt = config['start_dt']
-    end_dt = config['end_dt']
-
     # fetch raw data files
     for site_num in site_ids:
+        site_info_outfile_txt = os.path.join('.', '01_fetch', 'out', 'metadata', f'usgs_nwis_site_info_{site_num}.txt')
+        fetch_site_info(site_num, site_info_outfile_txt)
+        site_info_outfile_csv = os.path.join('.', '01_fetch', 'out', 'metadata', f'usgs_nwis_site_info_{site_num}.csv')
+        # start and end dates for data fetch: earliest available record through today
+        start_dt = process_site_info_to_df(site_info_outfile_txt, site_info_outfile_csv, s3_bucket, write_location, s3_client)
+        end_dt = datetime.datetime.today().strftime("%Y-%m-%d")
+
         data_outfile_txt = os.path.join('.', '01_fetch', 'out', f'usgs_nwis_{site_num}.txt')
         fetch_data(site_num, start_dt, end_dt, data_outfile_txt, s3_bucket, write_location, s3_client)
 
     # fetch parameter file
-    params_outfile_txt = os.path.join('.', '01_fetch', 'out', 'usgs_nwis_params.txt')
-    fetch_params(params_outfile_txt, s3_bucket, write_location, s3_client)
+    params_outfile_txt = os.path.join('.', '01_fetch', 'out', 'metadata', 'usgs_nwis_params.txt')
+    fetch_params(params_outfile_txt)
 
+    # process raw parameter data into csv
+    params_outfile_csv = os.path.join('.', '01_fetch', 'out', 'metadata', 'usgs_nwis_params.csv')
+    params_df = process_params_to_csv(params_outfile_txt, params_outfile_csv, write_location, s3_bucket, s3_client)
+
 if __name__ == '__main__':
     main()
\ No newline at end of file
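The substance of the `fetch_usgs.py` change: `start_dt` and `end_dt` no longer come from `fetch_config.yaml`. Instead, the start date is derived per site as the earliest `begin_date` among unit-value series in the NWIS site-info (series catalog) service, and the end date is the current day. A condensed, standalone sketch of that flow (the output path and final print are placeholders for illustration; the real code writes under `01_fetch/out/metadata/` and optionally mirrors to S3):

```python
import datetime
import urllib.request

import pandas as pd

site_num = '01482800'  # one of the site ids listed in fetch_config.yaml

# series catalog for the site: RDB format, tab-separated, '#' comment lines
site_url = (f'https://waterservices.usgs.gov/nwis/site/?format=rdb'
            f'&sites={site_num}&seriesCatalogOutput=true')
urllib.request.urlretrieve(site_url, 'site_info.txt')  # placeholder path

site_info_df = pd.read_csv('site_info.txt', comment='#', sep='\t', lineterminator='\n')
site_info_df.drop(index=0, inplace=True)  # first row holds RDB column-width codes, not data

# keep only unit/instantaneous-value series, then take the earliest begin_date
uv = site_info_df.loc[site_info_df['data_type_cd'].isin(['uv', 'iv'])]
start_dt = uv['begin_date'].min()
end_dt = datetime.datetime.today().strftime('%Y-%m-%d')
print(f'would fetch {site_num} from {start_dt} to {end_dt}')
```

One consequence worth noting: fetches are no longer pinned to a fixed 2019 window, so reruns on different days request different date ranges.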
diff --git a/02_munge/src/munge_usgs.py b/02_munge/src/munge_usgs.py
index ef43d46..a1eed97 100644
--- a/02_munge/src/munge_usgs.py
+++ b/02_munge/src/munge_usgs.py
@@ -5,19 +5,6 @@
 import yaml
 import utils
 
-def process_params_to_csv(raw_params_txt, params_outfile_csv, write_location, bucket, s3_client):
-    '''process raw parameter text file into a csv file'''
-    print('reading raw parameter data from s3')
-    obj = s3_client.get_object(Bucket=bucket, Key=raw_params_txt)
-    params_df = pd.read_csv(obj.get("Body"), comment='#', sep='\t', lineterminator='\n')
-    print('processing parameter file and saving locally')
-    params_df.drop(index=0, inplace=True)
-    params_df.to_csv(params_outfile_csv)
-    if write_location == 'S3':
-        print('uploading to s3')
-        s3_client.upload_file(params_outfile_csv, bucket, '02_munge/out/'+os.path.basename(params_outfile_csv))
-    return params_df
-
 def process_to_timestep(df, cols, agg_level, prop_obs_required):
     # aggregate data to specified timestep
     if agg_level == 'daily':
@@ -43,7 +30,7 @@ def param_code_to_name(df, params_df):
         df.rename(columns={col: name}, inplace=True)
     return df
 
-def process_data_to_csv(raw_datafile, params_to_process, params_df, flags_to_drop, agg_level, prop_obs_required, write_location, bucket, s3_client):
+def process_data_to_csv(raw_datafile, params_to_process, params_df, flags_to_drop, agg_level, prop_obs_required, write_location, s3_bucket, s3_client):
     '''
     process raw data text files into clean csvs, including:
     dropping unwanted flags
@@ -52,7 +39,7 @@ def process_data_to_csv(raw_datafile, params_to_process, params_df, flags_to_dro
     removing metadata columns so that only datetime and data columns remain
     '''
     print(f'reading data from s3: {raw_datafile}')
-    obj = s3_client.get_object(Bucket=bucket, Key=raw_datafile)
+    obj = s3_client.get_object(Bucket=s3_bucket, Key=raw_datafile)
 
     # read in raw data as pandas df
     df = pd.read_csv(obj.get("Body"), comment='#', sep='\t', lineterminator='\n', low_memory=False)
@@ -93,7 +80,7 @@
 
     if write_location == 'S3':
         print('uploading to s3')
-        s3_client.upload_file(data_outfile_csv, bucket, '02_munge/out/'+os.path.basename(data_outfile_csv))
+        s3_client.upload_file(data_outfile_csv, s3_bucket, '02_munge/out/'+os.path.basename(data_outfile_csv))
 
 def main():
     # import config
@@ -105,10 +92,8 @@ def main():
     s3_client = utils.prep_write_location(write_location, config['aws_profile'])
     s3_bucket = config['s3_bucket']
 
-    # process raw parameter data into csv
-    raw_params_txt = '01_fetch/out/usgs_nwis_params.txt'
-    params_outfile_csv = os.path.join('.', '02_munge', 'out', 'usgs_nwis_params.csv')
-    params_df = process_params_to_csv(raw_params_txt, params_outfile_csv, write_location, s3_bucket, s3_client)
+    # read parameter data into df
+    params_df = pd.read_csv(os.path.join('.', '01_fetch', 'out', 'metadata', 'usgs_nwis_params.csv'), dtype={"parm_cd":"string"})
 
     # get list of raw data files to process
     raw_datafiles = [obj['Key'] for obj in s3_client.list_objects_v2(Bucket=s3_bucket, Prefix='01_fetch/out/usgs_nwis_0')['Contents']]
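With `process_params_to_csv` moved into the fetch step, the munge step now just reads the parameter CSV that fetch produces, passing `dtype={"parm_cd": "string"}`. That dtype is the load-bearing detail: NWIS parameter codes are zero-padded five-digit strings, and pandas' default type inference would read them as integers and strip the leading zeros, so they would no longer match the string codes used for the lookup in `param_code_to_name`. A toy illustration (the inline CSV is fabricated for the demo; 00060 and 00010 are real NWIS codes for discharge and water temperature):

```python
from io import StringIO

import pandas as pd

csv_text = 'parm_cd,parameter_nm\n00060,Discharge\n00010,Water temperature'

# default dtype inference parses the zero-padded codes as integers
print(pd.read_csv(StringIO(csv_text))['parm_cd'].tolist())
# -> [60, 10]

# reading parm_cd as a string keeps the codes exactly as written
print(pd.read_csv(StringIO(csv_text), dtype={'parm_cd': 'string'})['parm_cd'].tolist())
# -> ['00060', '00010']
```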