-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Verify gcs target #218
base: main
Are you sure you want to change the base?
Verify gcs target #218
Changes from all commits
e9f5c8a
50c2e36
3c4f0e0
7be6c71
d599fc1
14e020a
4a94b5d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -18,17 +18,24 @@ | |||||||||||||||
import copy as cp | ||||||||||||||||
import datetime | ||||||||||||||||
import json | ||||||||||||||||
import logging | ||||||||||||||||
import string | ||||||||||||||||
import textwrap | ||||||||||||||||
import typing as t | ||||||||||||||||
import numpy as np | ||||||||||||||||
from collections import OrderedDict | ||||||||||||||||
from urllib.parse import urlparse | ||||||||||||||||
|
||||||||||||||||
from apache_beam.io.aws.s3io import S3IO | ||||||||||||||||
from apache_beam.io.filesystems import FileSystems | ||||||||||||||||
from apache_beam.io.gcp.gcsio import GcsIO | ||||||||||||||||
|
||||||||||||||||
from .clients import CLIENTS | ||||||||||||||||
from .config import Config | ||||||||||||||||
from .config import Config, prepare_partitions | ||||||||||||||||
from .manifest import MANIFESTS, Manifest, Location, NoOpManifest | ||||||||||||||||
|
||||||||||||||||
logger = logging.getLogger(__name__) | ||||||||||||||||
|
||||||||||||||||
|
||||||||||||||||
def date(candidate: str) -> datetime.date: | ||||||||||||||||
"""Converts ECMWF-format date strings into a `datetime.date`. | ||||||||||||||||
|
@@ -326,6 +333,29 @@ def _number_of_replacements(s: t.Text): | |||||||||||||||
return len(set(format_names)) + num_empty_names | ||||||||||||||||
|
||||||||||||||||
|
||||||||||||||||
def _validate_target_path(config: Config): | ||||||||||||||||
"""Checks accessibility of target locations before execution start""" | ||||||||||||||||
for partition_conf in prepare_partitions(config): | ||||||||||||||||
target = prepare_target_name(partition_conf) | ||||||||||||||||
parsed = urlparse(target) | ||||||||||||||||
if FileSystems.exists(target): | ||||||||||||||||
raise ValueError(f"Path {target} already exists.") | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we're ok with users downloading to paths that already exist. I'm not sure if we want to do this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, we have skipping logic to not re-download anything that already exists, plus a |
||||||||||||||||
try: | ||||||||||||||||
if parsed.scheme == "gs": | ||||||||||||||||
GcsIO().open(target, 'w').close() | ||||||||||||||||
elif parsed.scheme == "s3": | ||||||||||||||||
S3IO().open(target, 'w').close() | ||||||||||||||||
elif parsed.scheme == "" or parsed.scheme == "fs": | ||||||||||||||||
open(target, 'w').close() | ||||||||||||||||
Comment on lines
+344
to
+349
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use a |
||||||||||||||||
else: | ||||||||||||||||
logger.warning(f"Unable to verify path {target} writeable." | ||||||||||||||||
f"Persistent IO errors when writing to the target" | ||||||||||||||||
f"path during execution may cause deadlocks.") | ||||||||||||||||
except Exception: | ||||||||||||||||
raise ValueError(f"Unable to write to {target}") | ||||||||||||||||
Comment on lines
+354
to
+355
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One more idea – let's chain the errors:
Suggested change
|
||||||||||||||||
FileSystems.delete([target]) | ||||||||||||||||
|
||||||||||||||||
|
||||||||||||||||
def parse_subsections(config: t.Dict) -> t.Dict: | ||||||||||||||||
"""Interprets [section.subsection] as nested dictionaries in `.cfg` files.""" | ||||||||||||||||
copy = cp.deepcopy(config) | ||||||||||||||||
|
@@ -441,13 +471,19 @@ def require(condition: bool, message: str, error_type: t.Type[Exception] = Value | |||||||||||||||
if not isinstance(selection[key], list): | ||||||||||||||||
selection[key] = [selection[key]] | ||||||||||||||||
|
||||||||||||||||
return Config.from_dict(config) | ||||||||||||||||
config = Config.from_dict(config) | ||||||||||||||||
|
||||||||||||||||
_validate_target_path(config) | ||||||||||||||||
|
||||||||||||||||
return config | ||||||||||||||||
|
||||||||||||||||
|
||||||||||||||||
def prepare_target_name(config: Config) -> str: | ||||||||||||||||
def prepare_target_name(partition_config: Config) -> str: | ||||||||||||||||
"""Returns name of target location.""" | ||||||||||||||||
partition_dict = OrderedDict((key, typecast(key, config.selection[key][0])) for key in config.partition_keys) | ||||||||||||||||
target = config.target_path.format(*partition_dict.values(), **partition_dict) | ||||||||||||||||
partition_dict = OrderedDict( | ||||||||||||||||
(key, typecast(key, partition_config.selection[key][0])) | ||||||||||||||||
for key in partition_config.partition_keys) | ||||||||||||||||
Comment on lines
+483
to
+485
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm ok with making this multiline, but then let's go all the way :)
Suggested change
|
||||||||||||||||
target = partition_config.target_path.format(*partition_dict.values(), **partition_dict) | ||||||||||||||||
|
||||||||||||||||
return target | ||||||||||||||||
|
||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious: why did you move these functions to this file? It seems like
partition.py
is a good place for them.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was to avoid a cyclic dependency issue. In
partitions.py
we got:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just reverted that commit and confirmed it'd cause an
ImportError