Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify gcs target #218

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies:
- python=3.8
- cdsapi
- ecmwf-api-client
- apache-beam[gcp]
- apache-beam[gcp]==2.40.0
- numpy
- pandas
- xarray
Expand Down
27 changes: 21 additions & 6 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,26 @@

from setuptools import find_packages, setup

beam_gcp_requirements = [
"google-cloud-bigquery==2.34.4",
"google-cloud-bigquery-storage==2.14.1",
"google-cloud-bigtable==1.7.2",
"google-cloud-core==1.7.3",
"google-cloud-datastore==1.15.5",
"google-cloud-dlp==3.8.0",
"google-cloud-language==1.3.2",
"google-cloud-pubsub==2.13.4",
"google-cloud-pubsublite==1.4.2",
"google-cloud-recommendations-ai==0.2.0",
"google-cloud-spanner==1.19.3",
"google-cloud-videointelligence==1.16.3",
"google-cloud-vision==1.0.2",
"apache-beam[gcp]==2.40.0",
]

weather_dl_requirements = [
"cdsapi",
"ecmwf-api-client",
"apache-beam[gcp]",
"numpy>=1.19.1",
"pandas",
"xarray",
Expand All @@ -28,12 +44,10 @@
]

weather_mv_requirements = [
"apache-beam[gcp]",
"dataclasses",
"numpy",
"pandas",
"xarray",
"google-cloud-storage==2.2.1",
"cfgrib",
"netcdf4",
"geojson",
Expand All @@ -47,7 +61,6 @@
]

weather_sp_requirements = [
"apache-beam[gcp]",
"numpy>=1.20.3",
"pygrib",
"xarray",
Expand All @@ -67,7 +80,9 @@
"metview",
]

all_test_requirements = weather_dl_requirements + weather_mv_requirements + weather_sp_requirements + test_requirements
all_test_requirements = beam_gcp_requirements + weather_dl_requirements + \
weather_mv_requirements + weather_sp_requirements + \
test_requirements

setup(
name='google-weather-tools',
Expand Down Expand Up @@ -97,7 +112,7 @@

],
python_requires='>=3.7, <3.10',
install_requires=['apache-beam[gcp]'],
install_requires=['apache-beam[gcp]==2.40.0'],
use_scm_version=True,
setup_requires=['setuptools_scm'],
scripts=['weather_dl/weather-dl', 'weather_dl/download-status',
Expand Down
48 changes: 46 additions & 2 deletions weather_dl/download_pipeline/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
import copy
import copy as cp
import dataclasses
import itertools
import typing as t

Values = t.Union[t.List['Values'], t.Dict[str, 'Values'], bool, int, float, str] # pytype: disable=not-supported-yet
Expand Down Expand Up @@ -78,7 +79,7 @@ def optimize_selection_partition(selection: t.Dict) -> t.Dict:

Used to support custom syntax and optimizations, such as 'all'.
"""
selection_ = copy.deepcopy(selection)
selection_ = cp.deepcopy(selection)

if 'day' in selection_.keys() and selection_['day'] == 'all':
year, month = selection_['year'], selection_['month']
Expand Down Expand Up @@ -109,3 +110,46 @@ def optimize_selection_partition(selection: t.Dict) -> t.Dict:
del selection_['year']

return selection_


def prepare_partitions(config: Config) -> t.Iterator[Config]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious: why did you move these functions to this file? It seems like partition.py is a good place for them.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was to avoid a cyclic dependency issue. In partition.py we have:

from .parsers import prepare_target_name

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just reverted that commit and confirmed it'd cause an ImportError

"""Iterate over client parameters, partitioning over `partition_keys`.

This produces a Cartesian-Cross over the range of keys.

For example, if the keys were 'year' and 'month', it would produce
an iterable like:
( ('2020', '01'), ('2020', '02'), ('2020', '03'), ...)

Returns:
An iterator of `Config`s.
"""
for option in itertools.product(*[config.selection[key] for key in config.partition_keys]):
yield _create_partition_config(option, config)


def _create_partition_config(option: t.Tuple, config: Config) -> Config:
    """Create a config for a single partition option.

    Output a config whose 'selection' section overrides the range of values
    for each partition key with the single value chosen for this partition.
    Continuing the example from prepare_partitions, the selection section
    would be:
      { 'foo': ..., 'year': ['2020'], 'month': ['01'], ... }
      { 'foo': ..., 'year': ['2020'], 'month': ['02'], ... }
      { 'foo': ..., 'year': ['2020'], 'month': ['03'], ... }

    Args:
        option: A single item in the range of partition_keys; `option[i]` is
            the value for `config.partition_keys[i]`.
        config: The download config, including the parameters and selection sections.

    Returns:
        A configuration that selects a single download partition. The input
        `config` is left untouched.
    """
    # A single deep copy already duplicates `selection`, so there is no need
    # to copy it a second time: the overrides below only touch the copy.
    out = cp.deepcopy(config)
    for idx, key in enumerate(config.partition_keys):
        # Each partition key is narrowed to a one-element list.
        out.selection[key] = [option[idx]]

    return out
46 changes: 41 additions & 5 deletions weather_dl/download_pipeline/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@
import copy as cp
import datetime
import json
import logging
import string
import textwrap
import typing as t
import numpy as np
from collections import OrderedDict
from urllib.parse import urlparse

from apache_beam.io.aws.s3io import S3IO
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp.gcsio import GcsIO

from .clients import CLIENTS
from .config import Config
from .config import Config, prepare_partitions
from .manifest import MANIFESTS, Manifest, Location, NoOpManifest

logger = logging.getLogger(__name__)


def date(candidate: str) -> datetime.date:
"""Converts ECMWF-format date strings into a `datetime.date`.
Expand Down Expand Up @@ -326,6 +333,29 @@ def _number_of_replacements(s: t.Text):
return len(set(format_names)) + num_empty_names


def _validate_target_path(config: Config):
"""Checks accessibility of target locations before execution start"""
for partition_conf in prepare_partitions(config):
target = prepare_target_name(partition_conf)
parsed = urlparse(target)
if FileSystems.exists(target):
raise ValueError(f"Path {target} already exists.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're ok with users downloading to paths that already exist. I'm not sure if we want to do this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, we have skipping logic to not re-download anything that already exists, plus a -f --force flag that lets users override this behavior. How does this validation interact with these two features?

try:
if parsed.scheme == "gs":
GcsIO().open(target, 'w').close()
elif parsed.scheme == "s3":
S3IO().open(target, 'w').close()
elif parsed.scheme == "" or parsed.scheme == "fs":
open(target, 'w').close()
Comment on lines +344 to +349
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a FileSystems.create() call instead? I think it's a bit more portable.

else:
logger.warning(f"Unable to verify path {target} writeable."
f"Persistent IO errors when writing to the target"
f"path during execution may cause deadlocks.")
except Exception:
raise ValueError(f"Unable to write to {target}")
Comment on lines +354 to +355
Copy link
Collaborator

@alxmrs alxmrs Sep 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more idea – let's chain the errors:

Suggested change
except Exception:
raise ValueError(f"Unable to write to {target}")
except Exception as e:
raise ValueError(f"Unable to write to {target}") from e

FileSystems.delete([target])


def parse_subsections(config: t.Dict) -> t.Dict:
"""Interprets [section.subsection] as nested dictionaries in `.cfg` files."""
copy = cp.deepcopy(config)
Expand Down Expand Up @@ -441,13 +471,19 @@ def require(condition: bool, message: str, error_type: t.Type[Exception] = Value
if not isinstance(selection[key], list):
selection[key] = [selection[key]]

return Config.from_dict(config)
config = Config.from_dict(config)

_validate_target_path(config)

return config


def prepare_target_name(config: Config) -> str:
def prepare_target_name(partition_config: Config) -> str:
"""Returns name of target location."""
partition_dict = OrderedDict((key, typecast(key, config.selection[key][0])) for key in config.partition_keys)
target = config.target_path.format(*partition_dict.values(), **partition_dict)
partition_dict = OrderedDict(
(key, typecast(key, partition_config.selection[key][0]))
for key in partition_config.partition_keys)
Comment on lines +483 to +485
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with making this multiline, but then let's go all the way :)

Suggested change
partition_dict = OrderedDict(
(key, typecast(key, partition_config.selection[key][0]))
for key in partition_config.partition_keys)
partition_dict = OrderedDict(
(key, typecast(key, partition_config.selection[key][0]))
for key in partition_config.partition_keys
)

target = partition_config.target_path.format(*partition_dict.values(), **partition_dict)

return target

Expand Down
45 changes: 1 addition & 44 deletions weather_dl/download_pipeline/partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy as cp
import dataclasses
import itertools
import logging
import typing as t

import apache_beam as beam

from .config import prepare_partitions
from .manifest import Manifest
from .parsers import prepare_target_name
from .config import Config
Expand Down Expand Up @@ -84,33 +84,6 @@ def loop_through_subsections(it: Config) -> Partition:
)


def _create_partition_config(option: t.Tuple, config: Config) -> Config:
    """Create a config for a single partition option.

    Output a config whose 'selection' section overrides the range of values
    for each partition key with the single value chosen for this partition.
    Continuing the example from prepare_partitions, the selection section
    would be:
      { 'foo': ..., 'year': ['2020'], 'month': ['01'], ... }
      { 'foo': ..., 'year': ['2020'], 'month': ['02'], ... }
      { 'foo': ..., 'year': ['2020'], 'month': ['03'], ... }

    Args:
        option: A single item in the range of partition_keys; `option[i]` is
            the value for `config.partition_keys[i]`.
        config: The download config, including the parameters and selection sections.

    Returns:
        A configuration that selects a single download partition. The input
        `config` is left untouched.
    """
    # A single deep copy already duplicates `selection`, so there is no need
    # to copy it a second time: the overrides below only touch the copy.
    out = cp.deepcopy(config)
    for idx, key in enumerate(config.partition_keys):
        # Each partition key is narrowed to a one-element list.
        out.selection[key] = [option[idx]]

    return out


def skip_partition(config: Config, store: Store) -> bool:
"""Return true if partition should be skipped."""

Expand All @@ -125,22 +98,6 @@ def skip_partition(config: Config, store: Store) -> bool:
return False


def prepare_partitions(config: Config) -> t.Iterator[Config]:
    """Yield one `Config` per combination of the partition keys' values.

    The values listed in `config.selection` for every key in
    `config.partition_keys` are combined as a Cartesian product.

    For example, if the keys were 'year' and 'month', the combinations
    would look like:
      ( ('2020', '01'), ('2020', '02'), ('2020', '03'), ...)

    Returns:
        An iterator of `Config`s, one for each combination.
    """
    value_ranges = [config.selection[key] for key in config.partition_keys]
    yield from (
        _create_partition_config(combination, config)
        for combination in itertools.product(*value_ranges)
    )


def new_downloads_only(candidate: Config, store: t.Optional[Store] = None) -> bool:
"""Predicate function to skip already downloaded partitions."""
if store is None:
Expand Down
22 changes: 19 additions & 3 deletions weather_dl/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,33 @@

from setuptools import setup, find_packages

beam_gcp_requirements = [
"google-cloud-bigquery==2.34.4",
"google-cloud-bigquery-storage==2.14.1",
"google-cloud-bigtable==1.7.2",
"google-cloud-core==1.7.3",
"google-cloud-datastore==1.15.5",
"google-cloud-dlp==3.8.0",
"google-cloud-language==1.3.2",
"google-cloud-pubsub==2.13.4",
"google-cloud-pubsublite==1.4.2",
"google-cloud-recommendations-ai==0.2.0",
"google-cloud-spanner==1.19.3",
"google-cloud-videointelligence==1.16.3",
"google-cloud-vision==1.0.2",
"apache-beam[gcp]==2.40.0",
]

base_requirements = [
"cdsapi",
"ecmwf-api-client",
"apache-beam[gcp]",
"numpy>=1.19.1",
"pandas",
"xarray",
"requests>=2.24.0",
"firebase-admin>=5.0.0",
"google-cloud-firestore",
"urllib3==1.26.5",
"google-cloud-firestore==2.6.0",
]

setup(
Expand All @@ -35,5 +51,5 @@
author_email='anthromets-ecmwf@google.com',
url='https://weather-tools.readthedocs.io/en/latest/weather_dl/',
description='A tool to download weather data.',
install_requires=base_requirements,
install_requires=beam_gcp_requirements + base_requirements,
)
21 changes: 18 additions & 3 deletions weather_mv/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,28 @@

from setuptools import setup, find_packages, Command

beam_gcp_requirements = [
"google-cloud-bigquery==2.34.4",
"google-cloud-bigquery-storage==2.14.1",
"google-cloud-bigtable==1.7.2",
"google-cloud-core==1.7.3",
"google-cloud-datastore==1.15.5",
"google-cloud-dlp==3.8.0",
"google-cloud-language==1.3.2",
"google-cloud-pubsub==2.13.4",
"google-cloud-pubsublite==1.4.2",
"google-cloud-recommendations-ai==0.2.0",
"google-cloud-spanner==1.19.3",
"google-cloud-videointelligence==1.16.3",
"google-cloud-vision==1.0.2",
"apache-beam[gcp]==2.40.0",
]

base_requirements = [
"apache-beam[gcp]",
"dataclasses",
"numpy",
"pandas",
"xarray",
"google-cloud-storage==2.2.1",
"cfgrib",
"netcdf4",
"geojson",
Expand Down Expand Up @@ -133,7 +148,7 @@ def run(self):
version='0.2.5',
url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
description='A tool to load weather data into BigQuery.',
install_requires=base_requirements,
install_requires=beam_gcp_requirements + base_requirements,
cmdclass={
# Command class instantiated and run during pip install scenarios.
'build': build,
Expand Down
Loading