diff --git a/cads_adaptors/__init__.py b/cads_adaptors/__init__.py index 0294c65a..e53d7a9b 100644 --- a/cads_adaptors/__init__.py +++ b/cads_adaptors/__init__.py @@ -32,6 +32,7 @@ from cads_adaptors.adaptors.legacy import LegacyCdsAdaptor from cads_adaptors.adaptors.mars import DirectMarsCdsAdaptor, MarsCdsAdaptor from cads_adaptors.adaptors.multi import MultiAdaptor, MultiMarsCdsAdaptor +from cads_adaptors.adaptors.roocs import RoocsCdsAdaptor from cads_adaptors.adaptors.url import UrlCdsAdaptor from .tools.adaptor_tools import get_adaptor_class @@ -52,4 +53,5 @@ "MultiAdaptor", "MultiMarsCdsAdaptor", "ObservationsAdaptor", + "RoocsCdsAdaptor", ] diff --git a/cads_adaptors/adaptors/roocs/__init__.py b/cads_adaptors/adaptors/roocs/__init__.py new file mode 100644 index 00000000..9de4e99c --- /dev/null +++ b/cads_adaptors/adaptors/roocs/__init__.py @@ -0,0 +1,95 @@ +import os +from typing import BinaryIO + +from cads_adaptors import mapping +from cads_adaptors.adaptors.cds import AbstractCdsAdaptor, Request + + +class RoocsCdsAdaptor(AbstractCdsAdaptor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.facets = self.config.get("facets", dict()) + self.facet_groups = self.config.get("facet_groups", dict()) + self.facets_order = self.config.get("facets_order", []) + + def retrieve(self, request: Request) -> BinaryIO: + from cads_adaptors.tools import download_tools, url_tools + + os.environ["ROOK_URL"] = "http://rook.dkrz.de/wps" + + # switch off interactive logging to avoid threading issues + os.environ["ROOK_MODE"] = "sync" + + import rooki + + request = mapping.apply_mapping(request, self.mapping) + + workflow = self.construct_workflow(request) + response = rooki.rooki.orchestrate(workflow=workflow._serialise()) + + response = workflow.orchestrate() + + try: + urls = response.download_urls() + except Exception: + raise Exception(response.status) + + paths = url_tools.try_download(urls, context=self.context) + + return download_tools.DOWNLOAD_FORMATS["zip"](paths) + + def construct_workflow(self, request): + os.environ["ROOK_URL"] = "http://rook.dkrz.de/wps" + import rooki.operators as rookops + + from cads_adaptors.adaptors.roocs import operators + + facets = self.find_facets(request) + + dataset_id = ".".join(facets.values()) + variable_id = facets.get("variable", "") + + workflow = rookops.Input(variable_id, [dataset_id]) + + for operator_class in operators.ROOKI_OPERATORS: + operator = operator_class(request) + kwargs = dict() + for parameter in operator.parameters: + if parameter.__name__ in request: + kwargs = operator.update_kwargs(kwargs, parameter()) + if kwargs: + workflow = getattr(rookops, operator.ROOKI)(workflow, **kwargs) + + return workflow + + def find_facets(self, request): + """ + Expand the CDS request into a full, unique set of facets for ROOCS. + + NOTE: This method assumes unique facets for each CDS request. + """ + remap = self.mapping.get("remap", dict()) + + request = { + k: (v if not isinstance(v, list) else v[0]) for k, v in request.items() + } + request = {k: remap.get(k, dict()).get(v, v) for k, v in request.items()} + request = {k: v for k, v in request.items() if k in self.facets[0]} + + for raw_candidate in self.facets: + candidate = raw_candidate.copy() + + for key, groups in self.facet_groups.items(): + if key in candidate: + for group in groups: + if candidate[key] in groups[group]: + candidate[key] = group + + if candidate.items() >= request.items(): + break + else: + raise ValueError(f"No data found for request {request}") + + # raise ValueError(str(raw_candidate) + " | " + str(self.facets_order)) + + return {key: raw_candidate[key] for key in self.facets_order} diff --git a/cads_adaptors/adaptors/roocs/operators.py b/cads_adaptors/adaptors/roocs/operators.py new file mode 100644 index 00000000..c3863011 --- /dev/null +++ b/cads_adaptors/adaptors/roocs/operators.py @@ -0,0 +1,157 @@ +import calendar +import types +import typing as T + +DEFAULT_DATE_FORMAT = "%Y-%m-%d" + + +def rooki_args(*keys): + def decorator(method): + def wrapper(self, *args, **kwargs): + values = method(self, *args, **kwargs) + return {keys[i]: values[i] for i in range(len(values))} + + return wrapper + + return decorator + + +def parse_date_string(date_string, date_format=DEFAULT_DATE_FORMAT): + """Format any datestring into the required WPS date format.""" + import dateutil + + datetime = dateutil.parser.parse(date_string) + return datetime.strftime(date_format) + + +class Operator: + ROOKI: T.Union[str, None] = None + + def __init__(self, request): + self.request = request + + @property + def parameters(self): + params = (getattr(self, parameter) for parameter in dir(self)) + return (item for item in params if isinstance(item, types.MethodType)) + + @staticmethod + def update_kwargs(target, source): + """Combine rooki parameters to be passed as a single operator argument.""" + for key, value in source.items(): + if key in target: + target[key] = "|".join((target[key], value)).rstrip("|") + else: + target[key] = value + return target + + +class Subset(Operator): + ROOKI: T.Union[str, None] = "Subset" + + def date(request): + """ + Extract a date range from a CDS request and translate it to a rooki-style + date range. + """ + date_range = request["date"] + + if isinstance(date_range, list): + start, end = str(date_range[0]), str(date_range[-1]) + date_range = f"{parse_date_string(start)}/{parse_date_string(end)}" + + return {"time": date_range} + + def year_range(self): + """Convert a CDS-style year range to a rooki-style year range.""" + years = self.request["year"] + if not isinstance(years, (list, tuple)): + years = [years] + return {"time": "/".join((min(years), max(years)))} + + def year(self): + """Convert a CDS-style year request to a rooki-style year request.""" + years = self.request["year"] + if not isinstance(years, (list, tuple)): + years = [years] + return { + **self.year_range(), + **{"time_components": "year:" + ",".join(years)}, + } + + def month(self): + """Convert a CDS-style month request to a rooki-style month request.""" + months = self.request["month"] + if not isinstance(months, (list, tuple)): + months = [months] + months = [calendar.month_name[int(month)].lower()[:3] for month in months] + return {"time_components": "month:" + ",".join(months)} + + def day(self): + """Convert a CDS-style day request to a rooki-style day request.""" + days = self.request["day"] + if not isinstance(days, (list, tuple)): + days = [days] + return {"time_components": "day:" + ",".join(days)} + + def level(self): + """ + Extract a level argument from a CDS request and translate it to a + rooki-style level range. + """ + import re + + levels = self.request["level"] + + if not isinstance(levels, (list, tuple)): + levels = [levels] + + sanitised_levels = [] + for level in levels: + matches = list(re.finditer(r"[\d]*[.][\d]+|[\d]+", level)) + if matches: + sanitised_levels.append(matches[0].string) + else: + sanitised_levels.append(level) + + levels = [] + for level in sanitised_levels: + try: + levels.append(int(float(level))) + except ValueError: + continue + if not levels: + return {} + + levels = ",".join([str(level) for level in levels]) + + return {"level": levels} + + def area(self): + """ + Extract an area argument from a CDS request and translate it to a + rooki-style geographical area range. + """ + extents = self.request["area"] + + if isinstance(extents, str): + delimiters = [",", "/"] + for delimiter in delimiters: + if delimiter in extents: + extents = extents.split(delimiter) + break + else: + raise ValueError(f"invalid area argument: {extents}") + + # reorder extents from MARS-style (NWSE) to rooki-style (WSEN) + extents_order = [1, 2, 3, 0] + extents = [extents[i] for i in extents_order] + + extents = ",".join(str(extent) for extent in extents) + + return {"area": extents} + + +ROOKI_OPERATORS = [ + Subset, +]