Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raw ERA5 data automatically downloading on a cron. #36

Merged
merged 8 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions raw/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
ARG py_version=3.8
FROM continuumio/miniconda3:latest

# Update miniconda
RUN conda update conda -y

# Add the mamba solver for faster builds
RUN conda install -n base conda-libmamba-solver
RUN conda config --set solver libmamba

# Create conda env using environment.yml
ARG weather_tools_git_rev=main
RUN git clone https://github.com/google/weather-tools.git /weather
WORKDIR /weather
RUN git checkout "${weather_tools_git_rev}"
RUN conda env create -f environment.yml --debug

# Activate the conda env and update the PATH
ARG CONDA_ENV_NAME=weather-tools
RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc
ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH
RUN pip install -e .
# TODO(#368): Add google-cloud-secret-manager in weather-tools
RUN pip install google-cloud-secret-manager==2.0.0

COPY *.cfg config_files/
COPY fetch.py config_files/

# Set the entrypoint to Apache Beam SDK launcher.
ENTRYPOINT ["python", "config_files/fetch.py"]
39 changes: 38 additions & 1 deletion raw/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,41 @@ _Steps_:
3. Repeat this process, except change the dataset to `pcp`:
```
export DATASET=pcp
```
```

## Downloading raw data from Copernicus automatically on monthly basis using of [Cloud-Run](https://cloud.google.com/run)

All data can be ingested from [Copernicus](https://cds.climate.copernicus.eu/#!/home) with `google-weather-tools`,
specifically `weather-dl` (see [weather-tools.readthedocs.io](https://weather-tools.readthedocs.io/)).

_Pre-requisites_:

1. Set up a Cloud project with sufficient permissions to use cloud storage (such as
[GCS](https://cloud.google.com/storage)) and a Beam runner (such as [Dataflow](https://cloud.google.com/dataflow)).
> Note: Other cloud systems should work too, such as S3 and Elastic Map Reduce. However, these are untested. If you
> experience an error here, please let us know by [filing an issue](https://github.com/google/weather-tools/issues).

2. Acquire one or more licenses from [Copernicus](https://cds.climate.copernicus.eu/user/register?destination=/api-how-to).
> Recommended: Download configs allow users to specify multiple API keys in a single data request via
> ["parameter subsections"](https://weather-tools.readthedocs.io/en/latest/Configuration.html#subsections). We
> highly recommend that institutions pool licenses together for faster downloads.

3. Create a docker image from the docker file of the [current directory](https://github.com/google-research/arco-era5/tree/main/raw) and push that image in the [GCR](https://cloud.google.com/artifact-registry).
> Reference: https://github.com/google/weather-tools/blob/main/Runtime-Container.md

4. Add the all CDS licenses into the [secret-manager](https://cloud.google.com/secret-manager) with value likes this: {"api_url": "URL", "api_key": "KEY"}
> NOTE: for every API_KEY there must be unique secret-key.

5. Create a new job in [Cloud-Run](https://cloud.google.com/run) using of the above docker image with this **ENV** variables.
* `PROJECT`
* `REGION`
* `BUCKET`
* `SDK_CONTAINER_IMAGE`
* `MANIFEST_LOCATION`
* `API_KEY_*`

Here, API_KEY_* is access of [secret-manager key](https://cloud.google.com/secret-manager) and it's value is looks like this :: projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1

NOTE: API_KEY is must follow this format: `API_KEY_*`. here * is any value.

6. Execute the above job with the `Scheduler trigger` of the month with frequency of `* * 1 * *`.
1 change: 0 additions & 1 deletion raw/era5_pl_hourly.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ partition_keys=
variable
pressure_level

api_url=https://cds.climate.copernicus.eu/api/v2

# go/valentine
# [parameters.a]
Expand Down
1 change: 0 additions & 1 deletion raw/era5_sl_hourly.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ partition_keys=
date
variable

api_url=https://cds.climate.copernicus.eu/api/v2

# go/valentine
# [parameters.a]
Expand Down
277 changes: 277 additions & 0 deletions raw/fetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import configparser
import datetime
import json
import logging
import os
import re
import subprocess
import signal
import typing as t

from google.cloud import secretmanager

DIRECTORY = "/weather/config_files"
FIELD_NAME = "date"
PROJECT = os.environ.get("PROJECT")
REGION = os.environ.get("REGION")
BUCKET = os.environ.get("BUCKET")
SDK_CONTAINER_IMAGE = os.environ.get("SDK_CONTAINER_IMAGE")
MANIFEST_LOCATION = os.environ.get("MANIFEST_LOCATION")
API_KEY_PATTERN = re.compile(r'^API_KEY_\d+$')
API_KEY_LIST = []

logger = logging.getLogger(__name__)


class ConfigArgs(t.TypedDict):
dabhicusp marked this conversation as resolved.
Show resolved Hide resolved
"""A class representing the configuration arguments for the new_config_file
function.

Attributes:
co_file (bool): True if the configuration file is a 'CO' type file, False
otherwise.
single_level_file (bool): True if the configuration file contains 'sfc' in
filename, False otherwise.
first_day_first_prev (datetime.date): The first day of the first previous month.
last_day_first_prev (datetime.date): The last day of the first previous month.
first_day_third_prev (datetime.date): The first day of the third previous month.
last_day_third_prev (datetime.date): The last day of the third previous month.
sl_year (str): The year of the third previous month in 'YYYY' format.
sl_month (str): The month of the third previous month in 'MM' format.
"""
co_file: bool
single_level_file: bool
first_day_first_prev: datetime.date
last_day_first_prev: datetime.date
first_day_third_prev: datetime.date
last_day_third_prev: datetime.date
sl_year: str
sl_month: str


class MonthDates(t.TypedDict):
"""A class representing the first and third previous month's dates.

Attributes:
first_day_first_prev (datetime.date): The first day of the first previous month.
last_day_first_prev (datetime.date): The last day of the first previous month.
first_day_third_prev (datetime.date): The first day of the third previous month.
last_day_third_prev (datetime.date): The last day of the third previous month.
sl_year (str): The year of the third previous month in 'YYYY' format.
sl_month (str): The month of the third previous month in 'MM' format.
"""
first_day_first_prev: datetime.date
last_day_first_prev: datetime.date
first_day_third_prev: datetime.date
last_day_third_prev: datetime.date
sl_year: str
sl_month: str


def new_config_file(config_file: str, field_name: str, additional_content: str,
config_args: ConfigArgs) -> None:
"""Modify the specified configuration file with new values.

Parameters:
config_file (str): The path to the configuration file to be modified.
field_name (str): The name of the field to be updated with the new value.
additional_content (str): The additional content to be added under the
'[selection]' section.
config_args (ConfigArgs): A dictionary containing the configuration arguments
as key-value pairs.
"""

# Unpack the values from config_args dictionary
co_file = config_args["co_file"]
single_level_file = config_args["single_level_file"]
first_day_first_prev = config_args["first_day_first_prev"]
last_day_first_prev = config_args["last_day_first_prev"]
first_day_third_prev = config_args["first_day_third_prev"]
last_day_third_prev = config_args["last_day_third_prev"]
sl_year = config_args["sl_year"]
sl_month = config_args["sl_month"]

config = configparser.ConfigParser()
config.read(config_file)

if single_level_file:
config.set("selection", "year", sl_year)
config.set("selection", "month", sl_month)
config.set("selection", "day", "all")
else:
if co_file:
config.set("selection", field_name,
f"{first_day_first_prev}/to/{last_day_first_prev}")
else:
config.set("selection", field_name,
f"{first_day_third_prev}/to/{last_day_third_prev}")

sections_list = additional_content.split("\n\n")
for section in sections_list[:-1]:
dabhicusp marked this conversation as resolved.
Show resolved Hide resolved
sections = section.split("\n")
new_section_name = sections[0].strip()
config.add_section(new_section_name)
api_url_name, api_url_value = sections[1].split("=")
config.set(new_section_name, api_url_name.strip(), api_url_value.strip())
api_key_name, api_key_value = sections[2].split("=")
config.set(new_section_name, api_key_name.strip(), api_key_value.strip())

with open(config_file, "w") as file:
config.write(file, space_around_delimiters=False)


def get_month_range(date: datetime.date) -> t.Tuple[datetime.date, datetime.date]:
"""Return the first and last date of the previous month based on the input date.

Parameters:
date (datetime.date): The input date.

Returns:
tuple: A tuple containing the first and last date of the month as
datetime.date objects.
"""
last_day = date.replace(day=1) - datetime.timedelta(days=1)
first_day = last_day.replace(day=1)
return first_day, last_day


def get_previous_month_dates() -> MonthDates:
"""Return a dictionary containing the first and third previous month's dates from
the current date.

Returns:
dict: A dictionary containing the following key-value pairs:
- 'first_day_first_prev': The first day of the first previous month
(datetime.date).
- 'last_day_first_prev': The last day of the first previous month
(datetime.date).
- 'first_day_third_prev': The first day of the third previous month
(datetime.date).
- 'last_day_third_prev': The last day of the third previous month
(datetime.date).
- 'sl_year': The year of the third previous month in 'YYYY' format (str).
- 'sl_month': The month of the third previous month in 'MM' format (str).
"""

today = datetime.date.today()
# Calculate the correct previous month considering months from 1 to 12
prev_month = today.month + 10 if today.month < 3 else today.month - 2
third_prev_month = today.replace(month=prev_month)
first_prev_month = today.replace(month=today.month)
first_day_third_prev, last_day_third_prev = get_month_range(third_prev_month)
first_day_first_prev, last_day_first_prev = get_month_range(first_prev_month)
first_date_third_prev = first_day_third_prev
sl_year, sl_month = str(first_date_third_prev)[:4], str(first_date_third_prev)[5:7]

return {
'first_day_first_prev': first_day_first_prev,
'last_day_first_prev': last_day_first_prev,
'first_day_third_prev': first_day_third_prev,
'last_day_third_prev': last_day_third_prev,
'sl_year': sl_year,
'sl_month': sl_month,
}


def update_config_files(directory: str, field_name: str,
additional_content: str) -> None:
"""Update the configuration files in the specified directory.

Parameters:
directory (str): The path to the directory containing the configuration files.
field_name (str): The name of the field to be updated with the new value.
additional_content (str): The additional content to be added under the
'[selection]' section.
"""
dates_data = get_previous_month_dates()
config_args = {
"first_day_first_prev": dates_data['first_day_first_prev'],
"last_day_first_prev": dates_data['last_day_first_prev'],
"first_day_third_prev": dates_data['first_day_third_prev'],
"last_day_third_prev": dates_data['last_day_third_prev'],
"sl_year": dates_data['sl_year'],
"sl_month": dates_data['sl_month'],
}
for filename in os.listdir(directory):
config_args['single_level_file'] = config_args['co_file'] = False
if filename.endswith(".cfg"):
if "sfc" in filename:
config_args["single_level_file"] = True
if "hourly" in filename:
config_args["co_file"] = True
config_file = os.path.join(directory, filename)
# Pass the data as keyword arguments to the new_config_file function
new_config_file(config_file, field_name, additional_content,
dabhicusp marked this conversation as resolved.
Show resolved Hide resolved
config_args=config_args)


def get_secret(secret_key: str) -> dict:
"""Retrieve the secret value from the Google Cloud Secret Manager.

Parameters:
api_key (str): The name or identifier of the secret in the Google
Cloud Secret Manager.

Returns:
dict: A dictionary containing the retrieved secret data.
"""
client = secretmanager.SecretManagerServiceClient()
response = client.access_secret_version(request={"name": secret_key})
payload = response.payload.data.decode("UTF-8")
secret_dict = json.loads(payload)
return secret_dict


if __name__ == "__main__":
for env_var in os.environ:
if API_KEY_PATTERN.match(env_var):
api_key_value = os.environ.get(env_var)
API_KEY_LIST.append(api_key_value)

additional_content = ""
for count, secret_key in enumerate(API_KEY_LIST):
secret_key_value = get_secret(secret_key)
additional_content += f'parameters.api{count}\n\
api_url={secret_key_value["api_url"]}\napi_key={secret_key_value["api_key"]}\n\n'

current_day = datetime.date.today()
job_name = f"wx-dl-arco-era5-{current_day.month}-{current_day.year}"
# TODO(#373): Update the command once `--async` keyword added in weather-dl.
command = (
dabhicusp marked this conversation as resolved.
Show resolved Hide resolved
f'python weather_dl/weather-dl /weather/config_files/*.cfg --runner '
f'DataflowRunner --project {PROJECT} --region {REGION} --temp_location '
f'"gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {job_name} '
f'--sdk_container_image {SDK_CONTAINER_IMAGE} '
f'--manifest-location {MANIFEST_LOCATION} --experiment use_runner_v2'
)

update_config_files(DIRECTORY, FIELD_NAME, additional_content)
stop_on_log = 'JOB_STATE_RUNNING'
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
with process.stdout:
try:
for line in iter(process.stdout.readline, b''):
log_message = line.decode("utf-8").strip()
print(log_message)
if stop_on_log in log_message:
print(f'Stopping subprocess as {stop_on_log}')
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
except subprocess.CalledProcessError as e:
logger.error(
f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}'
)
Loading