Skip to content

Commit

Permalink
roocs adaptor (#80)
Browse files Browse the repository at this point in the history
* roocs adaptor code
  • Loading branch information
EddyCMWF authored Feb 19, 2024
1 parent 6ce815a commit 54cea15
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cads_adaptors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from cads_adaptors.adaptors.legacy import LegacyCdsAdaptor
from cads_adaptors.adaptors.mars import DirectMarsCdsAdaptor, MarsCdsAdaptor
from cads_adaptors.adaptors.multi import MultiAdaptor, MultiMarsCdsAdaptor
from cads_adaptors.adaptors.roocs import RoocsCdsAdaptor
from cads_adaptors.adaptors.url import UrlCdsAdaptor

from .tools.adaptor_tools import get_adaptor_class
Expand All @@ -52,4 +53,5 @@
"MultiAdaptor",
"MultiMarsCdsAdaptor",
"ObservationsAdaptor",
"RoocsCdsAdaptor",
]
95 changes: 95 additions & 0 deletions cads_adaptors/adaptors/roocs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import os
from typing import BinaryIO

from cads_adaptors import mapping
from cads_adaptors.adaptors.cds import AbstractCdsAdaptor, Request


class RoocsCdsAdaptor(AbstractCdsAdaptor):
    """CDS adaptor that retrieves data by running a rooki workflow.

    The adaptor reads three entries from ``self.config``:

    * ``facets``: iterable of facet dictionaries, one per available dataset;
    * ``facet_groups``: ``{facet: {group: members}}`` used to collapse facet
      values into a group name before matching a request;
    * ``facets_order``: facet names, in the order used to build the dataset id.
    """

    # Endpoint exported to rooki through the ``ROOK_URL`` environment variable.
    ROOK_URL = "http://rook.dkrz.de/wps"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.facets = self.config.get("facets", dict())
        self.facet_groups = self.config.get("facet_groups", dict())
        self.facets_order = self.config.get("facets_order", [])

    def retrieve(self, request: Request) -> BinaryIO:
        """Run the workflow for *request* and return the results as a zip.

        Raises:
            Exception: carrying ``response.status`` when the download URLs
                cannot be obtained from the response; the original error is
                chained as the cause.
        """
        from cads_adaptors.tools import download_tools, url_tools

        # Both variables are set before construct_workflow imports rooki.
        os.environ["ROOK_URL"] = self.ROOK_URL

        # switch off interactive logging to avoid threading issues
        os.environ["ROOK_MODE"] = "sync"

        request = mapping.apply_mapping(request, self.mapping)

        workflow = self.construct_workflow(request)
        # Orchestrate exactly once: a second submission would repeat the whole
        # workflow only to have its response thrown away.
        response = workflow.orchestrate()

        try:
            urls = response.download_urls()
        except Exception as err:
            raise Exception(response.status) from err

        paths = url_tools.try_download(urls, context=self.context)

        return download_tools.DOWNLOAD_FORMATS["zip"](paths)

    def construct_workflow(self, request):
        """Build the rooki workflow (input plus operators) for *request*."""
        os.environ["ROOK_URL"] = self.ROOK_URL
        import rooki.operators as rookops

        from cads_adaptors.adaptors.roocs import operators

        facets = self.find_facets(request)

        dataset_id = ".".join(facets.values())
        variable_id = facets.get("variable", "")

        workflow = rookops.Input(variable_id, [dataset_id])

        for operator_class in operators.ROOKI_OPERATORS:
            operator = operator_class(request)
            kwargs = dict()
            # A parameter method is applied only when a request key carries
            # the same name as the method.
            for parameter in operator.parameters:
                if parameter.__name__ in request:
                    kwargs = operator.update_kwargs(kwargs, parameter())
            # Operators that received no arguments are left out of the chain.
            if kwargs:
                workflow = getattr(rookops, operator.ROOKI)(workflow, **kwargs)

        return workflow

    def find_facets(self, request):
        """
        Expand the CDS request into a full, unique set of facets for ROOCS.
        NOTE: This method assumes unique facets for each CDS request.

        Returns the first configured facet dictionary that contains every
        facet of the request, restricted to (and ordered by)
        ``self.facets_order``.

        Raises:
            ValueError: if no facets are configured or none match the request.
        """
        if not self.facets:
            # Without this guard the ``self.facets[0]`` lookup below fails
            # with an unrelated KeyError/IndexError.
            raise ValueError(f"No data found for request {request}")

        remap = self.mapping.get("remap", dict())

        # Only the first value of a list-valued request key is considered.
        request = {
            k: (v if not isinstance(v, list) else v[0]) for k, v in request.items()
        }
        request = {k: remap.get(k, dict()).get(v, v) for k, v in request.items()}
        # Keys that are not facets cannot take part in the match.
        request = {k: v for k, v in request.items() if k in self.facets[0]}

        for raw_candidate in self.facets:
            candidate = raw_candidate.copy()

            # Replace grouped facet values by their group name so that the
            # candidate is compared in the same terms as the request.
            for key, groups in self.facet_groups.items():
                if key in candidate:
                    for group in groups:
                        if candidate[key] in groups[group]:
                            candidate[key] = group

            if candidate.items() >= request.items():
                break
        else:
            raise ValueError(f"No data found for request {request}")

        # The un-grouped values of the matching candidate are returned.
        return {key: raw_candidate[key] for key in self.facets_order}
157 changes: 157 additions & 0 deletions cads_adaptors/adaptors/roocs/operators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import calendar
import types
import typing as T

# Default ``strftime`` pattern applied by parse_date_string to its output.
DEFAULT_DATE_FORMAT = "%Y-%m-%d"


def rooki_args(*keys):
    """Decorate a method so that its returned values are labelled with *keys*.

    The decorated method must return a sequence; the wrapper returns a dict
    pairing ``keys[i]`` with the i-th returned value. Returning more values
    than there are keys raises ``IndexError``.

    The wrapper keeps the wrapped method's metadata (notably ``__name__``),
    so code that identifies methods by name still recognises it.
    """
    import functools

    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            values = method(self, *args, **kwargs)
            return {keys[index]: value for index, value in enumerate(values)}

        return wrapper

    return decorator


def parse_date_string(date_string, date_format=DEFAULT_DATE_FORMAT):
    """Format any datestring into the required WPS date format.

    Args:
        date_string: date text in any layout ``dateutil`` can parse.
        date_format: ``strftime`` pattern used for the returned string.

    Returns:
        The parsed date rendered with *date_format*.
    """
    # Import the submodule explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.parser`` is available as an attribute.
    from dateutil import parser as date_parser

    parsed = date_parser.parse(date_string)
    return parsed.strftime(date_format)


class Operator:
    """Base class pairing a CDS request with the rooki operator it feeds.

    Subclasses set ``ROOKI`` to the name of a rooki operator and define
    methods that turn request entries into keyword arguments for it.
    """

    ROOKI: T.Union[str, None] = None

    def __init__(self, request):
        self.request = request

    @property
    def parameters(self):
        """Lazily yield every bound method reachable on this instance."""
        members = map(lambda name: getattr(self, name), dir(self))
        return (
            member for member in members if isinstance(member, types.MethodType)
        )

    @staticmethod
    def update_kwargs(target, source):
        """Combine rooki parameters to be passed as a single operator argument.

        Entries of *source* are merged into *target* in place; a key present
        in both gets its values joined by ``|`` (trailing ``|`` removed).
        *target* itself is returned.
        """
        for name in source:
            incoming = source[name]
            if name not in target:
                target[name] = incoming
                continue
            merged = "|".join([target[name], incoming])
            target[name] = merged.rstrip("|")
        return target


class Subset(Operator):
    """Translate CDS request entries into arguments for rooki's ``Subset``.

    Every method reads one key of ``self.request`` and returns a dict of
    rooki keyword arguments derived from it.
    """

    ROOKI: T.Union[str, None] = "Subset"

    @staticmethod
    def _as_strings(value):
        """Return *value* as a list of strings, wrapping a scalar in a list."""
        if not isinstance(value, (list, tuple)):
            value = [value]
        return [str(item) for item in value]

    def date(self):
        """
        Extract a date range from a CDS request and translate it to a rooki-style
        date range.

        A list/tuple is reduced to ``first/last`` with both ends reformatted by
        parse_date_string; any other value is passed through unchanged.
        """
        date_range = self.request["date"]

        if isinstance(date_range, (list, tuple)):
            start, end = str(date_range[0]), str(date_range[-1])
            date_range = f"{parse_date_string(start)}/{parse_date_string(end)}"

        return {"time": date_range}

    def year_range(self):
        """Convert a CDS-style year range to a rooki-style year range."""
        # Values are compared as strings, so years are expected to share the
        # same number of digits.
        years = self._as_strings(self.request["year"])
        return {"time": "/".join((min(years), max(years)))}

    def year(self):
        """Convert a CDS-style year request to a rooki-style year request."""
        years = self._as_strings(self.request["year"])
        return {
            **self.year_range(),
            "time_components": "year:" + ",".join(years),
        }

    def month(self):
        """Convert a CDS-style month request to a rooki-style month request."""
        months = self.request["month"]
        if not isinstance(months, (list, tuple)):
            months = [months]
        # NOTE: calendar.month_name follows the current locale.
        months = [calendar.month_name[int(month)].lower()[:3] for month in months]
        return {"time_components": "month:" + ",".join(months)}

    def day(self):
        """Convert a CDS-style day request to a rooki-style day request."""
        days = self._as_strings(self.request["day"])
        return {"time_components": "day:" + ",".join(days)}

    def level(self):
        """
        Extract a level argument from a CDS request and translate it to a
        rooki-style level range.

        The first number found in each level is kept and truncated to an
        integer; levels without a usable number are skipped. Returns an empty
        dict when no level survives.
        """
        import re

        number = re.compile(r"[\d]*[.][\d]+|[\d]+")

        levels = []
        for level in self._as_strings(self.request["level"]):
            match = number.search(level)
            # ``group()`` is the matched number; ``Match.string`` would be the
            # whole input, units included.
            text = match.group() if match else level
            try:
                levels.append(int(float(text)))
            except ValueError:
                continue
        if not levels:
            return {}

        return {"level": ",".join(str(level) for level in levels)}

    def area(self):
        """
        Extract an area argument from a CDS request and translate it to a
        rooki-style geographical area range.

        Raises:
            ValueError: if a string area has no ``,`` or ``/`` delimiter, or
                if fewer than four extents are supplied.
        """
        extents = self.request["area"]

        if isinstance(extents, str):
            for delimiter in (",", "/"):
                if delimiter in extents:
                    extents = extents.split(delimiter)
                    break
            else:
                raise ValueError(f"invalid area argument: {extents}")

        if len(extents) < 4:
            raise ValueError(f"invalid area argument: {extents}")

        # reorder extents from MARS-style (NWSE) to rooki-style (WSEN)
        extents_order = [1, 2, 3, 0]
        extents = [extents[i] for i in extents_order]

        return {"area": ",".join(str(extent) for extent in extents)}


# Operator classes made available by this module; each class names its rooki
# operator through its ``ROOKI`` attribute.
ROOKI_OPERATORS = [
    Subset,
]

0 comments on commit 54cea15

Please sign in to comment.