Skip to content

Commit

Permalink
initial commit of Harmony service code (cli, adapter, downloader,utils)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielfromearth committed Oct 31, 2023
1 parent 2361609 commit 58b1553
Show file tree
Hide file tree
Showing 5 changed files with 372 additions and 0 deletions.
Empty file.
33 changes: 33 additions & 0 deletions concatenator/harmony/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""A Harmony CLI wrapper around the concatenate-batcher"""
from argparse import ArgumentParser

import harmony

from concatenator.harmony.service_adapter import StitcheeAdapter as HarmonyAdapter


def main(config: harmony.util.Config = None) -> None:
"""Parse command line arguments and invoke the service to respond to them.
Parameters
----------
config : harmony.util.Config
harmony.util.Config is injectable for tests
Returns
-------
None
"""
parser = ArgumentParser(
prog="Stitchee", description="Run the STITCH by Extending a dimEnsion service"
)
harmony.setup_cli(parser)
args = parser.parse_args()
if harmony.is_harmony_cli(args):
harmony.run_cli(parser, args, HarmonyAdapter, cfg=config)
else:
parser.error("Only --harmony CLIs are supported")


if __name__ == "__main__":
main()
113 changes: 113 additions & 0 deletions concatenator/harmony/download_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""A utility for downloading multiple granules simultaneously"""

import queue
import re
from copy import deepcopy
from multiprocessing import Manager, Process
from os import cpu_count
from pathlib import Path
from urllib.parse import urlparse

from harmony.logging import build_logger
from harmony.util import download


def multi_core_download(urls, destination_dir, access_token, cfg, process_count=None):
"""
A method which automagically scales downloads to the number of CPU
cores. For further explaination, see documentation on "multi-track
drifting"
Parameters
----------
urls : list
list of urls to download
destination_dir : str
output path for downloaded files
access_token : str
access token as provided in Harmony input
cfg : dict
Harmony configuration information
process_count : int
Number of worker processes to run (expected >= 1)
Returns
-------
list
list of downloaded files as pathlib.Path objects
"""

if process_count is None:
process_count = cpu_count()

with Manager() as manager:
url_queue = manager.Queue(len(urls))
path_list = manager.list()

for url in urls:
url_queue.put(url)

# Spawn worker processes
processes = []
for _ in range(process_count):
download_process = Process(
target=_download_worker,
args=(url_queue, path_list, destination_dir, access_token, cfg),
)
processes.append(download_process)
download_process.start()

# Ensure worker processes exit successfully
for process in processes:
process.join()
if process.exitcode != 0:
raise RuntimeError(f"Download failed - exit code: {process.exitcode}")

process.close()

path_list = deepcopy(path_list) # ensure GC can cleanup multiprocessing

return [Path(path) for path in path_list]


def _download_worker(url_queue, path_list, destination_dir, access_token, cfg):
"""
A method to be executed in a separate process which processes the url_queue
and places paths to completed downloads into the path_list. Downloads are
handled by harmony.util.download
Parameters
----------
url_queue : queue.Queue
URLs to process - should be filled from start and only decreases
path_list : list
paths to completed file downloads
destination_dir : str
output path for downloaded files
access_token : str
access token as provided in Harmony input
cfg : dict
Harmony configuration information
"""

logger = build_logger(cfg)

while not url_queue.empty():
try:
url = url_queue.get_nowait()
except queue.Empty:
break

path = Path(
download(url, destination_dir, logger=logger, access_token=access_token, cfg=cfg)
)
filename_match = re.match(r".*\/(.+\..+)", urlparse(url).path)

if filename_match is not None:
filename = filename_match.group(1)
dest_path = path.parent.joinpath(filename)
path = path.rename(dest_path)
else:
logger.warning("Origin filename could not be assertained - %s", url)

path_list.append(str(path))
128 changes: 128 additions & 0 deletions concatenator/harmony/service_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from pathlib import Path
from tempfile import TemporaryDirectory
from uuid import uuid4

import pystac
from harmony.adapter import BaseHarmonyAdapter
from harmony.util import bbox_to_geometry
from pystac import Item
from pystac.item import Asset

from concatenator.harmony.download_worker import multi_core_download
from concatenator.harmony.util import (
_get_netcdf_urls,
_get_output_bounding_box,
_get_output_date_range,
)
from concatenator.stitchee import stitchee


class StitcheeAdapter(BaseHarmonyAdapter):
"""
A harmony-service-lib wrapper around the concatenate-batcher module.
This wrapper does not support Harmony calls that do not have STAC catalogs
as support for this behavior is being depreciated in harmony-service-lib
"""

def __init__(self, message, catalog=None, config=None):
"""
Constructs the adapter
Parameters
----------
message : harmony.Message
The Harmony input which needs acting upon
catalog : pystac.Catalog
A STAC catalog containing the files on which to act
config : harmony.util.Config
The configuration values for this runtime environment.
"""
super().__init__(message, catalog=catalog, config=config)

def invoke(self):
"""
Primary entrypoint into the service wrapper. Overrides BaseHarmonyAdapter.invoke
"""
if not self.catalog:
# Message-only support is being depreciated in Harmony, so we should expect to
# only see requests with catalogs when invoked with a newer Harmony instance
# https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71
raise RuntimeError("Invoking Batchee without a STAC catalog is not supported")

return self.message, self.process_catalog(self.catalog)

def process_catalog(self, catalog: pystac.Catalog):
"""Converts a list of STAC catalogs into a list of lists of STAC catalogs."""
self.logger.info("process_catalog() started.")
try:
result = catalog.clone()
result.id = str(uuid4())
result.clear_children()

# Get all the items from the catalog, including from child or linked catalogs
items = list(self.get_all_catalog_items(catalog))

self.logger.info(f"length of items==={len(items)}.")

# Quick return if catalog contains no items
if len(items) == 0:
return result

# # --- Get granule filepaths (urls) ---
netcdf_urls: list[str] = _get_netcdf_urls(items)
self.logger.info(f"netcdf_urls==={netcdf_urls}.")

# -- Process metadata --
bounding_box: list | None = _get_output_bounding_box(items)
datetimes = _get_output_date_range(items)

# Items did not have a bbox; valid under spec
if bounding_box and len(bounding_box) == 0:
bounding_box = None

# -- Perform merging --
collection = self._get_item_source(items[0]).collection
filename = f"{collection}_merged.nc4"

with TemporaryDirectory() as temp_dir:
self.logger.info("Starting granule downloads")
input_files = multi_core_download(
netcdf_urls, temp_dir, self.message.accessToken, self.config
)
self.logger.info("Finished granule downloads")

output_path = str(Path(temp_dir).joinpath(filename).resolve())

# # --- Run STITCHEE ---
stitchee(
input_files,
output_path,
write_tmp_flat_concatenated=False,
keep_tmp_files=False,
concat_dim="mirror_step", # This is currently set only for TEMPO
logger=self.logger,
)
staged_url = self._stage(output_path, filename, "application/x-netcdf4")

# -- Output to STAC catalog --
result.clear_items()
properties = dict(
start_datetime=datetimes["start_datetime"], end_datetime=datetimes["end_datetime"]
)

item = Item(
str(uuid4()), bbox_to_geometry(bounding_box), bounding_box, None, properties
)
asset = Asset(
staged_url, title=filename, media_type="application/x-netcdf4", roles=["data"]
)
item.add_asset("data", asset)
result.add_item(item)

self.logger.info("STAC catalog creation complete.")

return result

except Exception as service_exception:
self.logger.error(service_exception, exc_info=1)
raise service_exception
98 changes: 98 additions & 0 deletions concatenator/harmony/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Misc utility functions"""
from datetime import datetime

from pystac import Asset, Item

VALID_EXTENSIONS = (".nc4", ".nc")
VALID_MEDIA_TYPES = ["application/x-netcdf", "application/x-netcdf4"]


def _is_netcdf_asset(asset: Asset) -> bool:
"""Check that a `pystac.Asset` is a valid NetCDF-4 granule. This can be
ascertained via either the media type or by checking the extension of
granule itself if that media type is absent.
"""
return asset.media_type in VALID_MEDIA_TYPES or (
asset.media_type is None and asset.href.lower().endswith(VALID_EXTENSIONS)
)


def _get_item_url(item: Item) -> str | None:
"""Check the `pystac.Item` for the first asset with the `data` role and a
valid input format. If there are no matching assets, return None
"""
return next(
(
asset.href
for asset in item.assets.values()
if "data" in (asset.roles or []) and _is_netcdf_asset(asset)
),
None,
)


def _get_netcdf_urls(items: list[Item]) -> list[str]:
"""Iterate through a list of `pystac.Item` instances, from the input
`pystac.Catalog`. Extract the `pystac.Asset.href` for the first asset
of each item that has a role of "data". If there are any items that do
not have a data asset, then raise an exception.
"""
catalog_urls = [_get_item_url(item) for item in items]

if None in catalog_urls:
raise RuntimeError("Some input granules do not have NetCDF-4 assets.")

return catalog_urls # type: ignore[return-value]


def _get_output_bounding_box(input_items: list[Item]) -> list[float]:
"""Create a bounding box that is the maximum combined extent of all input
`pystac.Item` bounding box extents.
"""
bounding_box = input_items[0].bbox

for item in input_items:
bounding_box[0] = min(bounding_box[0], item.bbox[0])
bounding_box[1] = min(bounding_box[1], item.bbox[1])
bounding_box[2] = max(bounding_box[2], item.bbox[2])
bounding_box[3] = max(bounding_box[3], item.bbox[3])

return bounding_box


def _get_output_date_range(input_items: list[Item]) -> dict[str, str]:
"""Create a dictionary of start and end datetime, which encompasses the
full temporal range of all input `pystac.Item` instances. This output
dictionary will be used for the `properties` of the output Zarr store
`pystac.Item`.
"""
start_datetime, end_datetime = _get_item_date_range(input_items[0])

for item in input_items:
new_start_datetime, new_end_datetime = _get_item_date_range(item)
start_datetime = min(start_datetime, new_start_datetime)
end_datetime = max(end_datetime, new_end_datetime)

return {"start_datetime": start_datetime.isoformat(), "end_datetime": end_datetime.isoformat()}


def _get_item_date_range(item: Item) -> tuple[datetime, datetime]:
"""A helper function to retrieve the temporal range from a `pystac.Item`
instance. If the `pystac.Item.datetime` property exists, there is a
single datetime associated with the granule, otherwise there will be a
start and end time contained within the `pystac.Item` metadata.
"""
if item.datetime is None:
start_datetime = item.common_metadata.start_datetime
end_datetime = item.common_metadata.end_datetime
else:
start_datetime = item.datetime
end_datetime = item.datetime

return start_datetime, end_datetime

0 comments on commit 58b1553

Please sign in to comment.