Verify gcs target #218
base: main
Conversation
I've had to stop this review short; however, I have a few notes for things to work on so far.
One general idea: another approach to solving this would be to allow the pipeline to fail in the fetch step when this error is encountered (or for other errors where failing is preferable to retrying).
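For what it's worth, a minimal sketch of what that could look like (the function name, error classes, and retry count here are assumptions, not the actual weather-dl fetcher):

```python
# Sketch only: classify exceptions that should fail the pipeline immediately
# instead of being retried during the fetch step.
NON_RETRYABLE = (PermissionError, FileNotFoundError)  # assumed error classes


def fetch_with_retries(do_fetch, retries: int = 3):
    """Retry transient failures, but re-raise errors where failing fast is preferable."""
    for attempt in range(retries):
        try:
            return do_fetch()
        except NON_RETRYABLE:
            raise  # surface immediately so the whole pipeline stops early
        except Exception:
            if attempt == retries - 1:
                raise  # out of retries; let the last error propagate
```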
@@ -109,3 +111,46 @@ def optimize_selection_partition(selection: t.Dict) -> t.Dict:
        del selection_['year']

    return selection_


def prepare_partitions(config: Config) -> t.Iterator[Config]:
I'm curious: why did you move these functions to this file? It seems like partition.py is a good place for them.
This was to avoid a cyclic dependency issue. In partitions.py we got:
from .parsers import prepare_target_name
I just reverted that commit and confirmed it'd cause an ImportError.
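A tiny, self-contained repro of why (the module names mirror the real files, but their contents are reduced to just the two imports):

```python
# Self-contained repro of the circular import; creates a throwaway package
# with simplified parsers.py / partition.py modules and imports one of them.
import os
import sys
import tempfile

pkg_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(pkg_dir, "demo"))
open(os.path.join(pkg_dir, "demo", "__init__.py"), "w").close()

with open(os.path.join(pkg_dir, "demo", "parsers.py"), "w") as f:
    f.write("from .partition import prepare_partitions  # parsers -> partition\n"
            "def prepare_target_name(config): ...\n")

with open(os.path.join(pkg_dir, "demo", "partition.py"), "w") as f:
    f.write("from .parsers import prepare_target_name  # partition -> parsers (cycle)\n"
            "def prepare_partitions(config): ...\n")

sys.path.insert(0, pkg_dir)
try:
    import demo.partition  # noqa: F401
except ImportError as err:
    # "cannot import name 'prepare_partitions' from partially initialized module ..."
    print(f"circular import: {err}")
```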
for partition_conf in prepare_partitions(config):
    target = prepare_target_name(partition_conf)
    parsed = urlparse(target)
    if parsed.scheme == 'gs':
Is there a way this validation could be written to make the check portable (to other clouds)? For example, via Beam's FileSystems API?
That's a great point. I figured the "path writeable" check needs to distinguish between file systems, so I stuck to GCS. Nevertheless, FileSystems does have an FS-agnostic way to check whether the directory exists. I've updated the CL to support this.
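For reference, a minimal sketch of the FS-agnostic checks via Beam's FileSystems (assuming target is a full URI such as gs://bucket/path; the error messages are placeholders):

```python
from apache_beam.io.filesystems import FileSystems


def validate_target(target: str) -> None:
    """Fail fast on an unusable target, regardless of the filesystem scheme."""
    # FileSystems dispatches on the URI scheme (gs://, s3://, local paths, ...).
    parent, _ = FileSystems.split(target)
    if not FileSystems.exists(parent):
        raise ValueError(f"Directory {parent} does not exist.")
    if FileSystems.exists(target):
        raise ValueError(f"Path {target} already exists.")
```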
I have a few open-ended questions that generally ask how this validation will interact with other features in weather-dl. Also, some nits :).
target = prepare_target_name(partition_conf)
parsed = urlparse(target)
if FileSystems.exists(target):
    raise ValueError(f"Path {target} already exists.")
I think we're ok with users downloading to paths that already exist. I'm not sure if we want to do this.
For example, we have skipping logic to not re-download anything that already exists, plus a -f --force flag that lets users override this behavior. How does this validation interact with these two features?
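For the record, one way the check could defer to those features (purely a sketch; how the force flag is plumbed through here is an assumption):

```python
from apache_beam.io.filesystems import FileSystems


def should_skip(target: str, force: bool) -> bool:
    """Mirror the existing skip / --force semantics instead of hard-failing.

    An existing target is normally fine: either the skip logic leaves it alone,
    or --force tells us to overwrite it.
    """
    if not FileSystems.exists(target):
        return False      # nothing there yet: download as usual
    return not force      # exists: skip unless the user forced a re-download
```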
if parsed.scheme == "gs": | ||
GcsIO().open(target, 'w').close() | ||
elif parsed.scheme == "s3": | ||
S3IO().open(target, 'w').close() | ||
elif parsed.scheme == "" or parsed.scheme == "fs": | ||
open(target, 'w').close() |
Can we use a FileSystems.create() call instead? I think it's a bit more portable.
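A sketch of what that might look like (the delete-after-probe cleanup is just one option for not leaving an empty file behind, not necessarily what this PR should do):

```python
from apache_beam.io.filesystems import FileSystems


def check_writable(target: str) -> None:
    """Probe writability by creating (and removing) the target via FileSystems."""
    try:
        # create() picks the concrete filesystem from the URI scheme, so the
        # same code path covers gs://, s3://, and plain local paths.
        FileSystems.create(target).close()
        FileSystems.delete([target])
    except Exception as e:
        raise ValueError(f"Unable to write to {target}") from e
```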
partition_dict = OrderedDict(
    (key, typecast(key, partition_config.selection[key][0]))
    for key in partition_config.partition_keys)
I'm ok with making this multiline, but then let's go all the way :)
-partition_dict = OrderedDict(
-    (key, typecast(key, partition_config.selection[key][0]))
-    for key in partition_config.partition_keys)
+partition_dict = OrderedDict(
+    (key, typecast(key, partition_config.selection[key][0]))
+    for key in partition_config.partition_keys
+)
except Exception:
    raise ValueError(f"Unable to write to {target}")
One more idea – let's chain the errors:
-except Exception:
-    raise ValueError(f"Unable to write to {target}")
+except Exception as e:
+    raise ValueError(f"Unable to write to {target}") from e
I just re-read #209 -- a general thought: should the fix even be in the config parser? It seems like the source of the error has to do with the pipeline args / environment. Furthermore, another possible fix is to change which kinds of errors we choose to retry and which we let bring down the pipeline. These are generally set here:
I see the point that logically it might fit better in the error handler, but IMO it makes more sense for the check to be done in the arg parser since it's fail-fast. Even if we fix the error handler to tear down the job when the target location is unreachable, it needlessly incurs costs for the user to run the job up to the point of failure, and it takes somewhere between 5-20 minutes to get there.
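For illustration, roughly what a fail-fast check wired into the parsing step could look like (a sketch that reuses the PR's prepare_partitions / prepare_target_name helpers; the wrapper function itself is hypothetical):

```python
from apache_beam.io.filesystems import FileSystems

# prepare_partitions / prepare_target_name are the helpers from this PR;
# assume they live alongside this function (e.g. in parsers.py).


def validate_config(config):
    """Raise before any pipeline workers are launched if a target is unreachable."""
    for partition_conf in prepare_partitions(config):
        target = prepare_target_name(partition_conf)
        parent, _ = FileSystems.split(target)
        if not FileSystems.exists(parent):
            raise ValueError(f"Target location {target} is unreachable.")
    return config
```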
Ok, the validation is very reasonable to me. I like the principle of failing fast. My one nit is: this should at least occur in the
Fixes #209