Separate CacheMetadata class for CachingFileSystem (#1326)
* Separate CacheMetadata class for CachingFileSystem
* Codespell
* Pass optional CachingFileSystem to CacheMetadata.check_file
* Rename CacheMetadata.close_file to on_close_cached_file
* Remove CacheMetadata.empty()
* Docstring for CacheMetadata.pop_file
* Move atomic_write to utils.py
* Fix test_metadata_save_blocked
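The last bullet mentions moving atomic_write into fsspec.utils; the new module below imports it and uses it as a context manager yielding a writable binary file, so a crash mid-save cannot leave a truncated metadata pickle. The helper itself is not part of this diff, so the following is only a rough sketch of the usual temp-file-then-rename pattern, not fsspec's actual implementation:

import contextlib
import os
import tempfile


@contextlib.contextmanager
def atomic_write(path, mode="wb"):
    # Sketch only: write to a temporary file in the target directory, then
    # atomically replace `path`, so readers never observe a partial file.
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".", suffix=".tmp")
    try:
        with open(fd, mode) as f:
            yield f
        os.replace(tmp, path)  # atomic rename on POSIX and Windows
    except BaseException:
        with contextlib.suppress(OSError):
            os.remove(tmp)  # discard the partial temporary file
        raise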
1 parent 2107d4a · commit b6aa854
Showing 4 changed files with 269 additions and 146 deletions.
@@ -0,0 +1,206 @@
from __future__ import annotations

import os
import pickle
import time
from typing import TYPE_CHECKING

from fsspec.utils import atomic_write

if TYPE_CHECKING:
    from typing import Any, Dict, Iterator, Literal

    from typing_extensions import TypeAlias

    from .cached import CachingFileSystem

    Detail: TypeAlias = Dict[str, Any]


class CacheMetadata:
    """Cache metadata.

    All reading and writing of cache metadata is performed by this class;
    accessing the cached files and blocks is not.

    Metadata is stored in a single file per storage directory, pickled.
    """

    def __init__(self, storage: list[str]):
        """
        Parameters
        ----------
        storage: list[str]
            Directories containing cached files, must be at least one. Metadata
            is stored in the last of these directories by convention.
        """
        if not storage:
            raise ValueError("CacheMetadata expects at least one storage location")

        self._storage = storage
        self.cached_files: list[Detail] = [{}]

    def _scan_locations(
        self, writable_only: bool = False
    ) -> Iterator[tuple[str, str, bool]]:
        """Yield locations (filenames) where metadata is stored, and whether
        writable or not.

        Parameters
        ----------
        writable_only: bool
            Set to True to only yield writable locations.

        Returns
        -------
        Yields (str, str, bool)
        """
        n = len(self._storage)
        for i, storage in enumerate(self._storage):
            writable = i == n - 1
            if writable_only and not writable:
                continue
            yield os.path.join(storage, "cache"), storage, writable

    def check_file(
        self, path: str, cfs: CachingFileSystem | None
    ) -> Literal[False] | tuple[Detail, str]:
        """If path is in cache return its details, otherwise return ``False``.

        If the optional CachingFileSystem is specified then it is used to
        perform extra checks to reject possible matches, such as if they are
        too old.
        """
        for (fn, base, _), cache in zip(self._scan_locations(), self.cached_files):
            if path not in cache:
                continue
            detail = cache[path].copy()

            if cfs is not None:
                if cfs.check_files and detail["uid"] != cfs.fs.ukey(path):
                    # Wrong file as determined by hash of file properties
                    continue
                if cfs.expiry and time.time() - detail["time"] > cfs.expiry:
                    # Cached file has expired
                    continue

            fn = os.path.join(base, detail["fn"])
            if os.path.exists(fn):
                return detail, fn
        return False

    def clear_expired(self, expiry_time: int) -> tuple[list[str], bool]:
        """Remove expired metadata from the cache.

        Returns names of files corresponding to expired metadata and a boolean
        flag indicating whether the writable cache is empty. Caller is
        responsible for deleting the expired files.
        """
        expired_files = []
        for path, detail in self.cached_files[-1].copy().items():
            if time.time() - detail["time"] > expiry_time:
                fn = detail.get("fn", "")
                if not fn:
                    raise RuntimeError(
                        f"Cache metadata does not contain 'fn' for {path}"
                    )
                fn = os.path.join(self._storage[-1], fn)
                expired_files.append(fn)
                self.cached_files[-1].pop(path)

        if self.cached_files[-1]:
            cache_path = os.path.join(self._storage[-1], "cache")
            with atomic_write(cache_path) as fc:
                pickle.dump(self.cached_files[-1], fc)

        writable_cache_empty = not self.cached_files[-1]
        return expired_files, writable_cache_empty

    def load(self) -> None:
        """Load all metadata from disk and store in ``self.cached_files``"""
        cached_files = []
        for fn, _, _ in self._scan_locations():
            if os.path.exists(fn):
                with open(fn, "rb") as f:
                    # TODO: consolidate blocks here
                    loaded_cached_files = pickle.load(f)
                    for c in loaded_cached_files.values():
                        if isinstance(c["blocks"], list):
                            c["blocks"] = set(c["blocks"])
                    cached_files.append(loaded_cached_files)
            else:
                cached_files.append({})
        self.cached_files = cached_files or [{}]

    def on_close_cached_file(self, f: Any, path: str) -> None:
        """Perform side-effect actions on closing a cached file.

        The actual closing of the file is the responsibility of the caller.
        """
        # File must be writable, so in self.cached_files[-1]
        c = self.cached_files[-1][path]
        if c["blocks"] is not True and len(c["blocks"]) * f.blocksize >= f.size:
            c["blocks"] = True

    def pop_file(self, path: str) -> str | None:
        """Remove metadata of cached file.

        If path is in the cache, return the filename of the cached file,
        otherwise return ``None``. Caller is responsible for deleting the
        cached file.
        """
        details = self.check_file(path, None)
        if not details:
            return None
        _, fn = details
        if fn.startswith(self._storage[-1]):
            self.cached_files[-1].pop(path)
            self.save()
        else:
            raise PermissionError(
                "Can only delete cached file in last, writable cache location"
            )
        return fn

    def save(self) -> None:
        """Save metadata to disk"""
        for (fn, _, writable), cache in zip(self._scan_locations(), self.cached_files):
            if not writable:
                continue

            if os.path.exists(fn):
                with open(fn, "rb") as f:
                    cached_files = pickle.load(f)
                for k, c in cached_files.items():
                    if k in cache:
                        if c["blocks"] is True or cache[k]["blocks"] is True:
                            c["blocks"] = True
                        else:
                            # self.cached_files[*][*]["blocks"] must continue to
                            # point to the same set object so that updates
                            # performed by MMapCache are propagated back to
                            # self.cached_files.
                            blocks = cache[k]["blocks"]
                            blocks.update(c["blocks"])
                            c["blocks"] = blocks
                        c["time"] = max(c["time"], cache[k]["time"])
                        c["uid"] = cache[k]["uid"]

                # Files can be added to cache after it was written once
                for k, c in cache.items():
                    if k not in cached_files:
                        cached_files[k] = c
            else:
                cached_files = cache
            cache = {k: v.copy() for k, v in cached_files.items()}
            for c in cache.values():
                if isinstance(c["blocks"], set):
                    c["blocks"] = list(c["blocks"])
            with atomic_write(fn) as f:
                pickle.dump(cache, f)
            self.cached_files[-1] = cached_files

    def update_file(self, path: str, detail: Detail) -> None:
        """Update metadata for specific file in memory, do not save"""
        self.cached_files[-1][path] = detail