Merge pull request #242 from ecmwf-projects/feature/regional_ecpds
Feature/regional ecpds
gbiavati authored Dec 4, 2024
2 parents 4c8f1aa + ff7c20b commit af8de76
Showing 10 changed files with 153 additions and 82 deletions.
68 changes: 42 additions & 26 deletions cads_adaptors/adaptors/cams_regional_fc/cacher.py
@@ -5,6 +5,7 @@
import re
import threading
import time
from urllib.parse import urlparse

import boto3
import jinja2
@@ -23,12 +24,25 @@ class AbstractCacher:
"""

def __init__(
self, integration_server, logger=None, no_put=False, permanent_fields=None
self,
integration_server,
logger=None,
no_put=False,
permanent_fields=None,
no_cache_key=None,
):
self.integration_server = integration_server
self.logger = logging.getLogger(__name__) if logger is None else logger
self.no_put = no_put

# The name of a key which, if present in the original request, will be
# inserted into the field description dictionary when writing to the
# cache, which means it will appear in the cached filename. Its presence
# will also mean corresponding files are always written to temporary
# space. It is used for optionally avoiding the cache provided by this
# class.
self.no_cache_key = no_cache_key or "_no_cache"

# Fields which should be cached permanently (on the datastore). All
# other fields will be cached in temporary locations.
if permanent_fields is None:
@@ -80,14 +94,14 @@ def put(self, req):
)
assert nmatches == 1

# If no_cache was in the request then insert it into req1field.
# This means it will appear in the cache file name, which is
# useful for regression testing. It means multiple tests that
# request the same field can share a unique no_cache value so
# the field is retrieved from the backend the first time but
# from cache on subsequent attempts.
if "no_cache" in req["req"]:
req1field["no_cache"] = req["req"]["no_cache"]
# If self.no_cache_key was in the request then insert it into
# req1field. This means it will appear in the cache file name,
# which is useful for regression testing. It means multiple
# tests that request the same field can share a unique no-cache
# value so the field is retrieved from the backend the first
# time but from cache on subsequent attempts.
if self.no_cache_key in req["req"]:
req1field[self.no_cache_key] = req["req"][self.no_cache_key]

# Convert the message to pure binary data and write to cache
self._write_field(codes_get_message(msg), req1field)
@@ -125,12 +139,12 @@ def _cache_permanently(self, field):
"""Return True if this field should be put in the permanent cache, False
otherwise.
"""
# Is this a field which should be stored in a permanent location? If
# the field contains an area specification then it isn't because only
# full-area fields are stored permanently. The "no_cache" key is set to
# a random string to defeat the system cache when testing so make sure
# that's not stored permanently.
if "north" not in field and "no_cache" not in field:
# Is this a field which should be stored in a permanent location? If the
# field contains an area specification then it isn't because only
# full-area fields are stored permanently. The self.no_cache_key key is
# set to a random string to defeat the system cache when testing so make
# sure that's not stored permanently.
if "north" not in field and self.no_cache_key not in field:
permanent, _, _ = hcubes_intdiff2(
{k: [v] for k, v in field.items()}, self.permanent_fields
)
@@ -298,18 +312,27 @@ class CacherS3(AbstractAsyncCacher):
bucket.
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args, s3_bucket=None, create_bucket=False, **kwargs):
super().__init__(*args, **kwargs)

self._host = "object-store.os-api.cci2.ecmwf.int"
self._bucket = "cci2-cams-regional-fc"
endpoint_url = os.environ["STORAGE_API_URL"]
self._host = urlparse(endpoint_url).hostname
self._bucket = s3_bucket or "cci2-cams-regional-fc"
self._credentials = dict(
endpoint_url=os.environ["STORAGE_API_URL"],
endpoint_url=endpoint_url,
aws_access_key_id=os.environ["STORAGE_ADMIN"],
aws_secret_access_key=os.environ["STORAGE_PASSWORD"],
)
self.client = boto3.client("s3", **self._credentials)

# If it's not guaranteed that the bucket already exists then the caller
# should pass create_bucket=True
if create_bucket:
rsrc = boto3.resource("s3", **self._credentials)
bkt = rsrc.Bucket(self._bucket)
if not bkt.creation_date:
bkt = self.client.create_bucket(Bucket=self._bucket)

def _write_field_sync(self, data, fieldinfo):
"""Write the data described by fieldinfo to the appropriate cache
location.
@@ -321,13 +344,6 @@ def _write_field_sync(self, data, fieldinfo):
f"Caching {fieldinfo} to {self._host}:{self._bucket}:{remote_path}"
)

# Uncomment this code if it can't be trusted that the bucket already
# exists
# resource = boto3.resource('s3', **self._credentials)
# bkt = resource.Bucket(self._bucket)
# if not bkt.creation_date:
# bkt = self.client.create_bucket(Bucket=self._bucket)

attempt = 0
t0 = time.time()
while True:
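
Note: the CacherS3 changes above replace the hard-coded S3 host and bucket with values derived from the STORAGE_API_URL environment variable and an optional s3_bucket argument, and make bucket creation opt-in via create_bucket=True. The following standalone sketch mirrors that set-up logic with boto3; the CACHE_BUCKET variable and its default value are illustrative assumptions, not part of the commit.

import os
from urllib.parse import urlparse

import boto3

# Endpoint and credentials come from the same environment variables the
# cacher uses; the hostname is computed only for use in log messages.
endpoint_url = os.environ["STORAGE_API_URL"]
host = urlparse(endpoint_url).hostname
credentials = dict(
    endpoint_url=endpoint_url,
    aws_access_key_id=os.environ["STORAGE_ADMIN"],
    aws_secret_access_key=os.environ["STORAGE_PASSWORD"],
)
bucket_name = os.environ.get("CACHE_BUCKET", "cci2-cams-regional-fc")  # illustrative

client = boto3.client("s3", **credentials)

# Opt-in equivalent of create_bucket=True: a Bucket resource whose
# creation_date is None does not exist yet, so create it before first use.
bucket = boto3.resource("s3", **credentials).Bucket(bucket_name)
if not bucket.creation_date:
    client.create_bucket(Bucket=bucket_name)
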
68 changes: 35 additions & 33 deletions cads_adaptors/adaptors/cams_regional_fc/cams_regional_fc.py
@@ -40,7 +40,7 @@ def cams_regional_fc(context, config, requests):
regapi = regional_fc_api(integration_server=integration_server, logger=context)

# Pre-process requests
requests, info = preprocess_requests(context, requests, regapi)
requests, info = preprocess_requests(context, config, requests, regapi)
info["config"] = config

# If converting to NetCDF then different groups of grib files may need to be
Expand Down Expand Up @@ -151,7 +151,14 @@ def get_local(req_groups, integration_server, config, context):
the datastore) and identify the remaining non-local fields.
"""
# Cacher has knowledge of cache locations
with Cacher(integration_server, logger=context) as cacher:
cfg = config.get("regional_fc", {})
no_cache_key = cfg.get("no_cache_key")
with Cacher(
integration_server,
logger=context,
no_cache_key=no_cache_key,
**cfg.get("cacher_kwargs", {}),
) as cacher:
for req_group in req_groups:
_get_local(req_group, cacher, config, context)

@@ -275,7 +282,7 @@ def get_uncached(requests, req_group, config, context):
if size > 0:
req_group["retrieved_files"] = req_group.get("retrieved_files", []) + [path]
else:
context.add_stdout("Sub-request target file is empty")
context.info("Sub-request target file is empty")

return path

@@ -303,48 +310,43 @@ def retrieve_subrequest(backend, requests, req_group, config, context):

# Launch the sub-request
response = None
sub_request_uid = None
sub_request_uid = "n/a"
dataset = f"cams-europe-air-quality-forecasts-{backend}"
try:
response = client.retrieve(
dataset,
{"requests": requests, "parent_config": config},
)
except Exception as e:
sub_request_uid = "none" if response is None else response.request_uid
context.add_stderr(
"Sub-request "
+ ("" if response is None else f"({response.request_uid}) ")
+ f"failed: {e!r}"
sub_request_uid = response.request_uid
context.info(
f"Sub-request {sub_request_uid} has been launched (via the " "CDSAPI)."
)
# Download the result
exc = None
for i_retry in range(MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES):
try:
response.download(target)
break
except Exception as e:
context.error(
f"Attempt {i_retry+1} to download the result of sub-request "
f"{sub_request_uid} failed: {e!r}"
)
exc = e
else:
raise exc
except Exception as e:
context.error(f"Sub-request ({sub_request_uid}) failed: {e!r}")
if maintenance_msg:
context.add_user_visible_error(maintenance_msg)
raise InvalidRequest(maintenance_msg) from None
else:
raise RuntimeError(
msg = (
f"Failed to retrieve data from {backend} remote server. "
"Please try again later."
) from None
else:
sub_request_uid = response.request_uid
message = f"Sub-request {sub_request_uid} has been launched (via the CDSAPI)."
context.add_stdout(message)

# Download the result
exc = None
for i_retry in range(MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES):
try:
response.download(target)
break
except Exception as e:
exc = e
context.add_stdout(
f"Attempt {i_retry+1} to download the result of sub-request "
f"{sub_request_uid} failed: {e!r}"
+ "Please try again later."
)
else:
message = f"Failed to download sub-request result: {exc!r}"
context.add_stderr(message)
raise RuntimeError(message) from None
context.add_user_visible_error(msg)
raise RuntimeError(msg) from None

size = os.path.getsize(target)
context.info(
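
Note: retrieve_subrequest now wraps launching the sub-request and downloading its result in one try block, retrying the download up to MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES times before giving up. A minimal sketch of that retry pattern, assuming a cdsapi-style response object with a download(target) method and a logger with an error() method:

MAX_RETRIES = 3  # stands in for MAX_SUBREQUEST_RESULT_DOWNLOAD_RETRIES


def download_with_retries(response, target, logger, max_retries=MAX_RETRIES):
    exc = None
    for i_retry in range(max_retries):
        try:
            response.download(target)
            return target
        except Exception as e:
            logger.error(
                f"Attempt {i_retry + 1} to download the sub-request result "
                f"failed: {e!r}"
            )
            exc = e
    # Every attempt failed: re-raise the last exception for the caller to handle
    raise exc
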
7 changes: 1 addition & 6 deletions cads_adaptors/adaptors/cams_regional_fc/grib2request.py
@@ -9,13 +9,8 @@
def grib2request_init(regfc_defns):
"""Initialise global variables: grib_key_types and field_data. This is so
that it doesn't need to be done multiple times or in grib2request(),
which is called from places where the path to the dataset directory is
not easily available.
which is called from places where the dataset config is not easily available.
"""
# Do not execute twice
if field_data:
return

# Link grib representations to API request values
field_data.update(
{
17 changes: 12 additions & 5 deletions cads_adaptors/adaptors/cams_regional_fc/meteo_france_retrieve.py
@@ -15,14 +15,15 @@

def meteo_france_retrieve(
requests,
target,
regapi,
regfc_defns,
integration_server,
target=None,
tmpdir=None,
max_rate=None,
max_simultaneous=None,
cacher_kwargs=None,
combine_method=None,
logger=None,
**kwargs,
):
@@ -32,6 +33,11 @@ def meteo_france_retrieve(
if logger is None:
logger = logging.getLogger(__name__)

# By default, if a target has been provided then all grib fields will be
# concatenated into it. Otherwise they will not be written to file.
if combine_method is None:
combine_method = "cat" if target else "null"

# Keyword argument options to Downloader that depend on the backend
# (archived/latest)
backend_specific = {
@@ -103,7 +109,7 @@
getter=getter,
max_rate=rate_limiter,
max_simultaneous=number_limiter,
combine_method="cat" if target else "null",
combine_method=combine_method,
target_suffix=".grib",
response_checker=assert_valid_grib,
response_checker_threadsafe=False,
@@ -119,8 +125,9 @@
t0 = time.time()

try:
# Returns None if no data is found
file = downloader.execute(urlreqs, target=target)
# Returns None if no data is found. Return a list if combine_method
# is "none".
output = downloader.execute(urlreqs, target=target)
except RequestFailed as e:
req = {x["url"]: x["req"] for x in urlreqs}[e.url]
raise Exception(
@@ -139,7 +146,7 @@

logger.info("Meteo France download finished")

return file
return output


def make_api_hypercubes(requests, regapi):
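
Note: target is now optional and concatenation is controlled by the new combine_method argument, whose default depends on whether a target was supplied. A tiny sketch of that defaulting rule as described by the comments in the diff:

def resolve_combine_method(combine_method, target):
    # Default: concatenate grib fields into the target file when one is
    # given, otherwise do not write them to file at all.
    if combine_method is None:
        return "cat" if target else "null"
    return combine_method


assert resolve_combine_method(None, "data.grib") == "cat"
assert resolve_combine_method(None, None) == "null"
assert resolve_combine_method("none", "data.grib") == "none"  # caller wants a list back
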
26 changes: 20 additions & 6 deletions cads_adaptors/adaptors/cams_regional_fc/preprocess_requests.py
@@ -9,9 +9,17 @@
from .formats import Formats


def preprocess_requests(context, requests, regapi):
def preprocess_requests(context, config, requests, regapi):
# Enforce basic type conformance
requests = apply_schema(requests, context)
requests = apply_schema(requests, config, context)

# The no_cache (as opposed to _no_cache) key is still allowed for backward
# compatibility because some external users still use it, but they no longer
# need to subvert the system-level cache to avoid getting out-of-date
# results. It just causes unnecessary downloads from Meteo France. So get rid
# of it.
for r in requests:
r.pop("no_cache", None)

# Get output format and remove from requests
format = requests[0]["format"][0]
@@ -50,8 +58,7 @@ def preprocess_requests(context, requests, regapi):
+ "Please either request grib format, make separate requests or "
+ "explicitly specify an area that will result in output on a "
+ "single grid\n\n"
+ model_grids_table(model_grids, regapi),
"",
+ model_grids_table(model_grids, regapi)
)

# Ensure date lists are not in compressed form
@@ -69,7 +76,7 @@
return requests, info


def apply_schema(requests, context):
def apply_schema(requests, config, context):
"""Enforce basic type conformance of the requests according to a schema."""
mandatory_keys = [
"variable",
@@ -83,7 +90,14 @@
]
recognised_keys = sorted(
set(mandatory_keys).union(
["area", "no_cache", "__in_adaptor_no_cache", "_local_subarea"]
[
"no_cache", # Old user cache subversion key, still used by some
"_no_cache", # New user cache subversion key
"__in_adaptor_no_cache", # System cache subversion key
config.get("regional_fc", {}).get("no_cache_key", "_no_cache"),
"area",
"_local_subarea",
]
)
)

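
Note: preprocess_requests now receives the adaptor config, strips the legacy no_cache key from incoming requests, and recognises a configurable cache-bypass key (regional_fc.no_cache_key, defaulting to "_no_cache"). A minimal sketch of that clean-up, assuming requests are plain dicts; the example request is made up for illustration:

def strip_legacy_no_cache(requests, config):
    # The configurable key (default "_no_cache") stays recognised; the
    # legacy "no_cache" key is still accepted but silently dropped.
    no_cache_key = config.get("regional_fc", {}).get("no_cache_key", "_no_cache")
    for r in requests:
        r.pop("no_cache", None)
    return requests, no_cache_key


requests, key = strip_legacy_no_cache(
    [{"variable": ["ozone"], "no_cache": "abc123"}],  # illustrative request
    {"regional_fc": {}},
)
# requests is now [{"variable": ["ozone"]}] and key is "_no_cache"
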
8 changes: 6 additions & 2 deletions cads_adaptors/adaptors/cams_regional_fc/subrequest_main.py
@@ -33,19 +33,23 @@ def subrequest_main(backend, request, child_config, context):
)
os.close(fd)

cacher_kwargs = cfg.get("cacher_kwargs", {})
if cfg.get("no_cache_key"):
cacher_kwargs["no_cache_key"] = cfg["no_cache_key"]

# Get the data
try:
meteo_france_retrieve(
request["requests"],
target,
regapi,
cfg["definitions"],
integration_server,
target=target,
logger=context,
tmpdir=STACK_TEMP_DIR,
max_rate=cfg.get("meteofrance_max_rate"),
max_simultaneous=cfg.get("meteofrance_max_simultaneous"),
cacher_kwargs=cfg.get("cacher_kwargs", {}),
cacher_kwargs=cacher_kwargs,
)
except Exception as e:
message = f"Failed to obtain data from remote server: {e!r}"
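
Note: subrequest_main now folds the configured no_cache_key into the cacher keyword arguments before calling meteo_france_retrieve. An illustrative configuration plus the merge logic; all values below are made-up examples, not real deployment settings:

cfg = {
    "no_cache_key": "_no_cache",
    "cacher_kwargs": {
        "s3_bucket": "my-regional-fc-cache",  # hypothetical bucket name
        "create_bucket": True,                # create it on first use if missing
    },
    "meteofrance_max_rate": 10,               # illustrative limits
    "meteofrance_max_simultaneous": 5,
}

# Mirror of the new merge logic: the cache-bypass key rides along with the
# other cacher keyword arguments.
cacher_kwargs = cfg.get("cacher_kwargs", {})
if cfg.get("no_cache_key"):
    cacher_kwargs["no_cache_key"] = cfg["no_cache_key"]
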
(Diffs for the remaining 4 changed files are not loaded here.)
