diff --git a/raw/Dockerfile b/raw/Dockerfile new file mode 100644 index 0000000..004d88e --- /dev/null +++ b/raw/Dockerfile @@ -0,0 +1,44 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +ARG py_version=3.8 +FROM continuumio/miniconda3:latest + +# Update miniconda +RUN conda update conda -y + +# Add the mamba solver for faster builds +RUN conda install -n base conda-libmamba-solver +RUN conda config --set solver libmamba + +# Create conda env using environment.yml +ARG weather_tools_git_rev=main +RUN git clone https://github.com/google/weather-tools.git /weather +WORKDIR /weather +RUN git checkout "${weather_tools_git_rev}" +RUN conda env create -f environment.yml --debug + +# Activate the conda env and update the PATH +ARG CONDA_ENV_NAME=weather-tools +RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc +ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH +RUN pip install -e . +# TODO(#368): Add google-cloud-secret-manager in weather-tools +RUN pip install google-cloud-secret-manager==2.0.0 + +COPY *.cfg config_files/ +COPY fetch.py config_files/ + +# Set the entrypoint to Apache Beam SDK launcher. +ENTRYPOINT ["python", "config_files/fetch.py"] diff --git a/raw/README.md b/raw/README.md index f6f8608..3b1fcf9 100644 --- a/raw/README.md +++ b/raw/README.md @@ -116,4 +116,41 @@ _Steps_: 3. Repeat this process, except change the dataset to `pcp`: ``` export DATASET=pcp - ``` \ No newline at end of file + ``` + +## Downloading raw data from Copernicus automatically on monthly basis using of [Cloud-Run](https://cloud.google.com/run) + +All data can be ingested from [Copernicus](https://cds.climate.copernicus.eu/#!/home) with `google-weather-tools`, +specifically `weather-dl` (see [weather-tools.readthedocs.io](https://weather-tools.readthedocs.io/)). + +_Pre-requisites_: + +1. Set up a Cloud project with sufficient permissions to use cloud storage (such as + [GCS](https://cloud.google.com/storage)) and a Beam runner (such as [Dataflow](https://cloud.google.com/dataflow)). + > Note: Other cloud systems should work too, such as S3 and Elastic Map Reduce. However, these are untested. If you + > experience an error here, please let us know by [filing an issue](https://github.com/google/weather-tools/issues). + +2. Acquire one or more licenses from [Copernicus](https://cds.climate.copernicus.eu/user/register?destination=/api-how-to). + > Recommended: Download configs allow users to specify multiple API keys in a single data request via + > ["parameter subsections"](https://weather-tools.readthedocs.io/en/latest/Configuration.html#subsections). We + > highly recommend that institutions pool licenses together for faster downloads. + +3. Create a docker image from the docker file of the [current directory](https://github.com/google-research/arco-era5/tree/main/raw) and push that image in the [GCR](https://cloud.google.com/artifact-registry). + > Reference: https://github.com/google/weather-tools/blob/main/Runtime-Container.md + +4. Add the all CDS licenses into the [secret-manager](https://cloud.google.com/secret-manager) with value likes this: {"api_url": "URL", "api_key": "KEY"} + > NOTE: for every API_KEY there must be unique secret-key. + +5. Create a new job in [Cloud-Run](https://cloud.google.com/run) using of the above docker image with this **ENV** variables. + * `PROJECT` + * `REGION` + * `BUCKET` + * `SDK_CONTAINER_IMAGE` + * `MANIFEST_LOCATION` + * `API_KEY_*` + + Here, API_KEY_* is access of [secret-manager key](https://cloud.google.com/secret-manager) and it's value is looks like this :: projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1 + + NOTE: API_KEY is must follow this format: `API_KEY_*`. here * is any value. + +6. Execute the above job with the `Scheduler trigger` of the month with frequency of `* * 1 * *`. diff --git a/raw/era5_pl_hourly.cfg b/raw/era5_pl_hourly.cfg index 8585c37..d6a6566 100644 --- a/raw/era5_pl_hourly.cfg +++ b/raw/era5_pl_hourly.cfg @@ -22,7 +22,6 @@ partition_keys= variable pressure_level -api_url=https://cds.climate.copernicus.eu/api/v2 # go/valentine # [parameters.a] diff --git a/raw/era5_sl_hourly.cfg b/raw/era5_sl_hourly.cfg index b04c352..48c43e1 100644 --- a/raw/era5_sl_hourly.cfg +++ b/raw/era5_sl_hourly.cfg @@ -21,7 +21,6 @@ partition_keys= date variable -api_url=https://cds.climate.copernicus.eu/api/v2 # go/valentine # [parameters.a] diff --git a/raw/fetch.py b/raw/fetch.py new file mode 100644 index 0000000..415d11b --- /dev/null +++ b/raw/fetch.py @@ -0,0 +1,277 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +import configparser +import datetime +import json +import logging +import os +import re +import subprocess +import signal +import typing as t + +from google.cloud import secretmanager + +DIRECTORY = "/weather/config_files" +FIELD_NAME = "date" +PROJECT = os.environ.get("PROJECT") +REGION = os.environ.get("REGION") +BUCKET = os.environ.get("BUCKET") +SDK_CONTAINER_IMAGE = os.environ.get("SDK_CONTAINER_IMAGE") +MANIFEST_LOCATION = os.environ.get("MANIFEST_LOCATION") +API_KEY_PATTERN = re.compile(r'^API_KEY_\d+$') +API_KEY_LIST = [] + +logger = logging.getLogger(__name__) + + +class ConfigArgs(t.TypedDict): + """A class representing the configuration arguments for the new_config_file + function. + + Attributes: + co_file (bool): True if the configuration file is a 'CO' type file, False + otherwise. + single_level_file (bool): True if the configuration file contains 'sfc' in + filename, False otherwise. + first_day_first_prev (datetime.date): The first day of the first previous month. + last_day_first_prev (datetime.date): The last day of the first previous month. + first_day_third_prev (datetime.date): The first day of the third previous month. + last_day_third_prev (datetime.date): The last day of the third previous month. + sl_year (str): The year of the third previous month in 'YYYY' format. + sl_month (str): The month of the third previous month in 'MM' format. + """ + co_file: bool + single_level_file: bool + first_day_first_prev: datetime.date + last_day_first_prev: datetime.date + first_day_third_prev: datetime.date + last_day_third_prev: datetime.date + sl_year: str + sl_month: str + + +class MonthDates(t.TypedDict): + """A class representing the first and third previous month's dates. + + Attributes: + first_day_first_prev (datetime.date): The first day of the first previous month. + last_day_first_prev (datetime.date): The last day of the first previous month. + first_day_third_prev (datetime.date): The first day of the third previous month. + last_day_third_prev (datetime.date): The last day of the third previous month. + sl_year (str): The year of the third previous month in 'YYYY' format. + sl_month (str): The month of the third previous month in 'MM' format. + """ + first_day_first_prev: datetime.date + last_day_first_prev: datetime.date + first_day_third_prev: datetime.date + last_day_third_prev: datetime.date + sl_year: str + sl_month: str + + +def new_config_file(config_file: str, field_name: str, additional_content: str, + config_args: ConfigArgs) -> None: + """Modify the specified configuration file with new values. + + Parameters: + config_file (str): The path to the configuration file to be modified. + field_name (str): The name of the field to be updated with the new value. + additional_content (str): The additional content to be added under the + '[selection]' section. + config_args (ConfigArgs): A dictionary containing the configuration arguments + as key-value pairs. + """ + + # Unpack the values from config_args dictionary + co_file = config_args["co_file"] + single_level_file = config_args["single_level_file"] + first_day_first_prev = config_args["first_day_first_prev"] + last_day_first_prev = config_args["last_day_first_prev"] + first_day_third_prev = config_args["first_day_third_prev"] + last_day_third_prev = config_args["last_day_third_prev"] + sl_year = config_args["sl_year"] + sl_month = config_args["sl_month"] + + config = configparser.ConfigParser() + config.read(config_file) + + if single_level_file: + config.set("selection", "year", sl_year) + config.set("selection", "month", sl_month) + config.set("selection", "day", "all") + else: + if co_file: + config.set("selection", field_name, + f"{first_day_first_prev}/to/{last_day_first_prev}") + else: + config.set("selection", field_name, + f"{first_day_third_prev}/to/{last_day_third_prev}") + + sections_list = additional_content.split("\n\n") + for section in sections_list[:-1]: + sections = section.split("\n") + new_section_name = sections[0].strip() + config.add_section(new_section_name) + api_url_name, api_url_value = sections[1].split("=") + config.set(new_section_name, api_url_name.strip(), api_url_value.strip()) + api_key_name, api_key_value = sections[2].split("=") + config.set(new_section_name, api_key_name.strip(), api_key_value.strip()) + + with open(config_file, "w") as file: + config.write(file, space_around_delimiters=False) + + +def get_month_range(date: datetime.date) -> t.Tuple[datetime.date, datetime.date]: + """Return the first and last date of the previous month based on the input date. + + Parameters: + date (datetime.date): The input date. + + Returns: + tuple: A tuple containing the first and last date of the month as + datetime.date objects. + """ + last_day = date.replace(day=1) - datetime.timedelta(days=1) + first_day = last_day.replace(day=1) + return first_day, last_day + + +def get_previous_month_dates() -> MonthDates: + """Return a dictionary containing the first and third previous month's dates from + the current date. + + Returns: + dict: A dictionary containing the following key-value pairs: + - 'first_day_first_prev': The first day of the first previous month + (datetime.date). + - 'last_day_first_prev': The last day of the first previous month + (datetime.date). + - 'first_day_third_prev': The first day of the third previous month + (datetime.date). + - 'last_day_third_prev': The last day of the third previous month + (datetime.date). + - 'sl_year': The year of the third previous month in 'YYYY' format (str). + - 'sl_month': The month of the third previous month in 'MM' format (str). + """ + + today = datetime.date.today() + # Calculate the correct previous month considering months from 1 to 12 + prev_month = today.month + 10 if today.month < 3 else today.month - 2 + third_prev_month = today.replace(month=prev_month) + first_prev_month = today.replace(month=today.month) + first_day_third_prev, last_day_third_prev = get_month_range(third_prev_month) + first_day_first_prev, last_day_first_prev = get_month_range(first_prev_month) + first_date_third_prev = first_day_third_prev + sl_year, sl_month = str(first_date_third_prev)[:4], str(first_date_third_prev)[5:7] + + return { + 'first_day_first_prev': first_day_first_prev, + 'last_day_first_prev': last_day_first_prev, + 'first_day_third_prev': first_day_third_prev, + 'last_day_third_prev': last_day_third_prev, + 'sl_year': sl_year, + 'sl_month': sl_month, + } + + +def update_config_files(directory: str, field_name: str, + additional_content: str) -> None: + """Update the configuration files in the specified directory. + + Parameters: + directory (str): The path to the directory containing the configuration files. + field_name (str): The name of the field to be updated with the new value. + additional_content (str): The additional content to be added under the + '[selection]' section. + """ + dates_data = get_previous_month_dates() + config_args = { + "first_day_first_prev": dates_data['first_day_first_prev'], + "last_day_first_prev": dates_data['last_day_first_prev'], + "first_day_third_prev": dates_data['first_day_third_prev'], + "last_day_third_prev": dates_data['last_day_third_prev'], + "sl_year": dates_data['sl_year'], + "sl_month": dates_data['sl_month'], + } + for filename in os.listdir(directory): + config_args['single_level_file'] = config_args['co_file'] = False + if filename.endswith(".cfg"): + if "sfc" in filename: + config_args["single_level_file"] = True + if "hourly" in filename: + config_args["co_file"] = True + config_file = os.path.join(directory, filename) + # Pass the data as keyword arguments to the new_config_file function + new_config_file(config_file, field_name, additional_content, + config_args=config_args) + + +def get_secret(secret_key: str) -> dict: + """Retrieve the secret value from the Google Cloud Secret Manager. + + Parameters: + api_key (str): The name or identifier of the secret in the Google + Cloud Secret Manager. + + Returns: + dict: A dictionary containing the retrieved secret data. + """ + client = secretmanager.SecretManagerServiceClient() + response = client.access_secret_version(request={"name": secret_key}) + payload = response.payload.data.decode("UTF-8") + secret_dict = json.loads(payload) + return secret_dict + + +if __name__ == "__main__": + for env_var in os.environ: + if API_KEY_PATTERN.match(env_var): + api_key_value = os.environ.get(env_var) + API_KEY_LIST.append(api_key_value) + + additional_content = "" + for count, secret_key in enumerate(API_KEY_LIST): + secret_key_value = get_secret(secret_key) + additional_content += f'parameters.api{count}\n\ + api_url={secret_key_value["api_url"]}\napi_key={secret_key_value["api_key"]}\n\n' + + current_day = datetime.date.today() + job_name = f"wx-dl-arco-era5-{current_day.month}-{current_day.year}" + # TODO(#373): Update the command once `--async` keyword added in weather-dl. + command = ( + f'python weather_dl/weather-dl /weather/config_files/*.cfg --runner ' + f'DataflowRunner --project {PROJECT} --region {REGION} --temp_location ' + f'"gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {job_name} ' + f'--sdk_container_image {SDK_CONTAINER_IMAGE} ' + f'--manifest-location {MANIFEST_LOCATION} --experiment use_runner_v2' + ) + + update_config_files(DIRECTORY, FIELD_NAME, additional_content) + stop_on_log = 'JOB_STATE_RUNNING' + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + with process.stdout: + try: + for line in iter(process.stdout.readline, b''): + log_message = line.decode("utf-8").strip() + print(log_message) + if stop_on_log in log_message: + print(f'Stopping subprocess as {stop_on_log}') + os.killpg(os.getpgid(process.pid), signal.SIGTERM) + except subprocess.CalledProcessError as e: + logger.error( + f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}' + ) diff --git a/raw/fetch_test.py b/raw/fetch_test.py new file mode 100644 index 0000000..d7051b4 --- /dev/null +++ b/raw/fetch_test.py @@ -0,0 +1,147 @@ +import configparser +import datetime +import json +import os +import tempfile +import unittest + +from unittest.mock import patch, MagicMock + +from fetch import ( + new_config_file, + get_month_range, + get_previous_month_dates, + update_config_files, + get_secret, +) + + +class TestFetchFunctions(unittest.TestCase): + def setUp(self): + # Create a temporary directory for testing + self.temp_dir = tempfile.TemporaryDirectory() + self.config_file = os.path.join(self.temp_dir.name, "test_config.cfg") + with open(self.config_file, "w") as file: + file.write( + "[parameters]\nclient=cds\ndataset=reanalysis-era5-complete\n\ + target_path=gs://gcp-public-data-arco-era5/raw/ERA5GRIB/HRES\ + /Daily/{date:%%Y/%%Y%%m%d}_hres_dve.grb2\npartition_keys=\n\t\ + dates\n\n[selection]\nclass=ea\nstream=oper\nexpver=1\ntype=an\n\ + levtype=ml\nlevelist=1/to/137\ndate=1979-01-01/to/2023-03-31\n\ + time=00/to/23\nparam=138/155\n" + ) + self.config_args = { + "first_day_first_prev": datetime.date(2023, 7, 1), + "last_day_first_prev": datetime.date(2023, 7, 31), + "first_day_third_prev": datetime.date(2023, 5, 1), + "last_day_third_prev": datetime.date(2023, 5, 31), + "sl_year": "2023", + "sl_month": "05", + 'single_level_file': False, + 'co_file': False + } + self.additional_content = "[parameters.test]\napi_url=test_url\napi_key=\ + test_key\n\n" + + def tearDown(self): + os.remove(self.config_file) + os.rmdir(self.temp_dir.name) + self.temp_dir.cleanup() + + def test_new_config_file(self): + section_name = 'parameters.test' + section_api_url = 'test_url' + section_api_key = 'test_key' + additional_content = f'{section_name}\napi_url={section_api_url}\n\ + api_key={section_api_key}\n\n' + + new_config_file(self.config_file, "date", additional_content, self.config_args) + + config = configparser.ConfigParser() + config.read(self.config_file) + self.assertIn(section_name, config.sections()) + self.assertEqual( + config.get("selection", "date"), + f'{self.config_args["first_day_third_prev"]}/to/{self.config_args["last_day_third_prev"]}', + ) + self.assertEqual(config.get(section_name, 'api_url'), + section_api_url) + self.assertEqual(config.get(section_name, 'api_key'), + section_api_key) + + def test_new_config_file_with_co_file(self): + self.config_args["co_file"] = True + self.config_args["single_level_file"] = False + + new_config_file( + self.config_file, "date", self.additional_content, self.config_args) + + config = configparser.ConfigParser() + config.read(self.config_file) + + self.assertEqual( + config.get('selection', 'date'), + f'{self.config_args["first_day_first_prev"]}/to/{self.config_args["last_day_first_prev"]}' + ) + + def test_new_config_file_with_single_level_file(self): + self.config_args["co_file"] = False + self.config_args["single_level_file"] = True + + new_config_file( + self.config_file, "date", self.additional_content, self.config_args) + + config = configparser.ConfigParser() + config.read(self.config_file) + + self.assertEqual(config.get('selection', 'year'), self.config_args["sl_year"]) + self.assertEqual(config.get('selection', 'month'), self.config_args["sl_month"]) + self.assertEqual(config.get('selection', 'day'), 'all') + + def test_get_month_range(self): + # Test get_month_range function + first_day, last_day = get_month_range(datetime.date(2023, 7, 18)) + self.assertEqual(first_day, datetime.date(2023, 6, 1)) + self.assertEqual(last_day, datetime.date(2023, 6, 30)) + + def test_get_previous_month_dates(self): + # Test get_previous_month_dates function + prev_month_data = get_previous_month_dates() + self.assertIn("first_day_first_prev", prev_month_data) + self.assertIn("last_day_first_prev", prev_month_data) + self.assertIn("first_day_third_prev", prev_month_data) + self.assertIn("last_day_third_prev", prev_month_data) + self.assertIn("sl_year", prev_month_data) + self.assertIn("sl_month", prev_month_data) + + def test_update_config_files(self): + # Test update_config_files function + update_config_files( + self.temp_dir.name, "date", self.additional_content) + + @patch("fetch.secretmanager.SecretManagerServiceClient") + def test_get_secret_success(self, mock_secretmanager): + secret_data = { + "api_url": "https://example.com/api", + "api_key": "my_secret_api_key" + } + mock_response = MagicMock() + mock_response.payload.data.decode.return_value = json.dumps(secret_data) + mock_secretmanager.return_value.access_secret_version.return_value = ( + mock_response) + + secret_key = "projects/my-project/secrets/my-secret/versions/latest" + result = get_secret(secret_key) + self.assertEqual(result, secret_data) + + @patch("fetch.secretmanager.SecretManagerServiceClient") + def test_get_secret_failure(self, mock_secretmanager): + mock_secretmanager.return_value.access_secret_version.side_effect = ( + Exception("Error retrieving secret")) + secret_key = "projects/my-project/secrets/my-secret/versions/latest" + with self.assertRaises(Exception): + get_secret(secret_key) + + +if __name__ == "__main__": + unittest.main()