Verify gcs target #218

Open · blackvvine wants to merge 7 commits into main

Conversation

blackvvine
Collaborator

Fixes #209

Collaborator

@alxmrs alxmrs left a comment

I've had to stop this review short; however, I have a few notes for things to work on so far.

One general idea: another approach would be to allow the pipeline to fail in the fetch step when this error is encountered (or for other errors where failure is preferable).

@@ -109,3 +111,46 @@ def optimize_selection_partition(selection: t.Dict) -> t.Dict:
        del selection_['year']

    return selection_


def prepare_partitions(config: Config) -> t.Iterator[Config]:
Collaborator

I'm curious: why did you move these functions to this file? It seems like partition.py is a good place for them.

Collaborator Author

This was to avoid a cyclic dependency issue. In partitions.py we have:

    from .parsers import prepare_target_name

Collaborator Author

I just reverted that commit and confirmed it'd cause an ImportError.
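
For reference, a minimal illustration of the cycle (the module contents here are assumptions for illustration, not the actual files):

    # partitions.py
    from .parsers import prepare_target_name    # partitions depends on parsers

    # parsers.py -- if prepare_partitions had stayed in partitions.py,
    # the new validation code would need:
    from .partitions import prepare_partitions  # parsers depends back on partitions
    # Importing either module then raises ImportError (circular import).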

for partition_conf in prepare_partitions(config):
    target = prepare_target_name(partition_conf)
    parsed = urlparse(target)
    if parsed.scheme == 'gs':
Collaborator

Is there a way this validation could be written to make the check portable (to other clouds)? For example, via Beam's FileSystems API?

Collaborator Author

That's a great point. I figured the "path writeable" check needs to distinguish between file systems, so I stuck to GCS. Nevertheless, FileSystems does have an FS-agnostic way of checking whether a directory exists. I've updated the CL to support this.
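
For context, the FS-agnostic check referred to here is Beam's FileSystems.exists; a minimal sketch (target_dir is a placeholder name, not taken from this PR):

    from apache_beam.io.filesystems import FileSystems

    # FileSystems dispatches on the path's scheme, so the same call
    # works for gs://, s3://, and local paths.
    if not FileSystems.exists(target_dir):
        raise ValueError(f'Target directory {target_dir} does not exist.')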

Collaborator

@alxmrs alxmrs left a comment

I have a few open-ended questions that generally ask how this validation will interact with other features in weather-dl. Also, some nits :).

target = prepare_target_name(partition_conf)
parsed = urlparse(target)
if FileSystems.exists(target):
    raise ValueError(f"Path {target} already exists.")
Collaborator

I think we're ok with users downloading to paths that already exist. I'm not sure if we want to do this.

Collaborator

For example, we have skipping logic to not re-download anything that already exists, plus a -f/--force flag that lets users override this behavior. How does this validation interact with these two features?
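
For reference, a hypothetical sketch of the skip/force interplay described above (the helper name and signature are assumptions, not weather-dl code):

    from apache_beam.io.filesystems import FileSystems

    def should_skip(target: str, force: bool) -> bool:
        # Skip targets that already exist, unless the user passed -f/--force.
        return FileSystems.exists(target) and not force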

Comment on lines +344 to +349
if parsed.scheme == "gs":
GcsIO().open(target, 'w').close()
elif parsed.scheme == "s3":
S3IO().open(target, 'w').close()
elif parsed.scheme == "" or parsed.scheme == "fs":
open(target, 'w').close()
Collaborator

Can we use a FileSystems.create() call instead? I think it's a bit more portable.
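
A minimal sketch of the suggested portable probe; deleting the sentinel file after the check is an assumption, not part of this PR:

    from apache_beam.io.filesystems import FileSystems

    # Create, then remove, an empty sentinel object to prove the target
    # is writable; FileSystems picks the right backend from the scheme.
    handle = FileSystems.create(target)
    handle.close()
    FileSystems.delete([target])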

Comment on lines +483 to +485
partition_dict = OrderedDict(
    (key, typecast(key, partition_config.selection[key][0]))
    for key in partition_config.partition_keys)
Collaborator

I'm ok with making this multiline, but then let's go all the way :)

Suggested change
-partition_dict = OrderedDict(
-    (key, typecast(key, partition_config.selection[key][0]))
-    for key in partition_config.partition_keys)
+partition_dict = OrderedDict(
+    (key, typecast(key, partition_config.selection[key][0]))
+    for key in partition_config.partition_keys
+)

Comment on lines +354 to +355
except Exception:
    raise ValueError(f"Unable to write to {target}")
Collaborator

@alxmrs alxmrs Sep 15, 2022

One more idea – let's chain the errors:

Suggested change
-except Exception:
-    raise ValueError(f"Unable to write to {target}")
+except Exception as e:
+    raise ValueError(f"Unable to write to {target}") from e

@alxmrs
Collaborator

alxmrs commented Sep 15, 2022

I just re-read #209 -- a general thought: Should the fix even be in the config parser? It seems like the source of the error has to do with the pipeline args / environment.

Furthermore, another possible fix is to change which kinds of errors we choose to retry and which we let bring down the pipeline. These are generally set here:

    def _retry_if_valid_input_but_server_or_socket_error_and_timeout_filter(exception) -> bool:
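
A hypothetical sketch of how such a predicate separates retryable from fatal errors (the simplified name and the error types are assumptions, not the actual weather-dl implementation):

    import socket

    def _retry_filter(exception) -> bool:
        # Return True to retry the download; returning False lets the
        # error propagate and bring down the pipeline.
        if isinstance(exception, ValueError):   # e.g. an unwritable target
            return False                        # fail fast
        return isinstance(exception, (socket.timeout, ConnectionError))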

@blackvvine
Collaborator Author

> I just re-read #209 -- a general thought: Should the fix even be in the config parser? It seems like the source of the error has to do with the pipeline args / environment.
>
> Furthermore, another possible fix is to change which kinds of errors we choose to retry and which we let bring down the pipeline. These are generally set here:
>
>     def _retry_if_valid_input_but_server_or_socket_error_and_timeout_filter(exception) -> bool:

I see the point that logically it might fit better in the error handler, but IMO it makes more sense for the check to be done in the arg parser, since it's fail-fast. Even if we fix the error handler to tear down the job when the target location is unreachable, the user still needlessly incurs costs running the job up to the point of failure, and it takes somewhere between 5 and 20 minutes to get there.

@alxmrs
Collaborator

alxmrs commented Sep 23, 2022

Ok, the validation is very reasonable to me. I like the principle of failing fast.

My one nit is: this should at least occur in the run function and not in the parser, since it's testing resource choices rather than parsing (i.e., it's validating, not parsing).
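
A minimal sketch of what moving the check into run would look like (the run signature and the pipeline body are assumptions based on the snippets above, not the final code):

    from apache_beam.io.filesystems import FileSystems

    def run(config: Config) -> None:
        # Validate every partition's target before launching the pipeline,
        # so an unwritable or pre-existing target fails fast and cheaply.
        for partition_conf in prepare_partitions(config):
            target = prepare_target_name(partition_conf)
            if FileSystems.exists(target):
                raise ValueError(f'Path {target} already exists.')
        # ... construct and run the Beam pipeline here ...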

Linked issue #209: Validate weather-dl target location before starting the job