Add verify_existing option to cache #630

Open · wants to merge 2 commits into base: main
pangeo_forge_recipes/storage.py (9 additions, 2 deletions)

@@ -171,19 +171,26 @@ def _full_path(self, path: str) -> str:
         return os.path.join(self.root_path, new_path)
 
 
+@dataclass
 class CacheFSSpecTarget(FlatFSSpecTarget):
     """Alias for FlatFSSpecTarget"""
 
+    verify_existing: bool = True
+
     def cache_file(self, fname: str, secrets: Optional[dict], **open_kwargs) -> None:
         # check and see if the file already exists in the cache
         logger.info(f"Caching file '{fname}'")
-        if self.exists(fname):
+        exists = self.exists(fname)
+        if exists and self.verify_existing:
             cached_size = self.size(fname)
             remote_size = _get_url_size(fname, secrets, **open_kwargs)
             if cached_size == remote_size:
                 # TODO: add checksumming here

Contributor:
For future reference, do we have a way of doing this without downloading the file and computing the checksum? S3 provides that information, but arbitrary HTTP endpoints can't be relied upon to provide the SHA in a predictable way.

Contributor:
OK, so digging into the fsspec source a bit, it appears that fsspec's info method ought to be used here. By default it avoids opening the file and instead attempts to get both size and checksum from a HEAD request. It looks to me as though the current approach loses much of that benefit when verify_existing is True, since it gets the size from an opened file rather than from a small HEAD request.

Would you like me to amend this PR with some logic to lean on those methods? It would probably be good to emit a warning for the cases where the file absolutely must be fully opened (as happens here) for size/checksum validation.
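
A minimal sketch of the HEAD-based check suggested above. remote_matches_cache and cached_etag are hypothetical names used for illustration; fsspec.core.url_to_fs and the filesystem info() method are real fsspec APIs, but whether info() can avoid opening the file, and whether a checksum-like field such as "ETag" is present, depends on the backend:

from typing import Optional

import fsspec

def remote_matches_cache(url: str, cached_size: int, cached_etag: Optional[str] = None) -> bool:
    # Resolve the protocol and get a (filesystem, path) pair for the URL.
    fs, path = fsspec.core.url_to_fs(url)
    # For HTTP-like backends, info() typically issues a small HEAD request
    # instead of opening the file.
    info = fs.info(path)
    remote_size = info.get("size")
    if remote_size is not None and remote_size != cached_size:
        return False
    # Some backends (e.g. S3) expose an ETag; arbitrary HTTP endpoints may
    # not, so treat the checksum comparison as best-effort.
    etag = info.get("ETag")
    if cached_etag is not None and etag is not None:
        return etag == cached_etag
    return True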

logger.info(f"File '{fname}' is already cached")
logger.info(f"File '{fname}' is already cached, and matches remote size.")
return
elif exists and not self.verify_existing:
logger.info(f"File '{fname}' is already cached, skipping verification.")
return

         input_opener = _get_opener(fname, secrets, **open_kwargs)
         target_opener = self.open(fname, mode="wb")
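
For readers skimming the diff, a minimal usage sketch of the new option (the cache directory and source paths are illustrative; the constructor call mirrors the test added below):

from fsspec.implementations.local import LocalFileSystem

from pangeo_forge_recipes.storage import CacheFSSpecTarget

# Default behaviour: a cache hit is verified by comparing cached and remote sizes.
cache = CacheFSSpecTarget(LocalFileSystem(), "/tmp/cache")

# verify_existing=False: a cache hit on the filename alone short-circuits,
# skipping the remote size lookup entirely.
fast_cache = CacheFSSpecTarget(LocalFileSystem(), "/tmp/cache", verify_existing=False)
fast_cache.cache_file("/tmp/src/data.txt", secrets=None)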
tests/test_storage.py (37 additions, 0 deletions)

@@ -91,6 +91,43 @@ def test_caching_only_truncates_long_fnames_for_local_fs(fs_cls, fname_longer_th
     assert len(fname_in_full_path) > POSIX_MAX_FNAME_LENGTH
 
 
+@pytest.mark.parametrize("verify_existing", [True, False])
+def test_cache_no_verify_existing(tmpdir_factory: pytest.TempdirFactory, verify_existing: bool):
+    tmp_src = tmpdir_factory.mktemp("src")
+    tmp_dst = tmpdir_factory.mktemp("dst")
+    cache = CacheFSSpecTarget(LocalFileSystem(), tmp_dst, verify_existing=verify_existing)
+    src_fname = str(tmp_src / "source.txt")
+
+    # write the source file
+    with open(src_fname, mode="w") as f:
+        f.write("0")
+
+    # cache it
+    cache.cache_file(src_fname, secrets=None)
+
+    # overwrite source with new data
+    with open(src_fname, mode="w") as f:
+        f.write("00")
+
+    # cache it again
+    cache.cache_file(src_fname, secrets=None)
+
+    # open from cache
+    cached_fname = cache._full_path(src_fname)
+    with open(cached_fname) as f:
+        if not verify_existing:
+            # if we *do not* verify the existing cache, the second caching operation
+            # is skipped because a file with the cached name already exists, so the
+            # cache still holds the data from the initial source file.
+            assert f.read() == "0"
+        else:
+            # if we *do* verify, the size check detects that the source file has
+            # changed since the first caching operation, so the second operation
+            # re-caches the new data.
+            assert f.read() == "00"
+
+
 def test_suffix(tmp_path):
     assert str((FSSpecTarget(LocalFileSystem(), tmp_path) / "test").root_path) == str(
         tmp_path / "test"