Skip to content

Commit

Permalink
Merge pull request #962 from nasa/961_disp_s1_historical_chunking_fix
Browse files Browse the repository at this point in the history
#961: Fixed the historical-mode chunking logic for DISP-S1. See the associated bug.
  • Loading branch information
hhlee445 authored Aug 22, 2024
2 parents 626b064 + 42b98d2 commit 6ca36e6
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 21 deletions.
33 changes: 22 additions & 11 deletions data_subscriber/asf_cslc_download.py
Original file line number Diff line number Diff line change
def get_downloads(self, args, es_conn):
    """Collect every CSLC granule download record for the triggered batches.

    NOTE(review): reconstructed from a diff hunk — ``batch_ids`` and
    ``k_es_conn`` are established earlier in this method, outside the
    visible excerpt; confirm against the full file.

    :param args: parsed CLI arguments; ``args.proc_mode`` selects the catalog layout
    :param es_conn: connection to the normal cslc_catalog
    :return: list of download records accumulated across all batch_ids
    """
    all_downloads = []

    # Historical mode stores all granules in the normal cslc_catalog, so
    # every batch_id is fetched from the same (es_conn) catalog. This is
    # the fix from PR #962: previously only batch_ids[-1] was consulted here.
    if "proc_mode" in args and args.proc_mode == "historical":
        logger.info("Downloading cslc files for historical mode")
        for batch_id in batch_ids:
            downloads = es_conn.get_download_granule_revision(batch_id)
            logger.info(f"Got {len(downloads)=} cslc downloads for {batch_id=}")
            assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!"
            all_downloads.extend(downloads)

    # Forward and reprocessing modes: the newest batch lives in the normal
    # catalog and the remaining (compressed K-CSLC) batches live in
    # k_cslc_catalog, so they are fetched from two different connections.
    else:
        logger.info("Downloading cslc files for forward/reprocessing mode")
        downloads = es_conn.get_download_granule_revision(batch_ids[-1])
        logger.info(f"Got {len(downloads)=} cslc granules downloads for batch_id={batch_ids[-1]}")
        assert len(downloads) > 0, f"No downloads found for batch_id={batch_ids[-1]}!"
        all_downloads.extend(downloads)

        # Download K-CSLC granules for all but the newest batch_id.
        for batch_id in batch_ids[:-1]:
            downloads = k_es_conn.get_download_granule_revision(batch_id)
            logger.info(f"Got {len(downloads)=} k cslc downloads for {batch_id=}")
            assert len(downloads) > 0, f"No downloads found for batch_id={batch_id}!"
            all_downloads.extend(downloads)

    return all_downloads

def query_cslc_static_files_for_cslc_batch(self, cslc_files, args, token, job_id, settings):
Expand Down
28 changes: 18 additions & 10 deletions data_subscriber/cslc/cslc_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,6 @@ def query_cmr(self, args, token, cmr, settings, timerange, now):
def create_download_job_params(self, query_timerange, chunk_batch_ids):
    '''Same as base class except inject the batch_ids of the associated
    K (compressed) CSLC granules before delegating.

    NOTE(review): mutates ``chunk_batch_ids`` in place — callers see the
    extended list.
    '''
    # Historical mode can legitimately produce more than one batch_id per
    # chunk, so the old `assert len(chunk_batch_ids) == 1` (removed by this
    # commit) must not be reinstated; it broke historical chunking.
    chunk_batch_ids.extend(list(self.k_batch_ids[chunk_batch_ids[0]]))
    return super().create_download_job_params(query_timerange, chunk_batch_ids)

def get_download_chunks(self, batch_id_to_urls_map):
    '''For CSLC chunks we must group them by the batch_id that was determined
    at the time of triggering.

    Historical mode groups every batch under its frame_id and requires exactly
    K batches per frame; forward/reprocessing modes keep one chunk per batch_id.

    :param batch_id_to_urls_map: mapping of batch_id -> URLs (URLs themselves
        are not inspected here, only the grouping keys)
    :return: the grouped chunks (``dict_values`` of lists of (batch_id, urls) pairs)
    :raises AssertionError: in historical mode when a frame does not have
        exactly ``self.args.k`` batches
    '''
    chunk_map = defaultdict(list)

    # Nothing to chunk — return an empty view rather than touching [0] below.
    if not batch_id_to_urls_map:
        return chunk_map.values()

    # All batches in one invocation share a frame; derive it from the first key.
    first_batch_id = next(iter(batch_id_to_urls_map))
    frame_id, _ = split_download_batch_id(first_batch_id)

    for batch_chunk in batch_id_to_urls_map.items():
        # Chunking is done differently between historical and forward/reprocessing.
        if self.proc_mode == "historical":
            chunk_map[frame_id].append(batch_chunk)
        else:
            # We don't actually care about the URLs, only the batch_id key.
            chunk_map[batch_chunk[0]].append(batch_chunk)

    # Historical mode must end up with exactly K batches for the frame.
    if self.proc_mode == "historical":
        if len(chunk_map[frame_id]) != self.args.k:
            logger.error([chunk for chunk, data in chunk_map[frame_id]])
            err_str = f"Number of download batches {len(chunk_map[frame_id])} for frame {frame_id} does not equal K {self.args.k}."
            raise AssertionError(err_str)

    return chunk_map.values()

Expand Down

0 comments on commit 6ca36e6

Please sign in to comment.