From f5960b804b3e9ce16bdaba3d2068fd6d453710d6 Mon Sep 17 00:00:00 2001 From: julietcohen Date: Wed, 13 Mar 2024 15:54:11 -0500 Subject: [PATCH] Updated scripts for processing infrastructure dataset, generalized workflow script name and contents to import appropriate config depending on dataset, generalized name of config object, removed unnecessary comments and old code, updated python environment in slurm script to reflect recent updates to vis-raster package. --- infrastructure_config.py | 75 +++++++++ rsync_staging_to_scratch.py | 20 +-- rsync_step2_raster_highest_to_scratch.py | 42 ++--- slurm/BEST_cpu_ray_double_srun.slurm | 7 +- ..._change_viz_workflow.py => viz_workflow.py | 150 ++++++------------ 5 files changed, 147 insertions(+), 147 deletions(-) create mode 100644 infrastructure_config.py rename lake_change_viz_workflow.py => viz_workflow.py (83%) diff --git a/infrastructure_config.py b/infrastructure_config.py new file mode 100644 index 0000000..0dbf3f2 --- /dev/null +++ b/infrastructure_config.py @@ -0,0 +1,75 @@ +from datetime import datetime +import subprocess +import numpy as np + +# always include the tailing slash "/" +# define user on Delta, avoid writing files to other user's dir +user = subprocess.check_output("whoami").strip().decode("ascii") +head_node = 'cn102/' +#head_node = 'gpub___' + +INPUT = '/scratch/bbou/julietcohen/infrastructure/input/' +output_subdir = 'infrastructure/output' +OUTPUT = f'/scratch/bbou/{user}/{output_subdir}/' + +STAGING_LOCAL = '/tmp/staged/' +STAGING_REMOTE = OUTPUT + 'staged/' +STAGING_REMOTE_MERGED = STAGING_REMOTE + head_node + +GEOTIFF_LOCAL = '/tmp/geotiff/' +GEOTIFF_REMOTE = OUTPUT + 'geotiff/' + +WEBTILE_REMOTE = OUTPUT + 'web_tiles/' + +""" final config is exported here, and imported in the workflow python file. """ +CONFIG = { + "deduplicate_clip_to_footprint": False, + "deduplicate_method": None, + "deduplicate_at": None, + "deduplicate_keep_rules": None, + "dir_output": OUTPUT, + "dir_input": INPUT, + "ext_input": ".gpkg", + "dir_geotiff_remote": GEOTIFF_REMOTE, + "dir_geotiff_local": GEOTIFF_LOCAL, + "dir_web_tiles": WEBTILE_REMOTE, + "dir_staged_remote": STAGING_REMOTE, + "dir_staged_remote_merged": STAGING_REMOTE_MERGED, + "dir_staged_local": STAGING_LOCAL, + "filename_staging_summary": STAGING_REMOTE + "staging_summary.csv", + "filename_rasterization_events": GEOTIFF_REMOTE + "raster_events.csv", + "filename_rasters_summary": GEOTIFF_REMOTE + "raster_summary.csv", + "version": datetime.now().strftime("%B%d,%Y"), + "simplify_tolerance": 0.1, + "tms_id": "WGS1984Quad", + "z_range": [ + 0, + 12 + ], + "geometricError": 57, + "z_coord": 0, + "statistics": [ + { + "name": "infrastructure_code", + "weight_by": "area", + "property": "DN", + "aggregation_method": "max", + "resampling_method": "nearest", + "val_range": [ + 11, + 50 + ], + "palette": [ + "#f48525", + "#f4e625", + "#47f425", + "#25f4e2", + "#2525f4", + "#f425c3", + "#f42525" + ], + "nodata_val": 0, + "nodata_color": "#ffffff00" + } + ] +} diff --git a/rsync_staging_to_scratch.py b/rsync_staging_to_scratch.py index 6684166..484c33c 100644 --- a/rsync_staging_to_scratch.py +++ b/rsync_staging_to_scratch.py @@ -14,25 +14,25 @@ # IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG # IWP_CONFIG2 = IWP_CONFIG.copy() -# for processing lake data: -import lake_change_config -IWP_CONFIG = lake_change_config.IWP_CONFIG -IWP_CONFIG2 = IWP_CONFIG.copy() - # for testing branches with IWP data: # import branch_testing_iwp_config # IWP_CONFIG = branch_testing_iwp_config.IWP_CONFIG # IWP_CONFIG2 = IWP_CONFIG.copy() + +# for infrastructure data: +import infrastructure_config +CONFIG = infrastructure_config.CONFIG +CONFIG2 = CONFIG.copy() # ----------------------------------------------------- # set config properties for current context -IWP_CONFIG2['dir_staged'] = IWP_CONFIG2['dir_staged_local'] -SOURCE = IWP_CONFIG2['dir_staged'] -IWP_CONFIG2['dir_staged'] = IWP_CONFIG2['dir_staged_remote'] -DESTINATION = IWP_CONFIG2['dir_staged'] +CONFIG2['dir_staged'] = CONFIG2['dir_staged_local'] +SOURCE = CONFIG2['dir_staged'] +CONFIG2['dir_staged'] = CONFIG2['dir_staged_remote'] +DESTINATION = CONFIG2['dir_staged'] print("Using config: ") -pprint.pprint(IWP_CONFIG2) +pprint.pprint(CONFIG2) # define user on Delta, avoid writing files to other user's dir user = subprocess.check_output("whoami").strip().decode("ascii") diff --git a/rsync_step2_raster_highest_to_scratch.py b/rsync_step2_raster_highest_to_scratch.py index d19cede..91f8877 100644 --- a/rsync_step2_raster_highest_to_scratch.py +++ b/rsync_step2_raster_highest_to_scratch.py @@ -10,20 +10,20 @@ #import PRODUCTION_IWP_CONFIG #IWP_CONFIG = PRODUCTION_IWP_CONFIG.IWP_CONFIG -# for processing lake data: -import lake_change_config -IWP_CONFIG = lake_change_config.IWP_CONFIG - # for testing branches with IWP data: # import branch_testing_iwp_config # IWP_CONFIG = branch_testing_iwp_config.IWP_CONFIG + +# for infrastructure data: +import infrastructure_config +CONFIG = infrastructure_config.CONFIG # ----------------------------------------------------- # set config properties for current context -IWP_CONFIG['dir_geotiff'] = IWP_CONFIG['dir_geotiff_local'] -SOURCE = IWP_CONFIG['dir_geotiff'] -IWP_CONFIG['dir_geotiff'] = IWP_CONFIG['dir_geotiff_remote'] -DESTINATION = IWP_CONFIG['dir_geotiff'] +CONFIG['dir_geotiff'] = CONFIG['dir_geotiff_local'] +SOURCE = CONFIG['dir_geotiff'] +CONFIG['dir_geotiff'] = CONFIG['dir_geotiff_remote'] +DESTINATION = CONFIG['dir_geotiff'] # define user on Delta, avoid writing files to other user's dir user = subprocess.check_output("whoami").strip().decode("ascii") @@ -57,28 +57,4 @@ process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) print("All jobs launched! They will work in the background WITHOUT stdout printing. ") - -# OLD CODE THAT IS WRONG BECAUSE WE DO NOT WANT HOSTNAMES TO BE SUBDIRS OF THE GEOTIFF DIR IN SCRATCH BC -# ALL RASTER HIGHEST NEED TO BE IN THE SAME SUBDIR TO CORRECTLY EXECUTE THE RASTER LOWER STEP -# count = 0 -# for hostname in hostnames: -# # to use ssh in rsync (over a remote sheel) use the following: `rsync -rv --rsh=ssh hostname::module /dest`` -# # see https://manpages.ubuntu.com/manpages/focal/en/man1/rsync.1.html (USING RSYNC-DAEMON FEATURES VIA A REMOTE-SHELL CONNECTION) - -# # mkdir then sync -# mkdir = ['mkdir', '-p', f'{DESTINATION}{hostname}'] -# process = Popen(mkdir, stdin=PIPE, stdout=PIPE, stderr=PIPE) -# time.sleep(0.2) - -# ssh = ['ssh', f'{hostname}',] -# rsync = ['rsync', '-r', '--update', SOURCE, f'{DESTINATION}{hostname}'] -# cmd = ssh + rsync -# print(f"'{count} of {len(hostnames)}'. running command: {cmd}") -# count += 1 - -# process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) - -# print("All jobs launched! They will work in the background WITHOUT stdout printing. ") - -# otpional improvement -# shlex.split(s) -- turn cmd line args into a list. \ No newline at end of file + \ No newline at end of file diff --git a/slurm/BEST_cpu_ray_double_srun.slurm b/slurm/BEST_cpu_ray_double_srun.slurm index 27fac58..034db86 100644 --- a/slurm/BEST_cpu_ray_double_srun.slurm +++ b/slurm/BEST_cpu_ray_double_srun.slurm @@ -3,11 +3,11 @@ #SBATCH --job-name=pdg_viz #SBATCH --partition=cpu #SBATCH --account= -#SBATCH --time=24:00:00 +#SBATCH --time=48:00:00 #SBATCH --export=ALL,RAY_worker_register_timeout_seconds=120 -#SBATCH --nodes=20 +#SBATCH --nodes=1 #SBATCH --mem=0 #SBATCH --exclusive @@ -39,7 +39,8 @@ set -x echo "This is BEST_cpu_ray_double_srun.slurm" # venv init -source /scratch/bbou/julietcohen/venv/iwp_3/bin/activate +source /scratch/bbou/julietcohen/venv/infrastructure/bin/activate +# source /scratch/bbou/julietcohen/venv/iwp_3/bin/activate # set file soft limit to maximum value (not unlimited because that's not permitted) # before any srun's are executed ulimit -n 32768 diff --git a/lake_change_viz_workflow.py b/viz_workflow.py similarity index 83% rename from lake_change_viz_workflow.py rename to viz_workflow.py index 041c9d5..e4f3e96 100644 --- a/lake_change_viz_workflow.py +++ b/viz_workflow.py @@ -2,7 +2,6 @@ import json import logging -#import logging.config import logging.handlers import os @@ -28,8 +27,8 @@ # define user on Delta, avoid writing files to other user's dir user = subprocess.check_output("whoami").strip().decode("ascii") -import lake_change_config -IWP_CONFIG = lake_change_config.IWP_CONFIG +import infrastructure_config +CONFIG = infrastructure_config.CONFIG # configure logger logger = logging.getLogger("logger") @@ -45,7 +44,6 @@ logger.addHandler(handler) logger.setLevel(logging.INFO) -print(logger.handlers) def main(): result = subprocess.run(["hostname", "-i"], capture_output=True, text=True) @@ -59,8 +57,6 @@ def main(): print("🎯 Ray initialized.") print_cluster_stats() - # create file workflow_log.txt in output dir - # start_logging() # removed this because trying to get logger to work with special config to log.log rather than Kastan's file start = time.time() try: @@ -77,8 +73,6 @@ def main(): # todo: immediately after initiating above step, start rsync script to continuously sync geotiff files, # or immediately after the above step is done, rsync all files at once if there is time left in job # step3_raster_lower(batch_size_geotiffs=100) # rasterize all LOWER Z levels - # todo: immediately after initiating above step, start rsync script to continuously sync geotiff files, - # or immediately after the above step is done, rsync all files at once if there is time left in job step4_webtiles(batch_size_web_tiles=250) # convert to web tiles. except Exception as e: @@ -91,7 +85,6 @@ def main(): ############### 👇 MAIN STEPS FUNCTIONS 👇 ############### -# @workflow.step(name="Step0_Stage_All") def step0_staging(): FAILURES = [] FAILURE_PATHS = [] @@ -103,15 +96,14 @@ def step0_staging(): logger.info("step0_staging() has initiated.") # update the config for the current context: write staged files to local /tmp dir - iwp_config = deepcopy(IWP_CONFIG) - iwp_config['dir_staged'] = iwp_config['dir_staged_local'] + config = deepcopy(CONFIG) + config['dir_staged'] = config['dir_staged_local'] # make directory /tmp/staged on each node - # not really necessary cause Robyn's functions are set up to do this + # not really necessary cause functions are set up to do this # and /tmp allows dirs to be created to write files - os.makedirs(iwp_config['dir_staged'], exist_ok = True) + os.makedirs(config['dir_staged'], exist_ok = True) - # OLD METHOD "glob" all files. - stager = pdgstaging.TileStager(iwp_config, check_footprints=False) + stager = pdgstaging.TileStager(config, check_footprints=False) staging_input_files_list = stager.tiles.get_filenames_from_dir('input') @@ -119,7 +111,7 @@ def step0_staging(): # with open(os.path.join(iwp_config['dir_output'], "workflow_log.txt"), "a+") as file: # file.write(f"Number of filepaths in staging_input_files_list: {len(staging_input_files_list)}\n\n") # removed this because trying to get logger to work with special config to log.log rather than Kastan's file - with open(os.path.join(iwp_config['dir_output'], "staging_input_files_list.json") , "w") as f: + with open(os.path.join(config['dir_output'], "staging_input_files_list.json") , "w") as f: json.dump(staging_input_files_list, f) """ @@ -145,7 +137,7 @@ def step0_staging(): for itr, filepath in enumerate(staging_input_files_list): # if itr <= 6075: # create list of remote function ids (i.e. futures) - app_futures.append(stage_remote.remote(filepath, iwp_config)) + app_futures.append(stage_remote.remote(filepath, config)) # record how many app_futures were created to determine if it is the full number of input paths # with open(os.path.join(iwp_config['dir_output'], "workflow_log.txt"), "a+") as file: @@ -220,13 +212,13 @@ def prep_only_high_ice_input_list(): try: staging_input_files_list_raw = json.load(open('./iwp-file-list.json')) except FileNotFoundError as e: - print("❌❌❌ Hey you, please specify a 👉 json file containing a list of input file paths 👈 (relative to `BASE_DIR_OF_INPUT`).", e) + print("❌❌❌ Specify a json file containing a list of input file paths.", e) #if ONLY_SMALL_TEST_RUN: # for testing only # staging_input_files_list_raw = staging_input_files_list_raw[:TEST_RUN_SIZE] # make paths absolute (not relative) - staging_input_files_list = prepend(staging_input_files_list_raw, IWP_CONFIG['dir_input']) + staging_input_files_list = prepend(staging_input_files_list_raw, CONFIG['dir_input']) # ONLY use high_ice for a test run. print("⚠️ WARNING: ONLY USING HIGH_ICE FOR A TEST RUN!!!!! ⚠️") @@ -330,25 +322,21 @@ def step2_raster_highest(batch_size=100): This is a BLOCKING step (but it can run in parallel). Rasterize all staged tiles (only highest z-level). """ - # import gc - # from random import randrange - # from pympler import muppy, summary, tracker #os.makedirs(iwp_config['dir_geotiff'], exist_ok=True) - iwp_config = deepcopy(IWP_CONFIG) + config = deepcopy(CONFIG) - print(f"Using config {iwp_config}") + print(f"Using config {config}") # update the config for the current context: write geotiff files to local /tmp dir - iwp_config['dir_geotiff'] = iwp_config['dir_geotiff_local'] + config['dir_geotiff'] = config['dir_geotiff_local'] - print(f"2️⃣ Step 2 Rasterize only highest Z, saving to {iwp_config['dir_geotiff']}") + print(f"2️⃣ Step 2 Rasterize only highest Z, saving to {config['dir_geotiff']}") - iwp_config['dir_staged'] = iwp_config['dir_staged_remote_merged'] - stager = pdgstaging.TileStager(iwp_config, check_footprints=False) - stager.tiles.add_base_dir('output_of_staging', iwp_config['dir_staged'], '.gpkg') - # i dont think this next line is necessary cause dedup occurs based on the flag that is inserted during staging: + config['dir_staged'] = config['dir_staged_remote_merged'] + stager = pdgstaging.TileStager(config, check_footprints=False) + stager.tiles.add_base_dir('output_of_staging', config['dir_staged'], '.gpkg') #rasterizer = pdgraster.RasterTiler(iwp_config) # make directories in /tmp so geotiffs can populate there @@ -364,17 +352,15 @@ def step2_raster_highest(batch_size=100): # first define the dir that contains the staged files into a base dir to pull all filepaths correctly #stager.tiles.add_base_dir('output_of_staging', iwp_config['dir_staged'], '.gpkg') - print(f"Collecting all STAGED files from `{iwp_config['dir_staged']}`...") + print(f"Collecting all STAGED files from `{config['dir_staged']}`...") # Get paths to all the newly staged tiles staged_paths = stager.tiles.get_filenames_from_dir(base_dir = 'output_of_staging') - - # stager.kas_check_footprints(staged_paths) #if ONLY_SMALL_TEST_RUN: # staged_paths = staged_paths[:TEST_RUN_SIZE] # save a copy of the files we're rasterizing. - staged_path_json_filepath = os.path.join(iwp_config['dir_output'], "staged_paths_to_rasterize_highest.json") + staged_path_json_filepath = os.path.join(config['dir_output'], "staged_paths_to_rasterize_highest.json") print(f"Writing a copy of the files we're rasterizing to {staged_path_json_filepath}...") with open(staged_path_json_filepath, "w") as f: json.dump(staged_paths, f, indent=2) @@ -382,7 +368,7 @@ def step2_raster_highest(batch_size=100): print(f"Step 2️⃣ -- Making batches of staged files... batch_size: {batch_size}") staged_batches = make_batch(staged_paths, batch_size) - print(f"The input to this step, Rasterization, is the output of Staging.\n Using Staging path: {iwp_config['dir_staged']}") + print(f"The input to this step, Rasterization, is the output of Staging.\n Using Staging path: {config['dir_staged']}") print(f"🌄 Rasterize total files {len(staged_paths)} gpkgs, using batch size: {batch_size}") print(f"🏎 Parallel batches of jobs: {len(staged_batches)}...\n") @@ -409,13 +395,10 @@ def step2_raster_highest(batch_size=100): app_futures = [] for i, batch in enumerate(staged_batches): - # inserted to try to get geotiffs to stop trying to write to scratch: - #iwp_config['dir_footprints'] = iwp_config['dir_footprints_local'] #iwp_config['dir_geotiff'] = iwp_config['dir_geotiff_local'] - #stager = pdgstaging.TileStager(iwp_config, check_footprints=False) # maybe remove this, added when troubleshooting #rasterizer = pdgraster.RasterTiler(iwp_config) - os.makedirs(iwp_config['dir_geotiff'], exist_ok=True) - app_future = rasterize.remote(batch, iwp_config) + os.makedirs(config['dir_geotiff'], exist_ok=True) + app_future = rasterize.remote(batch, config) app_futures.append(app_future) for i in range(0, len(app_futures)): @@ -465,32 +448,29 @@ def step2_raster_highest(batch_size=100): # @workflow.step(name="Step3_Rasterize_lower_z_levels") def step3_raster_lower(batch_size_geotiffs=20): ''' - STEP 3: Create parent geotiffs for all z-levels (except highest) - THIS IS HARD TO PARALLELIZE multiple zoom levels at once..... sad, BUT - 👉 WE DO PARALLELIZE BATCHES WITHIN one zoom level. + STEP 3: Create parent geotiffs for all z-levels except highest + (parallelize batches within one zoom level) ''' print("3️⃣ Step 3: Create parent geotiffs for all lower z-levels (everything except highest zoom)") - iwp_config = deepcopy(IWP_CONFIG) + config = deepcopy(CONFIG) - iwp_config['dir_geotiff'] = iwp_config['dir_geotiff_remote'] + config['dir_geotiff'] = config['dir_geotiff_remote'] # update the config for the current context: pull stager that represents staged files in /scratch - # next line is likely not necessary but can't hurt - iwp_config['dir_staged'] = iwp_config['dir_staged_remote_merged'] - stager = pdgstaging.TileStager(iwp_config, check_footprints=False) + config['dir_staged'] = config['dir_staged_remote_merged'] + stager = pdgstaging.TileStager(config, check_footprints=False) # find all Z levels min_z = stager.config.get_min_z() max_z = stager.config.get_max_z() parent_zs = range(max_z - 1, min_z - 1, -1) - # next line is likely not necessary but can't hurt (we already defined this a few lines above) - iwp_config['dir_geotiff'] = iwp_config['dir_geotiff_remote'] - rasterizer = pdgraster.RasterTiler(iwp_config) + config['dir_geotiff'] = config['dir_geotiff_remote'] + rasterizer = pdgraster.RasterTiler(config) - print(f"Collecting all Geotiffs (.tif) in: {iwp_config['dir_geotiff']}") - # removing this next line bc robyn removed it from kastan's script: - stager.tiles.add_base_dir('geotiff_remote', iwp_config['dir_geotiff'], '.tif') # had to change this from geotiff to geotiff_remote bc got error that the geotiff base dir already existed + print(f"Collecting all Geotiffs (.tif) in: {config['dir_geotiff']}") + # had to change next line from 'geotiff' to 'geotiff_remote' bc got error that the geotiff base dir already existed + stager.tiles.add_base_dir('geotiff_remote', config['dir_geotiff'], '.tif') start = time.time() # Can't start lower z-level until higher z-level is complete. @@ -528,10 +508,10 @@ def step3_raster_lower(batch_size_geotiffs=20): # I dont think theres a need to set rasterizer with new config after chaning this property cause # that is done within the function create_composite_geotiffs() but troubleshooting # so lets do it anyway - rasterizer = pdgraster.RasterTiler(iwp_config) + rasterizer = pdgraster.RasterTiler(config) # MANDATORY: include placement_group for better stability on 200+ cpus. - app_future = create_composite_geotiffs.remote(parent_tile_batch, iwp_config, logging_dict=None) + app_future = create_composite_geotiffs.remote(parent_tile_batch, config, logging_dict=None) app_futures.append(app_future) # Don't start the next z-level (or move to step 4) until the @@ -554,7 +534,6 @@ def step3_raster_lower(batch_size_geotiffs=20): print(f"⏰ Total time to create parent geotiffs: {(time.time() - start)/60:.2f} minutes\n") return "Done step 3." -# @workflow.step(name="Step4_create_webtiles") def step4_webtiles(batch_size_web_tiles=300): ''' STEP 4: Create web tiles from geotiffs @@ -562,49 +541,36 @@ def step4_webtiles(batch_size_web_tiles=300): ''' print("4️⃣ -- Creating web tiles from geotiffs...") - iwp_config = deepcopy(IWP_CONFIG) + config = deepcopy(CONFIG) # instantiate classes for their helper functions # not sure that the next line is necessary but might as well keep the config up to date # with where files currently are - iwp_config['dir_staged'] = iwp_config['dir_staged_remote_merged'] + config['dir_staged'] = config['dir_staged_remote_merged'] # pull all z-levels of rasters from /scratch for web tiling - iwp_config['dir_geotiff'] = iwp_config['dir_geotiff_remote'] + config['dir_geotiff'] = config['dir_geotiff_remote'] # define rasterizer here just to updates_ranges() - rasterizer = pdgraster.RasterTiler(iwp_config) + rasterizer = pdgraster.RasterTiler(config) # stager = pdgstaging.TileStager(iwp_config, check_footprints=False) remove this line, no point in defining the stager right before we update the config, do it after! start = time.time() # Update color ranges print(f"Updating ranges...") rasterizer.update_ranges() - iwp_config_new = rasterizer.config.config - print(f"Defined new config: {iwp_config_new}") - - # define the stager so we can pull filepaths from the geotiff base dir in a few lines - # stager = pdgstaging.TileStager(iwp_config_new, check_footprints=False) + config_new = rasterizer.config.config + print(f"Defined new config: {config_new}") # Note: we also define rasterizer later in each of the 3 functions: # raster highest, raster lower, and webtile functions - print(f"Collecting all Geotiffs (.tif) in: {iwp_config['dir_geotiff']}...") # the original config here is correct, it is just printing the filepath we are using for source of geotiff files - #print(f"Collecting all Geotiffs (.tif) in: {IWP_CONFIG_NEW['dir_geotiff']}...") - - #stager.tiles.add_base_dir('geotiff_path', IWP_CONFIG_NEW['dir_geotiff'], '.tif') # don't think we need this line because we are using the same geotiff base dir set in the raster lower step (remote geotiff dir with geotiffs of all z-levels) - #stager = pdgstaging.TileStager(IWP_CONFIG, check_footprints=False) - #stager.tiles.add_base_dir('geotiff_remote', IWP_CONFIG['dir_geotiff'], '.tif') - #geotiff_paths = stager.tiles.get_filenames_from_dir(base_dir = 'geotiff_path') + # the original config here is fine, just printing filepath for source of geotiff files + print(f"Collecting all Geotiffs (.tif) in: {config['dir_geotiff']}...") # set add base dir with rasterizer instead of stager #rasterizer.tiles.add_base_dir('geotiff_remote', IWP_CONFIG['dir_geotiff'], '.tif') #geotiff_paths = rasterizer.tiles.get_filenames_from_dir(base_dir = 'geotiff_remote') - # added next 2 lines 20230214: - rasterizer.tiles.add_base_dir('geotiff_remote_all_zs', iwp_config_new['dir_geotiff'], '.tif') # call it something different than geotiff_remote because we already made that base dir earlier and it might not overwrite and might error cause already exists - # and also remove line just below bc it was that line's replacement for time being: - # rasterizer.tiles.add_base_dir('geotiff_remote_all_zs', IWP_CONFIG['dir_geotiff'], '.tif') + rasterizer.tiles.add_base_dir('geotiff_remote_all_zs', config_new['dir_geotiff'], '.tif') # call it something different than geotiff_remote because we already made that base dir earlier and it might not overwrite and might error cause already exists geotiff_paths = rasterizer.tiles.get_filenames_from_dir(base_dir = 'geotiff_remote_all_zs') - # check if the rasterizer can add_base_dir?? robyn added that so must be correct - # change made 2023-02-14: change rasterizer to stager to pull filenames from stager base dir created in raster lower step #geotiff_paths = stager.tiles.get_filenames_from_dir(base_dir = 'geotiff_remote') #if ONLY_SMALL_TEST_RUN: @@ -628,8 +594,7 @@ def step4_webtiles(batch_size_web_tiles=300): # MANDATORY: include placement_group for better stability on 200+ cpus. # app_future = create_web_tiles.options(placement_group=pg).remote(batch, IWP_CONFIG) # app_future = create_web_tiles.remote(batch, IWP_CONFIG) # remove this line - app_future = create_web_tiles.remote(batch, iwp_config_new) - #app_future = create_web_tiles.remote(batch, IWP_CONFIG) # remove this line when line "reintroduce" lines are reintroduced # update 8/22: dont remember why I wrote that comment + app_future = create_web_tiles.remote(batch, config_new) app_futures.append(app_future) for i in range(0, len(app_futures)): @@ -663,10 +628,6 @@ def rasterize(staged_paths, config, logging_dict=None): # print(staged_paths) try: - #IWP_CONFIG['dir_geotiff'] = IWP_CONFIG['dir_geotiff_remote'] - #os.makedirs(IWP_CONFIG['dir_geotiff'], exist_ok = True) - #IWP_CONFIG['dir_footprints'] = IWP_CONFIG['dir_footprints_local'] - #IWP_CONFIG['dir_staged'] = IWP_CONFIG['dir_staged_remote_merged'] rasterizer = pdgraster.RasterTiler(config) # todo: fix warning `python3.9/site-packages/geopandas/base.py:31: UserWarning: The indices of the two GeoSeries are different.` # with suppress(UserWarning): @@ -687,7 +648,6 @@ def create_composite_geotiffs(tiles, config, logging_dict=None): import logging.config logging.config.dictConfig(logging_dict) try: - #iwp_config['dir_footprints'] = iwp_config['dir_footprints_local'] rasterizer = pdgraster.RasterTiler(config) rasterizer.parent_geotiffs_from_children(tiles, recursive=False) except Exception as e: @@ -704,7 +664,6 @@ def create_web_tiles(geotiff_paths, config, logging_dict=None): import logging.config logging.config.dictConfig(logging_dict) try: - #IWP_CONFIG['dir_geotiff'] = IWP_CONFIG['dir_geotiff_remote'] using new config here...not sure if it will have the property 'dir_geotiff_remote'??? If so, uncomment this line rasterizer = pdgraster.RasterTiler(config) rasterizer.webtiles_from_geotiffs(geotiff_paths, update_ranges=False) except Exception as e: @@ -725,9 +684,7 @@ def stage_remote(filepath, config, logging_dict = None): Parallelism at the per-shape-file level. """ # deepcopy makes a realy copy, not using references. Helpful for parallelism. - # config_path = deepcopy(IWP_CONFIG) - - iwp_config = deepcopy(IWP_CONFIG) + #config = deepcopy(CONFIG) try: stager = pdgstaging.TileStager(config=config, check_footprints=False) @@ -737,8 +694,6 @@ def stage_remote(filepath, config, logging_dict = None): # file.write(f"Successfully staged file:\n") # file.write(f"{filepath}\n\n") # removed this because trying to get logger to work with special config to log.log rather than Kastan's file - #logging.info(f"Juliet's logging: Successfully staged tile: {filepath}.") - if 'Skipping' in str(ret): print(f"⚠️ Skipping {filepath}") @@ -746,12 +701,8 @@ def stage_remote(filepath, config, logging_dict = None): # file.write(f"SKIPPING FILE:\n") # file.write(f"{filepath}\n\n") # removed this because trying to get logger to work with special config to log.log rather than Kastan's file - #logging.info(f"Juliet's logging: Skipping staging tile: {filepath}.") - except Exception as e: - #logging.info(f"Juliet's logging :Failed to stage tile: {filepath}.") - # with open(os.path.join(iwp_config['dir_output'], "workflow_log.txt"), "a+") as file: # file.write(f"FAILURE TO STAGE FILE:\n") # file.write(f"{filepath}\n") @@ -777,10 +728,7 @@ def make_workflow_id(name: str) -> str: return f"{name}-{str(curr_time.strftime('%h_%d,%Y@%H:%M'))}" def build_filepath(input_file): - # Demo input: /home/kastanday/output/staged/WorldCRS84Quad/13/1047/1047.gpkg - # Replace 'staged' -> '3d_tiles' in path - # In: /home/kastanday/output/staged/WorldCRS84Quad/13/1047/1047.gpkg # new_path: /home/kastanday/output/3d_tiles/WorldCRS84Quad/13/1047/1047.gpkg path = pathlib.Path(input_file) index = path.parts.index('staged') @@ -906,10 +854,10 @@ def start_logging(): ''' Writes file workflow_log.txt in output directory. ''' - filepath = pathlib.Path(IWP_CONFIG['dir_output'] + 'workflow_log.txt') + filepath = pathlib.Path(CONFIG['dir_output'] + 'workflow_log.txt') filepath.parent.mkdir(parents=True, exist_ok=True) filepath.touch(exist_ok=True) - logging.basicConfig(level=logging.INFO, filename= IWP_CONFIG['dir_output'] + 'workflow_log.txt', filemode='w', format='%(asctime)s - %(levelname)s - %(message)s') + logging.basicConfig(level=logging.INFO, filename= CONFIG['dir_output'] + 'workflow_log.txt', filemode='w', format='%(asctime)s - %(levelname)s - %(message)s') @ray.remote def rsync_raster_to_scatch(rsync_python_file='utilities/rsync_merge_raster_to_scratch.py'):