Commit
Merge branch 'feature/nrt-tags' into hotfix/pin-multiurl-0.3.1
ecmwf-cobarzan committed Nov 6, 2024
2 parents 59f5d29 + 03e8b46 commit f9a0739
Showing 15 changed files with 439 additions and 1,277 deletions.
49 changes: 8 additions & 41 deletions cads_adaptors/adaptors/cams_regional_fc/__init__.py
@@ -2,6 +2,9 @@

from cads_adaptors.adaptors.cds import AbstractCdsAdaptor, Request

STACK_TEMP_DIR = "/tmp/cams-europe-air-quality-forecasts/temp"
STACK_DOWNLOAD_DIR = "/tmp/cams-europe-air-quality-forecasts/download"


class CAMSEuropeAirQualityForecastsAdaptor(AbstractCdsAdaptor):
def retrieve(self, request: Request) -> BinaryIO:
@@ -16,54 +19,18 @@ def retrieve(self, request: Request) -> BinaryIO:

result_file = cams_regional_fc(self.context, self.config, self.mapped_requests)

return open(result_file.path, "rb")
return open(result_file, "rb")


class CAMSEuropeAirQualityForecastsAdaptorForLatestData(AbstractCdsAdaptor):
def retrieve(self, request: Request) -> BinaryIO:
from .cams_regional_fc import retrieve_latest

message = (
f"The parent request is {request['parent_request_uid']}, "
"launched by user {request['parent_request_user_uid']}."
)
self.context.add_stdout(message)
from .subrequest_main import subrequest_main

result_file = retrieve_latest(
self.context,
request["requests"],
request["dataset_dir"],
request["integration_server"],
)
if hasattr(result_file, "path"):
return open(result_file.path, "rb")
else:
request_uid = self.config.get("request_uid", None)
message = f"Sub-request {request_uid} failed to produce a result when one was expected."
self.context.add_stderr(message)
raise RuntimeError(message)
return subrequest_main("latest", request, self.config, self.context)


class CAMSEuropeAirQualityForecastsAdaptorForArchivedData(AbstractCdsAdaptor):
def retrieve(self, request: Request) -> BinaryIO:
from .cams_regional_fc import retrieve_archived

message = (
f"The parent request is {request['parent_request_uid']}, "
"launched by user {request['parent_request_user_uid']}."
)
self.context.add_stdout(message)
from .subrequest_main import subrequest_main

result_file = retrieve_archived(
self.context,
request["requests"],
request["dataset_dir"],
request["integration_server"],
)
if hasattr(result_file, "path"):
return open(result_file.path, "rb")
else:
request_uid = self.config.get("request_uid", None)
message = f"Sub-request {request_uid} failed to produce a result when one was expected."
self.context.add_stderr(message)
raise RuntimeError(message)
return subrequest_main("archived", request, self.config, self.context)
20 changes: 8 additions & 12 deletions cads_adaptors/adaptors/cams_regional_fc/assert_valid_grib.py
@@ -1,15 +1,11 @@
import os
import random
from datetime import datetime

from cds_common.message_iterators import grib_bytes_iterator
from cds_common.url2.downloader import ResponseException
from eccodes import codes_is_defined

from .grib2request import grib2request


def assert_valid_grib(req, response, context):
def assert_valid_grib(req, response):
"""Raise a ResponseException if the request response indicates success but
the content is not a valid grib message.
"""
@@ -43,13 +39,13 @@ def assert_valid_grib(req, response, context):

except Exception as e:
# Write bad grib to file for investigation?
if datetime.now() < datetime(2021, 10, 31, 0):
rn = random.randint(0, 2**128)
file = f"/tmp/cams-europe-air-quality-forecasts/debug/badgrib_{context.request_id}.{rn}.grib"
context.info(f'Writing bad grib to {file}: {req["url"]}')
os.makedirs(os.path.dirname(file), exist_ok=True)
with open(file, "wb") as f:
f.write(response.content)
# if datetime.now() < datetime(2021, 10, 31, 0):
# rn = random.randint(0, 2**128)
# file = f"/tmp/cams-europe-air-quality-forecasts/debug/badgrib_{context.request_id}.{rn}.grib"
# context.info(f'Writing bad grib to {file}: {req["url"]}')
# os.makedirs(os.path.dirname(file), exist_ok=True)
# with open(file, "wb") as f:
# f.write(response.content)

raise ResponseException(
"Request did not return valid grib: " + "{}: {}".format(e, req["req"])
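
The check that assert_valid_grib performs can be illustrated with plain ecCodes calls. The sketch below is a minimal stand-in, assuming only the public eccodes Python API; the real function iterates the payload with grib_bytes_iterator and also cross-checks the decoded field against the request via grib2request, which is not reproduced here.

# Minimal, hedged sketch of a GRIB validity check using only the public
# eccodes API. Not the actual assert_valid_grib implementation.
import eccodes


def looks_like_valid_grib(data: bytes) -> bool:
    """Return True if `data` decodes as a GRIB message that defines an edition."""
    try:
        handle = eccodes.codes_new_from_message(data)
    except Exception:
        return False  # bytes could not be decoded as GRIB at all
    try:
        # A decodable GRIB message should at least define its edition number.
        return bool(eccodes.codes_is_defined(handle, "edition"))
    finally:
        eccodes.codes_release(handle)
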
65 changes: 35 additions & 30 deletions cads_adaptors/adaptors/cams_regional_fc/cacher.py
@@ -1,5 +1,6 @@
import concurrent.futures
import io
import logging
import os
import re
import threading
@@ -21,13 +22,18 @@ class AbstractCacher:
defines the interface.
"""

def __init__(self, context, no_put=False):
self.context = context
def __init__(
self, integration_server, logger=None, no_put=False, permanent_fields=None
):
self.integration_server = integration_server
self.logger = logging.getLogger(__name__) if logger is None else logger
self.no_put = no_put

# Fields which should be cached permanently (on the datastore). All
# other fields will be cached in temporary locations.
self.permanent_fields = [{"model": ["ENS"], "level": ["0"]}]
if permanent_fields is None:
permanent_fields = [{"model": ["ENS"], "level": ["0"]}]
self.permanent_fields = permanent_fields

def done(self):
pass
@@ -69,9 +75,7 @@ def put(self, req):
nmatches = count_fields(intn)
if nmatches == 0:
raise Exception(
"Got unexpected field "
+ repr(req1field)
+ " from request "
f"Got unexpected field {req1field!r} from request "
+ repr(req["req"])
)
assert nmatches == 1
@@ -139,8 +143,6 @@ def _cache_file_path(self, fieldinfo):
"""Return a field-specific path or the given field. Can be used by a
child class to determine server-side cache location.
"""
dir = "permanent" if self._cache_permanently(fieldinfo) else "temporary"

# Set the order we'd like the keys to appear in the filename. Area
# keys will be last.
order1 = ["model", "type", "variable", "level", "time", "step"]
@@ -159,16 +161,12 @@ def key_order(k):
if keys not in self._templates:
# Form a Jinja2 template string for the cache files. "_backend" not
# used; organised by date; area keys put at the end.
path_template = (
dir
+ "/{{ date }}/"
+ "_".join(
[
"{k}={{{{ {k} }}}}".format(k=k)
for k in sorted(keys, key=key_order)
if k not in ["date", "_backend"]
]
)
path_template = "{{ date }}/" + "_".join(
[
"{k}={{{{ {k} }}}}".format(k=k)
for k in sorted(keys, key=key_order)
if k not in ["date", "_backend"]
]
)
self._templates[keys] = jinja2.Template(path_template)

@@ -181,7 +179,12 @@ def key_order(k):
"Bad characters in value for " + k + ": " + repr(v)
)

return self._templates[keys].render(fieldinfo)
dir = "permanent" if self._cache_permanently(fieldinfo) else "temporary"
# Data from the integration server should not mix with the production data
if self.integration_server:
dir += "_esuite"

return f"{dir}/" + self._templates[keys].render(fieldinfo)


class AbstractAsyncCacher(AbstractCacher):
@@ -193,11 +196,11 @@ class is still abstract since it does not do the actual data copy. It

def __init__(
self,
context,
*args,
nthreads=10,
max_mem=100000000,
tmpdir="/cache/tmp",
logger=None,
nthreads=None,
max_mem=None,
tmpdir=None,
**kwargs,
):
"""The number of fields that will be written concurrently to the cache
@@ -210,15 +213,17 @@ def __init__(
temporarily written to disk (in tmpdir) to avoid excessive memory
usage.
"""
super().__init__(context, *args, **kwargs)
self.nthreads = nthreads
super().__init__(*args, logger=logger, **kwargs)
self.nthreads = 10 if nthreads is None else nthreads
self._lock1 = threading.Lock()
self._lock2 = threading.Lock()
self._qclosed = False
self._templates = {}
self._futures = []
self._start_time = None
self._queue = MemSafeQueue(max_mem, tmpdir, logger=context)
self._queue = MemSafeQueue(
100000000 if max_mem is None else max_mem, tmpdir=tmpdir, logger=logger
)

def _start_copy_threads(self):
"""Start the threads that will do the remote copies."""
@@ -250,7 +255,7 @@ def done(self):
"drain": now - qclose_time,
"io": iotime,
}
self.context.info(f"MemSafeQueue summary: {summary!r}")
self.logger.info(f"MemSafeQueue summary: {summary!r}")

def __enter__(self):
return self
@@ -312,8 +317,8 @@ def _write_field_sync(self, data, fieldinfo):
local_object = io.BytesIO(data)
remote_path = self._cache_file_path(fieldinfo)

self.context.debug(
f"CACHER: copying data to " f"{self._host}:{self._bucket}:{remote_path}"
self.logger.info(
f"Caching {fieldinfo} to {self._host}:{self._bucket}:{remote_path}"
)

# Uncomment this code if it can't be trusted that the bucket already
@@ -337,7 +342,7 @@ def _write_field_sync(self, data, fieldinfo):
status = "uploaded"
break
except Exception as exc:
self.context.error(
self.logger.error(
"Failed to upload to S3 bucket (attempt " f"#{attempt}): {exc!r}"
)
status = f"process ended in error: {exc!r}"
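
The cache-path scheme introduced in _cache_file_path above (date-organised "key=value" names, prefixed by permanent/temporary and, for integration-server data, an extra "_esuite") can be exercised in isolation. The snippet below is a standalone illustration; the example fieldinfo values and the hard-coded flags are assumptions for demonstration only.

# Standalone illustration of the cache-path templating shown in cacher.py.
# Field values and the permanent/integration_server flags are made up.
import jinja2

fieldinfo = {
    "date": "2024-11-06",
    "model": "ENS",
    "type": "forecast",
    "variable": "ozone",
    "level": "0",
    "time": "0000",
    "step": "12",
}

# Same key order as order1 in cacher.py; "date" and "_backend" are excluded.
keys = ["model", "type", "variable", "level", "time", "step"]
path_template = "{{ date }}/" + "_".join(
    "{k}={{{{ {k} }}}}".format(k=k) for k in keys
)

permanent = True           # e.g. fieldinfo matched permanent_fields
integration_server = True  # e.g. data fetched from the e-suite backend

dir = "permanent" if permanent else "temporary"
if integration_server:
    dir += "_esuite"  # keep e-suite data separate from production data

print(f"{dir}/" + jinja2.Template(path_template).render(fieldinfo))
# permanent_esuite/2024-11-06/model=ENS_type=forecast_variable=ozone_level=0_time=0000_step=12
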