From 9d7223eab67e56dbfbbdc2048a0bd0358db2192d Mon Sep 17 00:00:00 2001 From: dabhicusp Date: Tue, 18 Jul 2023 05:54:25 +0000 Subject: [PATCH 1/8] Script and docker file of data updation is added. --- raw/Dockerfile | 44 ++++++++++++++ raw/era5_pl_hourly.cfg | 1 - raw/era5_sl_hourly.cfg | 1 - raw/fetch.py | 126 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 raw/Dockerfile create mode 100644 raw/fetch.py diff --git a/raw/Dockerfile b/raw/Dockerfile new file mode 100644 index 0000000..d3f25c3 --- /dev/null +++ b/raw/Dockerfile @@ -0,0 +1,44 @@ + +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +ARG py_version=3.8 +FROM continuumio/miniconda3:latest + +# Update miniconda +RUN conda update conda -y + +# Add the mamba solver for faster builds +RUN conda install -n base conda-libmamba-solver +RUN conda config --set solver libmamba + +# Create conda env using environment.yml +ARG weather_tools_git_rev=main +RUN git clone https://github.com/google/weather-tools.git /weather +RUN pip install google-cloud-secret-manager==2.0.0 + +WORKDIR /weather +RUN git checkout "${weather_tools_git_rev}" +RUN conda env create -f environment.yml --debug + +# Activate the conda env and update the PATH +ARG CONDA_ENV_NAME=weather-tools +RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc +ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH +RUN pip install -e . + +COPY . . + +# Set the entrypoint to Apache Beam SDK launcher. +ENTRYPOINT ["python", "fetch.py"] \ No newline at end of file diff --git a/raw/era5_pl_hourly.cfg b/raw/era5_pl_hourly.cfg index 8585c37..d6a6566 100644 --- a/raw/era5_pl_hourly.cfg +++ b/raw/era5_pl_hourly.cfg @@ -22,7 +22,6 @@ partition_keys= variable pressure_level -api_url=https://cds.climate.copernicus.eu/api/v2 # go/valentine # [parameters.a] diff --git a/raw/era5_sl_hourly.cfg b/raw/era5_sl_hourly.cfg index b04c352..48c43e1 100644 --- a/raw/era5_sl_hourly.cfg +++ b/raw/era5_sl_hourly.cfg @@ -21,7 +21,6 @@ partition_keys= date variable -api_url=https://cds.climate.copernicus.eu/api/v2 # go/valentine # [parameters.a] diff --git a/raw/fetch.py b/raw/fetch.py new file mode 100644 index 0000000..f07412d --- /dev/null +++ b/raw/fetch.py @@ -0,0 +1,126 @@ +import datetime +import json +import os +import subprocess +from google.cloud import secretmanager + +def new_config_file(config_file, field_name, additional_content, co_file, + single_level_file, first_day_first_prev, last_day_first_prev, + first_day_third_prev, last_day_third_prev, sl_year, sl_month, + sl_first_date, sl_last_date): + '''Modified the config file.''' + + with open(config_file, 'r') as file: + lines = file.readlines() + + # Update the specified field with the new value + updated_lines = [] + selection_line_found = False + for line in lines: + if not selection_line_found and line.strip() == '[selection]': + updated_lines.append(f'{additional_content}\n') + selection_line_found = True + + if single_level_file: + if line.startswith('year'): + line = f'year={sl_year}\n' + if line.startswith("month"): + line = f'month={sl_month}\n' + if line.startswith("day"): + line = f'day={sl_first_date}/to/{sl_last_date}\n' + elif line.startswith(field_name): + if co_file: + line = f'{field_name}={first_day_first_prev}/to/{last_day_first_prev}\n' + else: + line = f'{field_name}={first_day_third_prev}/to/{last_day_third_prev}\n' + updated_lines.append(line) + + with open(config_file, 'w') as file: + file.writelines(updated_lines) + +def get_month_range(date): + '''Return the first and last date of the month from the input date.''' + last_day = date.replace(day=1) - datetime.timedelta(days=1) + first_day = last_day.replace(day=1) + return first_day, last_day + +def get_single_level_dates(first_day, last_day): + '''Return the third previous month's year,month,first day and last day.''' + year, month = str(first_day)[:4], str(first_day)[5:7] + first_date, last_date = str(first_day)[8:], str(last_day)[8:] + return (year, month, first_date, last_date) + +def get_previous_month_dates(): + '''Return the first and third previous month's date from the Today's date. ''' + today = datetime.date.today() + prev_month = today.month + 10 if today.month < 3 else today.month - 2 + third_prev_month = today.replace(month=prev_month) + first_prev_month = today.replace(month=today.month) + first_day_third_prev, last_day_third_prev = get_month_range(third_prev_month) + first_day_first_prev, last_day_first_prev = get_month_range(first_prev_month) + + sl_year, sl_month, sl_first_date, sl_last_date = get_single_level_dates( + first_day_third_prev, last_day_third_prev) + + return (first_day_first_prev, last_day_first_prev, first_day_third_prev, + last_day_third_prev, sl_year, sl_month, sl_first_date, sl_last_date) + +def update_config_files(directory, field_name, additional_content): + '''Update the config file.''' + (first_day_first_prev, last_day_first_prev, first_day_third_prev, + last_day_third_prev, sl_year, sl_month, sl_first_date, + sl_last_date)= get_previous_month_dates() + + for filename in os.listdir(directory): + single_level_file = False + co_file = False + if filename.endswith('.cfg'): + if 'sfc' in filename: + single_level_file = True + if 'hourly' in filename: + co_file = True + config_file = os.path.join(directory, filename) + new_config_file(config_file, field_name, additional_content, co_file, + single_level_file, first_day_first_prev, last_day_first_prev, + first_day_third_prev, last_day_third_prev, + sl_year, sl_month, sl_first_date, sl_last_date) + +def get_secret(api_key: str): + '''Function to retrieve the secret value from the google cloud.''' + client = secretmanager.SecretManagerServiceClient() + response = client.access_secret_version(request={"name": api_key}) + payload = response.payload.data.decode("UTF-8") + secret_dict = json.loads(payload) + return secret_dict + +DIRECTORY = '/weather' +FIELD_NAME = 'date' + +current_day = datetime.date.today() +Job_name = f'wx-dl-arco-era5_{current_day.month}_{current_day.year}' + +PROJECT = os.environ.get('PROJECT') +REGION = os.environ.get('REGION') +BUCKET = os.environ.get('BUCKET') +TOPIC_PATH = os.environ.get('TOPIC_PATH') +api_key1 = os.environ.get('api_key_1') +api_key2 = os.environ.get('api_key_2') + +api_key_1 = get_secret(api_key1) +api_key_2 = get_secret(api_key2) + +additional_content = f'[parameters.key1]\napi_url={api_key_1["api_url"]}\napi_key={api_key_1["api_key"]}\n\n[parameters.key2]\napi_url={api_key_2["api_url"]}\napi_key={api_key_2["api_key"]}\n' + +update_config_files(DIRECTORY, FIELD_NAME, additional_content) + +command = f'python "weather_dl/weather-dl" /weather/*.cfg --runner DataflowRunner --project {PROJECT} --region {REGION} \ +--temp_location "gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {Job_name} \ +--sdk_container_image "gcr.io/grid-intelligence-sandbox/miniconda3-beam:weather-tools-with-aria2" \ +--manifest-location fs://manifest?projectId=anthromet-ingestion --topic-path {TOPIC_PATH} --experiment use_runner_v2' + +try: + subprocess.run(command, shell=True, check=True, capture_output=True) +except subprocess.CalledProcessError as e: + print(f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}') + + From 154864421c6d7c898e4a01d5914b2d19b249e2db Mon Sep 17 00:00:00 2001 From: dabhicusp Date: Mon, 24 Jul 2023 12:21:51 +0000 Subject: [PATCH 2/8] Raw ERA5 data automatically downloading. --- raw/Dockerfile | 11 ++- raw/README.md | 39 +++++++- raw/fetch.py | 242 ++++++++++++++++++++++++++++++++++--------------- 3 files changed, 214 insertions(+), 78 deletions(-) diff --git a/raw/Dockerfile b/raw/Dockerfile index d3f25c3..a6cd9b9 100644 --- a/raw/Dockerfile +++ b/raw/Dockerfile @@ -1,5 +1,5 @@ -# Copyright 2022 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,8 +26,6 @@ RUN conda config --set solver libmamba # Create conda env using environment.yml ARG weather_tools_git_rev=main RUN git clone https://github.com/google/weather-tools.git /weather -RUN pip install google-cloud-secret-manager==2.0.0 - WORKDIR /weather RUN git checkout "${weather_tools_git_rev}" RUN conda env create -f environment.yml --debug @@ -37,8 +35,11 @@ ARG CONDA_ENV_NAME=weather-tools RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH RUN pip install -e . +# TDOO(#): +RUN pip install google-cloud-secret-manager==2.0.0 -COPY . . +COPY *.cfg config_files/ +COPY fetch.py config_files/ # Set the entrypoint to Apache Beam SDK launcher. -ENTRYPOINT ["python", "fetch.py"] \ No newline at end of file +ENTRYPOINT ["python", "config_files/fetch.py"] diff --git a/raw/README.md b/raw/README.md index f6f8608..e32b82b 100644 --- a/raw/README.md +++ b/raw/README.md @@ -116,4 +116,41 @@ _Steps_: 3. Repeat this process, except change the dataset to `pcp`: ``` export DATASET=pcp - ``` \ No newline at end of file + ``` + +## Downloading raw data from Copernicus automatically on monthly basis using of [Cloud-Run](https://cloud.google.com/run) + +All data can be ingested from [Copernicus](https://cds.climate.copernicus.eu/#!/home) with `google-weather-tools`, +specifically `weather-dl` (see [weather-tools.readthedocs.io](https://weather-tools.readthedocs.io/)). + +_Pre-requisites_: + +1. Set up a cloud project with sufficient permissions to use cloud storage (such as + [GCS](https://cloud.google.com/storage)) and a Beam runner (such as [Dataflow](https://cloud.google.com/dataflow)). + > Note: Other cloud systems should work too, such as S3 and Elastic Map Reduce. However, these are untested. If you + > experience an error here, please let us know by [filing an issue](https://github.com/google/weather-tools/issues). + +2. Acquire one or more licenses from [Copernicus](https://cds.climate.copernicus.eu/user/register?destination=/api-how-to). + > Recommended: Download configs allow users to specify multiple API keys in a single data request via + > ["parameter subsections"](https://weather-tools.readthedocs.io/en/latest/Configuration.html#subsections). We + > highly recommend that institutions pool licenses together for faster downloads. + +3. Create a docker image from the docker file of the [current directory](https://github.com/google-research/arco-era5/tree/main/raw) and push that image in the [GCR](https://cloud.google.com/artifact-registry). + > Note: This command will complete the above step. : gcloud builds submit . --tag "gcr.io/PROJECT_NAME/REPOSITORY_NAME:TAG" + +4. Add the all licenses of the cds in the [secret-manager](https://cloud.google.com/secret-manager) with secret value likes this dict: {"api_url": "URL", "api_key": "KEY"} + > NOTE: for every API_key there must be unique secret-key. + +5. Create a new job in [Cloud-Run](https://cloud.google.com/run) using of the above docker image with this **ENV** variables. + * PROJECT + * REGION + * BUCKET + * SDK_CONTAINER_IMAGE + * MANIFEST_LOCATION + * API_KEY_* + + Here, API_KEY_* is access of [secret-manager key](https://cloud.google.com/secret-manager) and it's value is looks like this :: projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1 + + NOTE: API_KEY is must follow this format: `API_KEY_*`. here * is any value. + +6. Execute the above job with the `Scheduler trigger` of the month with frequency of `* * 1 * *`. diff --git a/raw/fetch.py b/raw/fetch.py index f07412d..1513ef3 100644 --- a/raw/fetch.py +++ b/raw/fetch.py @@ -1,58 +1,139 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== import datetime import json import os +import re import subprocess from google.cloud import secretmanager - -def new_config_file(config_file, field_name, additional_content, co_file, - single_level_file, first_day_first_prev, last_day_first_prev, - first_day_third_prev, last_day_third_prev, sl_year, sl_month, - sl_first_date, sl_last_date): - '''Modified the config file.''' - - with open(config_file, 'r') as file: +from typing import Tuple + +DIRECTORY = "/weather/config_files" +FIELD_NAME = "date" +PROJECT = os.environ.get("PROJECT") +REGION = os.environ.get("REGION") +BUCKET = os.environ.get("BUCKET") +SDK_CONTAINER_IMAGE = os.environ.get("SDK_CONTAINER_IMAGE") +MANIFEST_LOCATION = os.environ.get("MANIFEST_LOCATION") +API_KEY_PATTERN = re.compile(r'^API_KEY_\d+$') +API_KEY_LIST = [] + + +def new_config_file(config_file: str, field_name: str, additional_content: str, + co_file: bool, single_level_file: bool, first_day_first_prev: datetime.date, + last_day_first_prev: datetime.date, first_day_third_prev: datetime.date, + last_day_third_prev: datetime.date, sl_year: str, sl_month: str, + sl_first_date: str, sl_last_date: str) -> None: + """Modify the specified configuration file with new values. + + Parameters: + config_file (str): The path to the configuration file to be modified. + field_name (str): The name of the field to be updated with the new value. + additional_content (str): The additional content to be added under the + '[selection]' section. + co_file (bool): True if the configuration file is a 'hourly' file, False + otherwise. + single_level_file (bool): True if the configuration file is a 'sfc' file, + False otherwise. + first_day_first_prev (datetime.date): The first day of the first previous month. + last_day_first_prev (datetime.date): The last day of the first previous month. + first_day_third_prev (datetime.date): The first day of the third previous month. + last_day_third_prev (datetime.date): The last day of the third previous month. + sl_year (str): The year of the third previous month in 'YYYY' format. + sl_month (str): The month of the third previous month in 'MM' format. + sl_first_date (str): The first date of the third previous month in 'DD' format. + sl_last_date (str): The last date of the third previous month in 'DD' format. + """ + + with open(config_file, "r") as file: lines = file.readlines() # Update the specified field with the new value updated_lines = [] selection_line_found = False for line in lines: - if not selection_line_found and line.strip() == '[selection]': - updated_lines.append(f'{additional_content}\n') + if not selection_line_found and line.strip() == "[selection]": + updated_lines.append(f"{additional_content}\n") selection_line_found = True - + if single_level_file: - if line.startswith('year'): - line = f'year={sl_year}\n' + if line.startswith("year"): + line = f"year={sl_year}\n" if line.startswith("month"): - line = f'month={sl_month}\n' + line = f"month={sl_month}\n" if line.startswith("day"): - line = f'day={sl_first_date}/to/{sl_last_date}\n' - elif line.startswith(field_name): + line = f"day={sl_first_date}/to/{sl_last_date}\n" + elif line.startswith(field_name): if co_file: - line = f'{field_name}={first_day_first_prev}/to/{last_day_first_prev}\n' + line = ( + f"{field_name}={first_day_first_prev}/to/{last_day_first_prev}\n" + ) else: - line = f'{field_name}={first_day_third_prev}/to/{last_day_third_prev}\n' + line = ( + f"{field_name}={first_day_third_prev}/to/{last_day_third_prev}\n" + ) updated_lines.append(line) - with open(config_file, 'w') as file: + with open(config_file, "w") as file: file.writelines(updated_lines) -def get_month_range(date): - '''Return the first and last date of the month from the input date.''' + +def get_month_range(date: datetime.date) -> Tuple[datetime.date, datetime.date]: + """Return the first and last date of the month from the input date. + + Parameters: + date (datetime.date): The input date. + + Returns: + tuple: A tuple containing the first and last date of the month as + datetime.date objects. + """ last_day = date.replace(day=1) - datetime.timedelta(days=1) - first_day = last_day.replace(day=1) + first_day = last_day.replace(day=1) return first_day, last_day -def get_single_level_dates(first_day, last_day): - '''Return the third previous month's year,month,first day and last day.''' + +def get_single_level_dates(first_day: datetime.date, + last_day: datetime.date) -> Tuple[str, str, str, str]: + """Return the year, month, first date, and last date of the input month. + + Parameters: + first_day (datetime.date): The first day of the month. + last_day (datetime.date): The last day of the month. + + Returns: + tuple: A tuple containing the year, month, first date, and last date of the + month as strings. + """ year, month = str(first_day)[:4], str(first_day)[5:7] - first_date, last_date = str(first_day)[8:], str(last_day)[8:] + first_date, last_date = str(first_day)[8:], str(last_day)[8:] return (year, month, first_date, last_date) -def get_previous_month_dates(): - '''Return the first and third previous month's date from the Today's date. ''' + +def get_previous_month_dates() -> (Tuple[datetime.date, datetime.date, datetime.date, + datetime.date, str, str, str, str]): + """Return the first and third previous month's date from the current date. + + Returns: + tuple: A tuple containing the first and third previous month's dates as + datetime.date objects, and the year, month, first date, and last date + of the third previous month as strings. + """ + today = datetime.date.today() + # Calculate the correct previous month considering months from 1 to 12 prev_month = today.month + 10 if today.month < 3 else today.month - 2 third_prev_month = today.replace(month=prev_month) first_prev_month = today.replace(month=today.month) @@ -60,67 +141,84 @@ def get_previous_month_dates(): first_day_first_prev, last_day_first_prev = get_month_range(first_prev_month) sl_year, sl_month, sl_first_date, sl_last_date = get_single_level_dates( - first_day_third_prev, last_day_third_prev) + first_day_third_prev, last_day_third_prev + ) return (first_day_first_prev, last_day_first_prev, first_day_third_prev, - last_day_third_prev, sl_year, sl_month, sl_first_date, sl_last_date) + last_day_third_prev, sl_year, sl_month, sl_first_date, sl_last_date) + -def update_config_files(directory, field_name, additional_content): - '''Update the config file.''' +def update_config_files(directory: str, field_name: str, + additional_content: str) -> None: + """Update the configuration files in the specified directory. + + Parameters: + directory (str): The path to the directory containing the configuration files. + field_name (str): The name of the field to be updated with the new value. + additional_content (str): The additional content to be added under the + '[selection]' section. + """ (first_day_first_prev, last_day_first_prev, first_day_third_prev, - last_day_third_prev, sl_year, sl_month, sl_first_date, - sl_last_date)= get_previous_month_dates() + last_day_third_prev, sl_year, sl_month, sl_first_date, + sl_last_date) = get_previous_month_dates() for filename in os.listdir(directory): single_level_file = False co_file = False - if filename.endswith('.cfg'): - if 'sfc' in filename: + if filename.endswith(".cfg"): + if "sfc" in filename: single_level_file = True - if 'hourly' in filename: + if "hourly" in filename: co_file = True config_file = os.path.join(directory, filename) - new_config_file(config_file, field_name, additional_content, co_file, - single_level_file, first_day_first_prev, last_day_first_prev, - first_day_third_prev, last_day_third_prev, - sl_year, sl_month, sl_first_date, sl_last_date) + new_config_file(config_file, field_name, additional_content, + co_file, single_level_file, first_day_first_prev, + last_day_first_prev, first_day_third_prev, + last_day_third_prev, sl_year, sl_month, + sl_first_date, sl_last_date) + + +def get_secret(api_key: str) -> dict: + """Retrieve the secret value from the Google Cloud Secret Manager. -def get_secret(api_key: str): - '''Function to retrieve the secret value from the google cloud.''' + Parameters: + api_key (str): The name or identifier of the secret in the Google + Cloud Secret Manager. + + Returns: + dict: A dictionary containing the retrieved secret data. + """ client = secretmanager.SecretManagerServiceClient() response = client.access_secret_version(request={"name": api_key}) payload = response.payload.data.decode("UTF-8") secret_dict = json.loads(payload) return secret_dict -DIRECTORY = '/weather' -FIELD_NAME = 'date' - -current_day = datetime.date.today() -Job_name = f'wx-dl-arco-era5_{current_day.month}_{current_day.year}' - -PROJECT = os.environ.get('PROJECT') -REGION = os.environ.get('REGION') -BUCKET = os.environ.get('BUCKET') -TOPIC_PATH = os.environ.get('TOPIC_PATH') -api_key1 = os.environ.get('api_key_1') -api_key2 = os.environ.get('api_key_2') - -api_key_1 = get_secret(api_key1) -api_key_2 = get_secret(api_key2) - -additional_content = f'[parameters.key1]\napi_url={api_key_1["api_url"]}\napi_key={api_key_1["api_key"]}\n\n[parameters.key2]\napi_url={api_key_2["api_url"]}\napi_key={api_key_2["api_key"]}\n' - -update_config_files(DIRECTORY, FIELD_NAME, additional_content) - -command = f'python "weather_dl/weather-dl" /weather/*.cfg --runner DataflowRunner --project {PROJECT} --region {REGION} \ ---temp_location "gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {Job_name} \ ---sdk_container_image "gcr.io/grid-intelligence-sandbox/miniconda3-beam:weather-tools-with-aria2" \ ---manifest-location fs://manifest?projectId=anthromet-ingestion --topic-path {TOPIC_PATH} --experiment use_runner_v2' - -try: - subprocess.run(command, shell=True, check=True, capture_output=True) -except subprocess.CalledProcessError as e: - print(f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}') - +if __name__ == "__main__": + + for env_var in os.environ: + if API_KEY_PATTERN.match(env_var): + api_key_value = os.environ.get(env_var) + API_KEY_LIST.append(api_key_value) + + additional_content = "" + for count,key in enumerate(API_KEY_LIST): + api_key_value = get_secret(key) + additional_content += f'[parameters.api{count}]\n\ + api_url={api_key_value["api_url"]}\napi_key={api_key_value["api_key"]}\n\n' + + current_day = datetime.date.today() + Job_name = f"wx-dl-arco-era5-{current_day.month}-{current_day.year}" + + command = f'python weather_dl/weather-dl /weather/config_files/*.cfg --runner \ + DataflowRunner --project {PROJECT} --region {REGION} --temp_location \ + "gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {Job_name} \ + --sdk_container_image {SDK_CONTAINER_IMAGE} \ + --manifest-location {MANIFEST_LOCATION} --experiment use_runner_v2' + + try: + update_config_files(DIRECTORY, FIELD_NAME, additional_content) + subprocess.run(command, shell=True, check=True, capture_output=True) + except subprocess.CalledProcessError as e: + print(f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}') From 9c0ac2726f6bac8794d1dae29d85b28c63721a40 Mon Sep 17 00:00:00 2001 From: dabhicusp Date: Tue, 25 Jul 2023 11:59:44 +0000 Subject: [PATCH 3/8] Testcase added of fetch.py and code modified. --- raw/README.md | 2 +- raw/fetch.py | 113 ++++++++++++++++++-------------- raw/fetch_test.py | 164 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 229 insertions(+), 50 deletions(-) create mode 100644 raw/fetch_test.py diff --git a/raw/README.md b/raw/README.md index e32b82b..71ddb56 100644 --- a/raw/README.md +++ b/raw/README.md @@ -136,7 +136,7 @@ _Pre-requisites_: > highly recommend that institutions pool licenses together for faster downloads. 3. Create a docker image from the docker file of the [current directory](https://github.com/google-research/arco-era5/tree/main/raw) and push that image in the [GCR](https://cloud.google.com/artifact-registry). - > Note: This command will complete the above step. : gcloud builds submit . --tag "gcr.io/PROJECT_NAME/REPOSITORY_NAME:TAG" + > Reference: https://github.com/google/weather-tools/blob/main/Runtime-Container.md 4. Add the all licenses of the cds in the [secret-manager](https://cloud.google.com/secret-manager) with secret value likes this dict: {"api_url": "URL", "api_key": "KEY"} > NOTE: for every API_key there must be unique secret-key. diff --git a/raw/fetch.py b/raw/fetch.py index 1513ef3..ad9715c 100644 --- a/raw/fetch.py +++ b/raw/fetch.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== +import configparser import datetime import json import os @@ -43,10 +44,10 @@ def new_config_file(config_file: str, field_name: str, additional_content: str, field_name (str): The name of the field to be updated with the new value. additional_content (str): The additional content to be added under the '[selection]' section. - co_file (bool): True if the configuration file is a 'hourly' file, False + co_file (bool): True if the configuration file is a 'CO' type file, False otherwise. - single_level_file (bool): True if the configuration file is a 'sfc' file, - False otherwise. + single_level_file (bool): True if the configuration file contains 'sfc' in + filename, False otherwise. first_day_first_prev (datetime.date): The first day of the first previous month. last_day_first_prev (datetime.date): The last day of the first previous month. first_day_third_prev (datetime.date): The first day of the third previous month. @@ -57,41 +58,38 @@ def new_config_file(config_file: str, field_name: str, additional_content: str, sl_last_date (str): The last date of the third previous month in 'DD' format. """ - with open(config_file, "r") as file: - lines = file.readlines() - - # Update the specified field with the new value - updated_lines = [] - selection_line_found = False - for line in lines: - if not selection_line_found and line.strip() == "[selection]": - updated_lines.append(f"{additional_content}\n") - selection_line_found = True - - if single_level_file: - if line.startswith("year"): - line = f"year={sl_year}\n" - if line.startswith("month"): - line = f"month={sl_month}\n" - if line.startswith("day"): - line = f"day={sl_first_date}/to/{sl_last_date}\n" - elif line.startswith(field_name): - if co_file: - line = ( - f"{field_name}={first_day_first_prev}/to/{last_day_first_prev}\n" - ) - else: - line = ( - f"{field_name}={first_day_third_prev}/to/{last_day_third_prev}\n" - ) - updated_lines.append(line) + config = configparser.ConfigParser() + config.read(config_file) + + if single_level_file: + config.set("selection", "year", sl_year) + config.set("selection", "month", sl_month) + config.set("selection", "day", f"{sl_first_date}/to/{sl_last_date}") + else: + if co_file: + config.set("selection", field_name, + f"{first_day_first_prev}/to/{last_day_first_prev}") + else: + config.set("selection", field_name, + f"{first_day_third_prev}/to/{last_day_third_prev}") + + sections_list = additional_content.split("\n\n") + for section in sections_list[:-1]: + sections = section.split("\n") + print("sections is here",sections) + new_section_name= sections[0].strip() + config.add_section(new_section_name) + api_url_name, api_url_value = sections[1].split("=") + config.set(new_section_name, api_url_name.strip(), api_url_value.strip()) + api_key_name, api_key_value = sections[2].split("=") + config.set(new_section_name, api_key_name.strip(), api_key_value.strip()) with open(config_file, "w") as file: - file.writelines(updated_lines) + config.write(file, space_around_delimiters=False) def get_month_range(date: datetime.date) -> Tuple[datetime.date, datetime.date]: - """Return the first and last date of the month from the input date. + """Return the first and last date of the previous month based on the input date. Parameters: date (datetime.date): The input date. @@ -122,14 +120,26 @@ def get_single_level_dates(first_day: datetime.date, return (year, month, first_date, last_date) -def get_previous_month_dates() -> (Tuple[datetime.date, datetime.date, datetime.date, - datetime.date, str, str, str, str]): - """Return the first and third previous month's date from the current date. +def get_previous_month_dates() -> dict: + """Return a dictionary containing the first and third previous month's dates from + the current date. Returns: - tuple: A tuple containing the first and third previous month's dates as - datetime.date objects, and the year, month, first date, and last date - of the third previous month as strings. + dict: A dictionary containing the following key-value pairs: + - 'first_day_first_prev': The first day of the first previous month + (datetime.date). + - 'last_day_first_prev': The last day of the first previous month + (datetime.date). + - 'first_day_third_prev': The first day of the third previous month + (datetime.date). + - 'last_day_third_prev': The last day of the third previous month + (datetime.date). + - 'sl_year': The year of the third previous month in 'YYYY' format (str). + - 'sl_month': The month of the third previous month in 'MM' format (str). + - 'sl_first_date': The first date of the third previous month in 'DD' + format (str). + - 'sl_last_date': The last date of the third previous month in 'DD' + format (str). """ today = datetime.date.today() @@ -144,8 +154,16 @@ def get_previous_month_dates() -> (Tuple[datetime.date, datetime.date, datetime. first_day_third_prev, last_day_third_prev ) - return (first_day_first_prev, last_day_first_prev, first_day_third_prev, - last_day_third_prev, sl_year, sl_month, sl_first_date, sl_last_date) + return { + 'first_day_first_prev': first_day_first_prev, + 'last_day_first_prev': last_day_first_prev, + 'first_day_third_prev': first_day_third_prev, + 'last_day_third_prev': last_day_third_prev, + 'sl_year': sl_year, + 'sl_month': sl_month, + 'sl_first_date': sl_first_date, + 'sl_last_date': sl_last_date + } def update_config_files(directory: str, field_name: str, @@ -158,9 +176,7 @@ def update_config_files(directory: str, field_name: str, additional_content (str): The additional content to be added under the '[selection]' section. """ - (first_day_first_prev, last_day_first_prev, first_day_third_prev, - last_day_third_prev, sl_year, sl_month, sl_first_date, - sl_last_date) = get_previous_month_dates() + dates_data = get_previous_month_dates() for filename in os.listdir(directory): single_level_file = False @@ -171,11 +187,10 @@ def update_config_files(directory: str, field_name: str, if "hourly" in filename: co_file = True config_file = os.path.join(directory, filename) + # Pass the data as keyword arguments to the new_config_file function new_config_file(config_file, field_name, additional_content, - co_file, single_level_file, first_day_first_prev, - last_day_first_prev, first_day_third_prev, - last_day_third_prev, sl_year, sl_month, - sl_first_date, sl_last_date) + co_file=co_file, single_level_file=single_level_file, + **dates_data) def get_secret(api_key: str) -> dict: @@ -205,7 +220,7 @@ def get_secret(api_key: str) -> dict: additional_content = "" for count,key in enumerate(API_KEY_LIST): api_key_value = get_secret(key) - additional_content += f'[parameters.api{count}]\n\ + additional_content += f'parameters.api{count}\n\ api_url={api_key_value["api_url"]}\napi_key={api_key_value["api_key"]}\n\n' current_day = datetime.date.today() diff --git a/raw/fetch_test.py b/raw/fetch_test.py new file mode 100644 index 0000000..667dc26 --- /dev/null +++ b/raw/fetch_test.py @@ -0,0 +1,164 @@ +import configparser +import datetime +import json +import os +import tempfile +import unittest + +from unittest.mock import patch, MagicMock + +# with . or not below?? +from fetch import ( + new_config_file, + get_month_range, + get_single_level_dates, + get_previous_month_dates, + update_config_files, + get_secret, +) + +class TestFetchFunctions(unittest.TestCase): + def setUp(self): + # Create a temporary directory for testing + self.temp_dir = tempfile.mkdtemp() + self.config_file = os.path.join(self.temp_dir, "test_config.cfg") + with open(self.config_file, "w") as file: + file.write( + "[parameters]\nclient=cds\ndataset=reanalysis-era5-complete\n\ + target_path=gs://gcp-public-data-arco-era5/raw/ERA5GRIB/HRES\ + /Daily/{date:%%Y/%%Y%%m%d}_hres_dve.grb2\npartition_keys=\n\t\ + dates\n\n[selection]\nclass=ea\nstream=oper\nexpver=1\ntype=an\n\ + levtype=ml\nlevelist=1/to/137\ndate=1979-01-01/to/2023-03-31\n\ + time=00/to/23\nparam=138/155\n" + ) + self.first_day_first_prev = datetime.date(2023, 7, 1) + self.last_day_first_prev = datetime.date(2023, 7, 31) + self.first_day_third_prev = datetime.date(2023, 5, 1) + self.last_day_third_prev = datetime.date(2023, 5, 31) + self.sl_year, self.sl_month = "2023", "05" + self.sl_first_date, self.sl_last_date = "01", "31" + self.additional_content = "[parameters.test]\napi_url=test_url\napi_key=\ + test_key\n\n" + + def tearDown(self): + os.remove(self.config_file) + os.rmdir(self.temp_dir) + + def test_new_config_file(self): + section_name = 'parameters.test' + section_api_url = 'test_url' + section_api_key = 'test_key' + additional_content = f'{section_name}\napi_url={section_api_url}\n\ + api_key={section_api_key}\n\n' + + new_config_file( + self.config_file, "date", additional_content, + False, False, self.first_day_first_prev, + self.last_day_first_prev, self.first_day_third_prev, + self.last_day_third_prev, self.sl_year, self.sl_month, + self.sl_first_date, self.sl_last_date) + + config = configparser.ConfigParser() + config.read(self.config_file) + self.assertIn(section_name, config.sections()) + self.assertEqual(config.get('selection', 'date'), + f'{self.first_day_third_prev}/to/{self.last_day_third_prev}') + self.assertEqual(config.get(section_name, 'api_url'), + section_api_url) + self.assertEqual(config.get(section_name, 'api_key'), + section_api_key) + + def test_new_config_file_with_co_file(self): + co_file = True + single_level_file = False + + new_config_file( + self.config_file, "date", self.additional_content, + co_file, single_level_file, self.first_day_first_prev, + self.last_day_first_prev, self.first_day_third_prev, + self.last_day_third_prev, self.sl_year, self.sl_month, + self.sl_first_date, self.sl_last_date) + + config = configparser.ConfigParser() + config.read(self.config_file) + + self.assertEqual(config.get('selection', 'date'), + f'{self.first_day_first_prev}/to/{self.last_day_first_prev}') + + def test_new_config_file_with_single_level_file(self): + co_file = False + single_level_file = True + + new_config_file(self.config_file, 'date', self.additional_content, + co_file, single_level_file, self.first_day_first_prev, + self.last_day_first_prev, self.first_day_third_prev, + self.last_day_third_prev, self.sl_year, self.sl_month, + self.sl_first_date, self.sl_last_date) + + config = configparser.ConfigParser() + config.read(self.config_file) + + self.assertEqual(config.get('selection', 'year'), self.sl_year) + self.assertEqual(config.get('selection', 'month'), self.sl_month) + self.assertEqual(config.get('selection', 'day'), + f'{self.sl_first_date}/to/{self.sl_last_date}') + + def test_get_month_range(self): + # Test get_month_range function + first_day, last_day = get_month_range(datetime.date(2023, 7, 18)) + self.assertEqual(first_day, datetime.date(2023, 6, 1)) + self.assertEqual(last_day, datetime.date(2023, 6, 30)) + + def test_get_single_level_dates(self): + # Test get_single_level_dates function + first_day = datetime.date(2023, 7, 1) + last_day = datetime.date(2023, 7, 31) + year, month, first_date, last_date = get_single_level_dates(first_day, last_day) + self.assertEqual(year, "2023") + self.assertEqual(month, "07") + self.assertEqual(first_date, "01") + self.assertEqual(last_date, "31") + + def test_get_previous_month_dates(self): + # Test get_previous_month_dates function + prev_month_data = get_previous_month_dates() + self.assertIn("first_day_first_prev", prev_month_data) + self.assertIn("last_day_first_prev", prev_month_data) + self.assertIn("first_day_third_prev", prev_month_data) + self.assertIn("last_day_third_prev", prev_month_data) + self.assertIn("sl_year", prev_month_data) + self.assertIn("sl_month", prev_month_data) + self.assertIn("sl_first_date", prev_month_data) + self.assertIn("sl_last_date", prev_month_data) + + def test_update_config_files(self): + # Test update_config_files function + update_config_files( + self.temp_dir, "date", self.additional_content) + + @patch("fetch.secretmanager.SecretManagerServiceClient") + def test_get_secret_success(self, mock_secretmanager): + secret_data = { + "api_url": "https://example.com/api", + "api_key": "my_secret_api_key" + } + mock_response = MagicMock() + mock_response.payload.data.decode.return_value = json.dumps(secret_data) + mock_secretmanager.return_value.access_secret_version.return_value = ( + mock_response) + + api_key = "projects/my-project/secrets/my-secret/versions/latest" + result = get_secret(api_key) + self.assertEqual(result, secret_data) + + @patch("fetch.secretmanager.SecretManagerServiceClient") + def test_get_secret_failure(self, mock_secretmanager): + mock_secretmanager.return_value.access_secret_version.side_effect = ( + Exception("Error retrieving secret") ) + api_key = "projects/my-project/secrets/my-secret/versions/latest" + with self.assertRaises(Exception): + get_secret(api_key) + + +if __name__ == "__main__": + unittest.main() From 0415518e180ffbd2b0aaabcec7ba313879d5bc98 Mon Sep 17 00:00:00 2001 From: dabhicusp Date: Wed, 26 Jul 2023 07:04:17 +0000 Subject: [PATCH 4/8] remove unnecessary code and Updated function. --- raw/Dockerfile | 2 +- raw/README.md | 12 +-- raw/fetch.py | 184 ++++++++++++++++++++++++++-------------------- raw/fetch_test.py | 101 +++++++++++-------------- 4 files changed, 155 insertions(+), 144 deletions(-) diff --git a/raw/Dockerfile b/raw/Dockerfile index a6cd9b9..e45cb3f 100644 --- a/raw/Dockerfile +++ b/raw/Dockerfile @@ -35,7 +35,7 @@ ARG CONDA_ENV_NAME=weather-tools RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH RUN pip install -e . -# TDOO(#): +# TODO([##368](https://github.com/google/weather-tools/issues/368)) RUN pip install google-cloud-secret-manager==2.0.0 COPY *.cfg config_files/ diff --git a/raw/README.md b/raw/README.md index 71ddb56..cabf550 100644 --- a/raw/README.md +++ b/raw/README.md @@ -142,12 +142,12 @@ _Pre-requisites_: > NOTE: for every API_key there must be unique secret-key. 5. Create a new job in [Cloud-Run](https://cloud.google.com/run) using of the above docker image with this **ENV** variables. - * PROJECT - * REGION - * BUCKET - * SDK_CONTAINER_IMAGE - * MANIFEST_LOCATION - * API_KEY_* + * `PROJECT` + * `REGION` + * `BUCKET` + * `SDK_CONTAINER_IMAGE` + * `MANIFEST_LOCATION` + * `API_KEY_*` Here, API_KEY_* is access of [secret-manager key](https://cloud.google.com/secret-manager) and it's value is looks like this :: projects/PROJECT_NAME/secrets/SECRET_KEY_NAME/versions/1 diff --git a/raw/fetch.py b/raw/fetch.py index ad9715c..2a39709 100644 --- a/raw/fetch.py +++ b/raw/fetch.py @@ -15,11 +15,13 @@ import configparser import datetime import json +import logging import os import re import subprocess +import typing as t + from google.cloud import secretmanager -from typing import Tuple DIRECTORY = "/weather/config_files" FIELD_NAME = "date" @@ -31,22 +33,17 @@ API_KEY_PATTERN = re.compile(r'^API_KEY_\d+$') API_KEY_LIST = [] +logger = logging.getLogger(__name__) -def new_config_file(config_file: str, field_name: str, additional_content: str, - co_file: bool, single_level_file: bool, first_day_first_prev: datetime.date, - last_day_first_prev: datetime.date, first_day_third_prev: datetime.date, - last_day_third_prev: datetime.date, sl_year: str, sl_month: str, - sl_first_date: str, sl_last_date: str) -> None: - """Modify the specified configuration file with new values. - Parameters: - config_file (str): The path to the configuration file to be modified. - field_name (str): The name of the field to be updated with the new value. - additional_content (str): The additional content to be added under the - '[selection]' section. - co_file (bool): True if the configuration file is a 'CO' type file, False +class ConfigArgs(t.TypedDict): + """A class representing the configuration arguments for the new_config_file + function. + + Attributes: + co_file (bool): True if the configuration file is a 'CO' type file, False otherwise. - single_level_file (bool): True if the configuration file contains 'sfc' in + single_level_file (bool): True if the configuration file contains 'sfc' in filename, False otherwise. first_day_first_prev (datetime.date): The first day of the first previous month. last_day_first_prev (datetime.date): The last day of the first previous month. @@ -54,9 +51,58 @@ def new_config_file(config_file: str, field_name: str, additional_content: str, last_day_third_prev (datetime.date): The last day of the third previous month. sl_year (str): The year of the third previous month in 'YYYY' format. sl_month (str): The month of the third previous month in 'MM' format. - sl_first_date (str): The first date of the third previous month in 'DD' format. - sl_last_date (str): The last date of the third previous month in 'DD' format. """ + co_file: bool + single_level_file: bool + first_day_first_prev: datetime.date + last_day_first_prev: datetime.date + first_day_third_prev: datetime.date + last_day_third_prev: datetime.date + sl_year: str + sl_month: str + + +class MonthDates(t.TypedDict): + """A class representing the first and third previous month's dates. + + Attributes: + first_day_first_prev (datetime.date): The first day of the first previous month. + last_day_first_prev (datetime.date): The last day of the first previous month. + first_day_third_prev (datetime.date): The first day of the third previous month. + last_day_third_prev (datetime.date): The last day of the third previous month. + sl_year (str): The year of the third previous month in 'YYYY' format. + sl_month (str): The month of the third previous month in 'MM' format. + """ + first_day_first_prev: datetime.date + last_day_first_prev: datetime.date + first_day_third_prev: datetime.date + last_day_third_prev: datetime.date + sl_year: str + sl_month: str + + +def new_config_file(config_file: str, field_name: str, additional_content: str, + config_args: ConfigArgs) -> None: + """Modify the specified configuration file with new values. + + Parameters: + config_file (str): The path to the configuration file to be modified. + field_name (str): The name of the field to be updated with the new value. + additional_content (str): The additional content to be added under the + '[selection]' section. + config_args (ConfigArgs): A dictionary containing the configuration arguments + as key-value pairs. + """ + + # Unpack the values from config_args dictionary + co_file = config_args["co_file"] + single_level_file = config_args["single_level_file"] + first_day_first_prev = config_args["first_day_first_prev"] + last_day_first_prev = config_args["last_day_first_prev"] + first_day_third_prev = config_args["first_day_third_prev"] + last_day_third_prev = config_args["last_day_third_prev"] + sl_year = config_args["sl_year"] + sl_month = config_args["sl_month"] config = configparser.ConfigParser() config.read(config_file) @@ -64,20 +110,19 @@ def new_config_file(config_file: str, field_name: str, additional_content: str, if single_level_file: config.set("selection", "year", sl_year) config.set("selection", "month", sl_month) - config.set("selection", "day", f"{sl_first_date}/to/{sl_last_date}") + config.set("selection", "day", "all") else: if co_file: - config.set("selection", field_name, + config.set("selection", field_name, f"{first_day_first_prev}/to/{last_day_first_prev}") else: - config.set("selection", field_name, + config.set("selection", field_name, f"{first_day_third_prev}/to/{last_day_third_prev}") sections_list = additional_content.split("\n\n") for section in sections_list[:-1]: sections = section.split("\n") - print("sections is here",sections) - new_section_name= sections[0].strip() + new_section_name = sections[0].strip() config.add_section(new_section_name) api_url_name, api_url_value = sections[1].split("=") config.set(new_section_name, api_url_name.strip(), api_url_value.strip()) @@ -88,14 +133,14 @@ def new_config_file(config_file: str, field_name: str, additional_content: str, config.write(file, space_around_delimiters=False) -def get_month_range(date: datetime.date) -> Tuple[datetime.date, datetime.date]: +def get_month_range(date: datetime.date) -> t.Tuple[datetime.date, datetime.date]: """Return the first and last date of the previous month based on the input date. Parameters: date (datetime.date): The input date. Returns: - tuple: A tuple containing the first and last date of the month as + tuple: A tuple containing the first and last date of the month as datetime.date objects. """ last_day = date.replace(day=1) - datetime.timedelta(days=1) @@ -103,43 +148,22 @@ def get_month_range(date: datetime.date) -> Tuple[datetime.date, datetime.date]: return first_day, last_day -def get_single_level_dates(first_day: datetime.date, - last_day: datetime.date) -> Tuple[str, str, str, str]: - """Return the year, month, first date, and last date of the input month. - - Parameters: - first_day (datetime.date): The first day of the month. - last_day (datetime.date): The last day of the month. - - Returns: - tuple: A tuple containing the year, month, first date, and last date of the - month as strings. - """ - year, month = str(first_day)[:4], str(first_day)[5:7] - first_date, last_date = str(first_day)[8:], str(last_day)[8:] - return (year, month, first_date, last_date) - - -def get_previous_month_dates() -> dict: - """Return a dictionary containing the first and third previous month's dates from +def get_previous_month_dates() -> MonthDates: + """Return a dictionary containing the first and third previous month's dates from the current date. Returns: dict: A dictionary containing the following key-value pairs: - - 'first_day_first_prev': The first day of the first previous month + - 'first_day_first_prev': The first day of the first previous month (datetime.date). - - 'last_day_first_prev': The last day of the first previous month + - 'last_day_first_prev': The last day of the first previous month (datetime.date). - - 'first_day_third_prev': The first day of the third previous month + - 'first_day_third_prev': The first day of the third previous month (datetime.date). - - 'last_day_third_prev': The last day of the third previous month + - 'last_day_third_prev': The last day of the third previous month (datetime.date). - 'sl_year': The year of the third previous month in 'YYYY' format (str). - 'sl_month': The month of the third previous month in 'MM' format (str). - - 'sl_first_date': The first date of the third previous month in 'DD' - format (str). - - 'sl_last_date': The last date of the third previous month in 'DD' - format (str). """ today = datetime.date.today() @@ -149,10 +173,8 @@ def get_previous_month_dates() -> dict: first_prev_month = today.replace(month=today.month) first_day_third_prev, last_day_third_prev = get_month_range(third_prev_month) first_day_first_prev, last_day_first_prev = get_month_range(first_prev_month) - - sl_year, sl_month, sl_first_date, sl_last_date = get_single_level_dates( - first_day_third_prev, last_day_third_prev - ) + first_date_third_prev = first_day_third_prev + sl_year, sl_month = str(first_date_third_prev)[:4], str(first_date_third_prev)[5:7] return { 'first_day_first_prev': first_day_first_prev, @@ -161,43 +183,46 @@ def get_previous_month_dates() -> dict: 'last_day_third_prev': last_day_third_prev, 'sl_year': sl_year, 'sl_month': sl_month, - 'sl_first_date': sl_first_date, - 'sl_last_date': sl_last_date } -def update_config_files(directory: str, field_name: str, +def update_config_files(directory: str, field_name: str, additional_content: str) -> None: """Update the configuration files in the specified directory. Parameters: directory (str): The path to the directory containing the configuration files. field_name (str): The name of the field to be updated with the new value. - additional_content (str): The additional content to be added under the + additional_content (str): The additional content to be added under the '[selection]' section. """ dates_data = get_previous_month_dates() - + config_args = { + "first_day_first_prev": dates_data['first_day_first_prev'], + "last_day_first_prev": dates_data['last_day_first_prev'], + "first_day_third_prev": dates_data['first_day_third_prev'], + "last_day_third_prev": dates_data['last_day_third_prev'], + "sl_year": dates_data['sl_year'], + "sl_month": dates_data['sl_month'], + } for filename in os.listdir(directory): - single_level_file = False - co_file = False + config_args['single_level_file'] = config_args['co_file'] = False if filename.endswith(".cfg"): if "sfc" in filename: - single_level_file = True + config_args["single_level_file"] = True if "hourly" in filename: - co_file = True + config_args["co_file"] = True config_file = os.path.join(directory, filename) # Pass the data as keyword arguments to the new_config_file function new_config_file(config_file, field_name, additional_content, - co_file=co_file, single_level_file=single_level_file, - **dates_data) + config_args=config_args) def get_secret(api_key: str) -> dict: """Retrieve the secret value from the Google Cloud Secret Manager. Parameters: - api_key (str): The name or identifier of the secret in the Google + api_key (str): The name or identifier of the secret in the Google Cloud Secret Manager. Returns: @@ -211,29 +236,32 @@ def get_secret(api_key: str) -> dict: if __name__ == "__main__": - for env_var in os.environ: if API_KEY_PATTERN.match(env_var): api_key_value = os.environ.get(env_var) API_KEY_LIST.append(api_key_value) - + additional_content = "" - for count,key in enumerate(API_KEY_LIST): + for count, key in enumerate(API_KEY_LIST): api_key_value = get_secret(key) additional_content += f'parameters.api{count}\n\ api_url={api_key_value["api_url"]}\napi_key={api_key_value["api_key"]}\n\n' - - current_day = datetime.date.today() - Job_name = f"wx-dl-arco-era5-{current_day.month}-{current_day.year}" - command = f'python weather_dl/weather-dl /weather/config_files/*.cfg --runner \ - DataflowRunner --project {PROJECT} --region {REGION} --temp_location \ - "gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {Job_name} \ - --sdk_container_image {SDK_CONTAINER_IMAGE} \ - --manifest-location {MANIFEST_LOCATION} --experiment use_runner_v2' + current_day = datetime.date.today() + job_name = f"wx-dl-arco-era5-{current_day.month}-{current_day.year}" + + command = ( + f'python weather_dl/weather-dl /weather/config_files/*.cfg --runner ' + f'DataflowRunner --project {PROJECT} --region {REGION} --temp_location ' + f'"gs://{BUCKET}/tmp/" --disk_size_gb 260 --job_name {job_name} ' + f'--sdk_container_image {SDK_CONTAINER_IMAGE} ' + f'--manifest-location {MANIFEST_LOCATION} --experiment use_runner_v2' + ) try: update_config_files(DIRECTORY, FIELD_NAME, additional_content) subprocess.run(command, shell=True, check=True, capture_output=True) except subprocess.CalledProcessError as e: - print(f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}') + logger.error( + f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}' + ) diff --git a/raw/fetch_test.py b/raw/fetch_test.py index 667dc26..64bef8f 100644 --- a/raw/fetch_test.py +++ b/raw/fetch_test.py @@ -7,21 +7,20 @@ from unittest.mock import patch, MagicMock -# with . or not below?? from fetch import ( new_config_file, get_month_range, - get_single_level_dates, get_previous_month_dates, update_config_files, get_secret, ) + class TestFetchFunctions(unittest.TestCase): def setUp(self): # Create a temporary directory for testing - self.temp_dir = tempfile.mkdtemp() - self.config_file = os.path.join(self.temp_dir, "test_config.cfg") + self.temp_dir = tempfile.TemporaryDirectory() + self.config_file = os.path.join(self.temp_dir.name, "test_config.cfg") with open(self.config_file, "w") as file: file.write( "[parameters]\nclient=cds\ndataset=reanalysis-era5-complete\n\ @@ -31,18 +30,23 @@ def setUp(self): levtype=ml\nlevelist=1/to/137\ndate=1979-01-01/to/2023-03-31\n\ time=00/to/23\nparam=138/155\n" ) - self.first_day_first_prev = datetime.date(2023, 7, 1) - self.last_day_first_prev = datetime.date(2023, 7, 31) - self.first_day_third_prev = datetime.date(2023, 5, 1) - self.last_day_third_prev = datetime.date(2023, 5, 31) - self.sl_year, self.sl_month = "2023", "05" - self.sl_first_date, self.sl_last_date = "01", "31" + self.config_args = { + "first_day_first_prev": datetime.date(2023, 7, 1), + "last_day_first_prev": datetime.date(2023, 7, 31), + "first_day_third_prev": datetime.date(2023, 5, 1), + "last_day_third_prev": datetime.date(2023, 5, 31), + "sl_year": "2023", + "sl_month": "05", + 'single_level_file': False, + 'co_file': False + } self.additional_content = "[parameters.test]\napi_url=test_url\napi_key=\ test_key\n\n" def tearDown(self): os.remove(self.config_file) - os.rmdir(self.temp_dir) + os.rmdir(self.temp_dir.name) + self.temp_dir.cleanup() def test_new_config_file(self): section_name = 'parameters.test' @@ -50,75 +54,56 @@ def test_new_config_file(self): section_api_key = 'test_key' additional_content = f'{section_name}\napi_url={section_api_url}\n\ api_key={section_api_key}\n\n' - - new_config_file( - self.config_file, "date", additional_content, - False, False, self.first_day_first_prev, - self.last_day_first_prev, self.first_day_third_prev, - self.last_day_third_prev, self.sl_year, self.sl_month, - self.sl_first_date, self.sl_last_date) + + new_config_file(self.config_file, "date", additional_content, self.config_args) config = configparser.ConfigParser() config.read(self.config_file) self.assertIn(section_name, config.sections()) - self.assertEqual(config.get('selection', 'date'), - f'{self.first_day_third_prev}/to/{self.last_day_third_prev}') + self.assertEqual( + config.get("selection", "date"), + f'{self.config_args["first_day_third_prev"]}/to/{self.config_args["last_day_third_prev"]}', + ) self.assertEqual(config.get(section_name, 'api_url'), - section_api_url) + section_api_url) self.assertEqual(config.get(section_name, 'api_key'), - section_api_key) + section_api_key) def test_new_config_file_with_co_file(self): - co_file = True - single_level_file = False + self.config_args["co_file"] = True + self.config_args["single_level_file"] = False new_config_file( - self.config_file, "date", self.additional_content, - co_file, single_level_file, self.first_day_first_prev, - self.last_day_first_prev, self.first_day_third_prev, - self.last_day_third_prev, self.sl_year, self.sl_month, - self.sl_first_date, self.sl_last_date) - + self.config_file, "date", self.additional_content, self.config_args) + config = configparser.ConfigParser() config.read(self.config_file) - self.assertEqual(config.get('selection', 'date'), - f'{self.first_day_first_prev}/to/{self.last_day_first_prev}') + self.assertEqual( + config.get('selection', 'date'), + f'{self.config_args["first_day_first_prev"]}/to/{self.config_args["last_day_first_prev"]}' + ) def test_new_config_file_with_single_level_file(self): - co_file = False - single_level_file = True + self.config_args["co_file"] = False + self.config_args["single_level_file"] = True - new_config_file(self.config_file, 'date', self.additional_content, - co_file, single_level_file, self.first_day_first_prev, - self.last_day_first_prev, self.first_day_third_prev, - self.last_day_third_prev, self.sl_year, self.sl_month, - self.sl_first_date, self.sl_last_date) + new_config_file( + self.config_file, "date", self.additional_content, self.config_args) config = configparser.ConfigParser() config.read(self.config_file) - self.assertEqual(config.get('selection', 'year'), self.sl_year) - self.assertEqual(config.get('selection', 'month'), self.sl_month) - self.assertEqual(config.get('selection', 'day'), - f'{self.sl_first_date}/to/{self.sl_last_date}') - + self.assertEqual(config.get('selection', 'year'), self.config_args["sl_year"]) + self.assertEqual(config.get('selection', 'month'), self.config_args["sl_month"]) + self.assertEqual(config.get('selection', 'day'), 'all') + def test_get_month_range(self): # Test get_month_range function first_day, last_day = get_month_range(datetime.date(2023, 7, 18)) self.assertEqual(first_day, datetime.date(2023, 6, 1)) self.assertEqual(last_day, datetime.date(2023, 6, 30)) - def test_get_single_level_dates(self): - # Test get_single_level_dates function - first_day = datetime.date(2023, 7, 1) - last_day = datetime.date(2023, 7, 31) - year, month, first_date, last_date = get_single_level_dates(first_day, last_day) - self.assertEqual(year, "2023") - self.assertEqual(month, "07") - self.assertEqual(first_date, "01") - self.assertEqual(last_date, "31") - def test_get_previous_month_dates(self): # Test get_previous_month_dates function prev_month_data = get_previous_month_dates() @@ -128,13 +113,11 @@ def test_get_previous_month_dates(self): self.assertIn("last_day_third_prev", prev_month_data) self.assertIn("sl_year", prev_month_data) self.assertIn("sl_month", prev_month_data) - self.assertIn("sl_first_date", prev_month_data) - self.assertIn("sl_last_date", prev_month_data) def test_update_config_files(self): # Test update_config_files function update_config_files( - self.temp_dir, "date", self.additional_content) + self.temp_dir.name, "date", self.additional_content) @patch("fetch.secretmanager.SecretManagerServiceClient") def test_get_secret_success(self, mock_secretmanager): @@ -153,8 +136,8 @@ def test_get_secret_success(self, mock_secretmanager): @patch("fetch.secretmanager.SecretManagerServiceClient") def test_get_secret_failure(self, mock_secretmanager): - mock_secretmanager.return_value.access_secret_version.side_effect = ( - Exception("Error retrieving secret") ) + mock_secretmanager.return_value.access_secret_version.side_effect = ( + Exception("Error retrieving secret")) api_key = "projects/my-project/secrets/my-secret/versions/latest" with self.assertRaises(Exception): get_secret(api_key) From 0512475bbc10f337766e05432415e29cd4f9d888 Mon Sep 17 00:00:00 2001 From: dabhicusp Date: Thu, 27 Jul 2023 05:09:03 +0000 Subject: [PATCH 5/8] Changed nits. --- raw/Dockerfile | 3 +-- raw/README.md | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/raw/Dockerfile b/raw/Dockerfile index e45cb3f..004d88e 100644 --- a/raw/Dockerfile +++ b/raw/Dockerfile @@ -1,4 +1,3 @@ - # Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -35,7 +34,7 @@ ARG CONDA_ENV_NAME=weather-tools RUN echo "source activate ${CONDA_ENV_NAME}" >> ~/.bashrc ENV PATH /opt/conda/envs/${CONDA_ENV_NAME}/bin:$PATH RUN pip install -e . -# TODO([##368](https://github.com/google/weather-tools/issues/368)) +# TODO(#368): Add google-cloud-secret-manager in weather-tools RUN pip install google-cloud-secret-manager==2.0.0 COPY *.cfg config_files/ diff --git a/raw/README.md b/raw/README.md index cabf550..b2f57ee 100644 --- a/raw/README.md +++ b/raw/README.md @@ -125,7 +125,7 @@ specifically `weather-dl` (see [weather-tools.readthedocs.io](https://weather-to _Pre-requisites_: -1. Set up a cloud project with sufficient permissions to use cloud storage (such as +1. Set up a Cloud project with sufficient permissions to use cloud storage (such as [GCS](https://cloud.google.com/storage)) and a Beam runner (such as [Dataflow](https://cloud.google.com/dataflow)). > Note: Other cloud systems should work too, such as S3 and Elastic Map Reduce. However, these are untested. If you > experience an error here, please let us know by [filing an issue](https://github.com/google/weather-tools/issues). @@ -138,8 +138,8 @@ _Pre-requisites_: 3. Create a docker image from the docker file of the [current directory](https://github.com/google-research/arco-era5/tree/main/raw) and push that image in the [GCR](https://cloud.google.com/artifact-registry). > Reference: https://github.com/google/weather-tools/blob/main/Runtime-Container.md -4. Add the all licenses of the cds in the [secret-manager](https://cloud.google.com/secret-manager) with secret value likes this dict: {"api_url": "URL", "api_key": "KEY"} - > NOTE: for every API_key there must be unique secret-key. +4. 4. Add the all CDS licenses into the [secret-manager](https://cloud.google.com/secret-manager) with value likes this: {"api_url": "URL", "api_key": "KEY"} + > NOTE: for every API_KEY there must be unique secret-key. 5. Create a new job in [Cloud-Run](https://cloud.google.com/run) using of the above docker image with this **ENV** variables. * `PROJECT` From 4badb4634582f8040c41a1ee86511e0ea9af5e79 Mon Sep 17 00:00:00 2001 From: dabhicusp Date: Fri, 28 Jul 2023 11:57:11 +0000 Subject: [PATCH 6/8] Subprocess stopping script added. --- raw/README.md | 2 +- raw/fetch.py | 24 +++++++++++++++++------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/raw/README.md b/raw/README.md index b2f57ee..3b1fcf9 100644 --- a/raw/README.md +++ b/raw/README.md @@ -138,7 +138,7 @@ _Pre-requisites_: 3. Create a docker image from the docker file of the [current directory](https://github.com/google-research/arco-era5/tree/main/raw) and push that image in the [GCR](https://cloud.google.com/artifact-registry). > Reference: https://github.com/google/weather-tools/blob/main/Runtime-Container.md -4. 4. Add the all CDS licenses into the [secret-manager](https://cloud.google.com/secret-manager) with value likes this: {"api_url": "URL", "api_key": "KEY"} +4. Add the all CDS licenses into the [secret-manager](https://cloud.google.com/secret-manager) with value likes this: {"api_url": "URL", "api_key": "KEY"} > NOTE: for every API_KEY there must be unique secret-key. 5. Create a new job in [Cloud-Run](https://cloud.google.com/run) using of the above docker image with this **ENV** variables. diff --git a/raw/fetch.py b/raw/fetch.py index 2a39709..e0879ec 100644 --- a/raw/fetch.py +++ b/raw/fetch.py @@ -19,6 +19,7 @@ import os import re import subprocess +import signal import typing as t from google.cloud import secretmanager @@ -258,10 +259,19 @@ def get_secret(api_key: str) -> dict: f'--manifest-location {MANIFEST_LOCATION} --experiment use_runner_v2' ) - try: - update_config_files(DIRECTORY, FIELD_NAME, additional_content) - subprocess.run(command, shell=True, check=True, capture_output=True) - except subprocess.CalledProcessError as e: - logger.error( - f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}' - ) + update_config_files(DIRECTORY, FIELD_NAME, additional_content) + stop_on_log = 'JOB_STATE_RUNNING' + process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + with process.stdout: + try: + for line in iter(process.stdout.readline, b''): + log_message = line.decode("utf-8").strip() + print(log_message) + if stop_on_log in log_message: + print(f'Stopping subprocess as {stop_on_log}') + os.killpg(os.getpgid(process.pid), signal.SIGTERM) + except subprocess.CalledProcessError as e: + logger.error( + f'Failed to execute dataflow job due to {e.stderr.decode("utf-8")}' + ) From a2553a7d24380fe2a88df62fccb8da3e9056d0b2 Mon Sep 17 00:00:00 2001 From: dabhicusp Date: Mon, 31 Jul 2023 11:53:20 +0000 Subject: [PATCH 7/8] Added TODO of --async keyword. --- raw/fetch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raw/fetch.py b/raw/fetch.py index e0879ec..185bc1c 100644 --- a/raw/fetch.py +++ b/raw/fetch.py @@ -250,7 +250,7 @@ def get_secret(api_key: str) -> dict: current_day = datetime.date.today() job_name = f"wx-dl-arco-era5-{current_day.month}-{current_day.year}" - + # TODO(#373): Update the command once `--async` keyword added in weather-dl. command = ( f'python weather_dl/weather-dl /weather/config_files/*.cfg --runner ' f'DataflowRunner --project {PROJECT} --region {REGION} --temp_location ' From a342dc832e3f878c1c544cdf72ede00dd6f283c7 Mon Sep 17 00:00:00 2001 From: dabhicusp Date: Mon, 31 Jul 2023 13:21:04 +0000 Subject: [PATCH 8/8] Variable name changed. --- raw/fetch.py | 10 +++++----- raw/fetch_test.py | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/raw/fetch.py b/raw/fetch.py index 185bc1c..415d11b 100644 --- a/raw/fetch.py +++ b/raw/fetch.py @@ -219,7 +219,7 @@ def update_config_files(directory: str, field_name: str, config_args=config_args) -def get_secret(api_key: str) -> dict: +def get_secret(secret_key: str) -> dict: """Retrieve the secret value from the Google Cloud Secret Manager. Parameters: @@ -230,7 +230,7 @@ def get_secret(api_key: str) -> dict: dict: A dictionary containing the retrieved secret data. """ client = secretmanager.SecretManagerServiceClient() - response = client.access_secret_version(request={"name": api_key}) + response = client.access_secret_version(request={"name": secret_key}) payload = response.payload.data.decode("UTF-8") secret_dict = json.loads(payload) return secret_dict @@ -243,10 +243,10 @@ def get_secret(api_key: str) -> dict: API_KEY_LIST.append(api_key_value) additional_content = "" - for count, key in enumerate(API_KEY_LIST): - api_key_value = get_secret(key) + for count, secret_key in enumerate(API_KEY_LIST): + secret_key_value = get_secret(secret_key) additional_content += f'parameters.api{count}\n\ - api_url={api_key_value["api_url"]}\napi_key={api_key_value["api_key"]}\n\n' + api_url={secret_key_value["api_url"]}\napi_key={secret_key_value["api_key"]}\n\n' current_day = datetime.date.today() job_name = f"wx-dl-arco-era5-{current_day.month}-{current_day.year}" diff --git a/raw/fetch_test.py b/raw/fetch_test.py index 64bef8f..d7051b4 100644 --- a/raw/fetch_test.py +++ b/raw/fetch_test.py @@ -130,17 +130,17 @@ def test_get_secret_success(self, mock_secretmanager): mock_secretmanager.return_value.access_secret_version.return_value = ( mock_response) - api_key = "projects/my-project/secrets/my-secret/versions/latest" - result = get_secret(api_key) + secret_key = "projects/my-project/secrets/my-secret/versions/latest" + result = get_secret(secret_key) self.assertEqual(result, secret_data) @patch("fetch.secretmanager.SecretManagerServiceClient") def test_get_secret_failure(self, mock_secretmanager): mock_secretmanager.return_value.access_secret_version.side_effect = ( Exception("Error retrieving secret")) - api_key = "projects/my-project/secrets/my-secret/versions/latest" + secret_key = "projects/my-project/secrets/my-secret/versions/latest" with self.assertRaises(Exception): - get_secret(api_key) + get_secret(secret_key) if __name__ == "__main__":