Skip to content

Commit

Permalink
Merge pull request #227 from Aiven-Open/kmichel-close-transfer
Browse files (browse the repository at this point in the history)
Close transfer objects when we're done using them [DDB-1160]
  • Loading branch information
joelynch authored Jul 24, 2024
2 parents 9aee89f + 115f5b8 commit d179687
Show file tree
Hide file tree
Showing 27 changed files with 408 additions and 315 deletions.
23 changes: 5 additions & 18 deletions astacus/common/cachingjsonstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""
from .exceptions import NotFoundException
from .storage import JsonStorage, MultiStorage
from .storage import JsonStorage
from collections.abc import Iterator

import contextlib
Expand All @@ -37,6 +37,10 @@ def __init__(self, *, backend_storage: JsonStorage, cache_storage: JsonStorage)
self.backend_storage = backend_storage
self.cache_storage = cache_storage

def close(self) -> None:
self.backend_storage.close()
self.cache_storage.close()

@property
def _backend_json_set(self) -> set[str]:
if self._backend_json_set_cache is None:
Expand Down Expand Up @@ -84,20 +88,3 @@ def upload_json_bytes(self, name: str, data: bytes | mmap.mmap) -> bool:
self.backend_storage.upload_json_bytes(name, data)
self._backend_json_set_add(name)
return True


class MultiCachingJsonStorage(MultiStorage[CachingJsonStorage]):
def __init__(self, *, backend_mstorage: MultiStorage, cache_mstorage: MultiStorage) -> None:
self.cache_mstorage = cache_mstorage
self.backend_mstorage = backend_mstorage

def get_storage(self, name: str) -> CachingJsonStorage:
return CachingJsonStorage(
backend_storage=self.backend_mstorage.get_storage(name), cache_storage=self.cache_mstorage.get_storage(name)
)

def get_default_storage_name(self) -> str:
return self.backend_mstorage.get_default_storage_name()

def list_storages(self) -> list[str]:
return self.backend_mstorage.list_storages()
19 changes: 4 additions & 15 deletions astacus/common/rohmustorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
Rohmu-specific actual object storage implementation
"""
from .storage import MultiStorage, Storage, StorageUploadResult
from .storage import Storage, StorageUploadResult
from .utils import AstacusModel, fifo_cache
from astacus.common import exceptions
from collections.abc import Iterator, Mapping
Expand Down Expand Up @@ -128,6 +128,9 @@ def __init__(self, config: RohmuConfig, *, storage: str | None = None) -> None:
if not self.config.compression.algorithm and not self.config.encryption_key_id:
raise exceptions.CompressionOrEncryptionRequired()

def close(self) -> None:
self.storage.close()

@rohmu_error_wrapper
def _download_key_to_file(self, key, f: FileLike) -> bool:
with tempfile.TemporaryFile(dir=self.config.temporary_directory) as temp_file:
Expand Down Expand Up @@ -238,17 +241,3 @@ def upload_json_bytes(self, name: str, data: bytes | mmap.mmap) -> bool:
data.seek(0)
self._upload_key_from_file(key, data, len(data))
return True


class MultiRohmuStorage(MultiStorage[RohmuStorage]):
def __init__(self, *, config: RohmuConfig) -> None:
self.config = config

def get_storage(self, name: str | None) -> RohmuStorage:
return RohmuStorage(config=self.config, storage=name)

def get_default_storage_name(self) -> str:
return self.config.default_storage

def list_storages(self) -> list[str]:
return sorted(self.config.storages.keys())
56 changes: 22 additions & 34 deletions astacus/common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from collections.abc import Iterator
from pathlib import Path
from rohmu.typing import FileLike
from typing import BinaryIO, Callable, ContextManager, Generic, ParamSpec, TypeAlias, TypeVar
from typing import BinaryIO, Callable, ContextManager, ParamSpec, TypeAlias, TypeVar

import contextlib
import io
Expand All @@ -36,6 +36,10 @@ class StorageUploadResult(msgspec.Struct, kw_only=True, frozen=True):


class HexDigestStorage(ABC):
@abstractmethod
def close(self) -> None:
...

@abstractmethod
def delete_hexdigest(self, hexdigest: str) -> None:
...
Expand Down Expand Up @@ -69,6 +73,10 @@ def upload_hexdigest_from_file(self, hexdigest: str, f: BinaryIO, file_size: int


class JsonStorage(ABC):
@abstractmethod
def close(self) -> None:
pass

@abstractmethod
def delete_json(self, name: str) -> None:
...
Expand Down Expand Up @@ -125,6 +133,9 @@ def __init__(self, path: str | Path, *, hexdigest_suffix: str = ".dat", json_suf
self.hexdigest_suffix = hexdigest_suffix
self.json_suffix = json_suffix

def close(self) -> None:
pass

def copy(self) -> "FileStorage":
return FileStorage(path=self.path, hexdigest_suffix=self.hexdigest_suffix, json_suffix=self.json_suffix)

Expand Down Expand Up @@ -188,48 +199,25 @@ def upload_json_bytes(self, name: str, data: bytes | mmap.mmap) -> bool:
return True


class MultiStorage(Generic[T]):
def get_default_storage(self) -> T:
return self.get_storage(self.get_default_storage_name())

def get_default_storage_name(self) -> str:
raise NotImplementedError

def get_storage(self, name: str) -> T:
raise NotImplementedError

def list_storages(self) -> list[str]:
raise NotImplementedError


class MultiFileStorage(MultiStorage[FileStorage]):
def __init__(self, path, **kw):
self.path = Path(path)
self.kw = kw
self._storages = set()

def get_storage(self, name: str) -> FileStorage:
self._storages.add(name)
return FileStorage(self.path / name, **self.kw)

def get_default_storage_name(self) -> str:
return sorted(self._storages)[-1]

def list_storages(self) -> list[str]:
return sorted(self._storages)


class ThreadLocalStorage:
def __init__(self, *, storage: Storage) -> None:
self.threadlocal = threading.local()
self.storage = storage
self.local_storages: list[Storage] = []
self.local_storages_lock = threading.Lock()

@property
def local_storage(self) -> Storage:
def get_storage(self) -> Storage:
local_storage = getattr(self.threadlocal, "storage", None)
if local_storage is None:
local_storage = self.storage.copy()
with self.local_storages_lock:
self.local_storages.append(local_storage)
setattr(self.threadlocal, "storage", local_storage)
else:
assert isinstance(local_storage, Storage)
return local_storage

def close(self) -> None:
for local_storage in self.local_storages:
local_storage.close()
self.local_storages.clear()
4 changes: 2 additions & 2 deletions astacus/coordinator/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async def _list_backups(
if cached_list_response is not None
else {}
)
list_response = await to_thread(list_backups, req=req, json_mstorage=c.json_mstorage, cache=cache)
list_response = await to_thread(list_backups, req=req, storage_factory=c.storage_factory, cache=cache)
c.state.cached_list_response = CachedListResponse(
coordinator_config=coordinator_config,
list_request=req,
Expand All @@ -158,7 +158,7 @@ def get_cache_entries_from_list_response(list_response: ipc.ListResponse) -> Cac
async def _list_delta_backups(*, storage: Annotated[str, Body()] = "", c: Coordinator = Depends(), request: Request):
req = ipc.ListRequest(storage=storage)
# This is not supposed to be called very often, no caching necessary
return await to_thread(list_delta_backups, req=req, json_mstorage=c.json_mstorage)
return await to_thread(list_delta_backups, req=req, storage_factory=c.storage_factory)


@router.post("/cleanup")
Expand Down
8 changes: 5 additions & 3 deletions astacus/coordinator/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def create(*, c: Coordinator = Depends(), req: ipc.CleanupRequest = ipc.Cl
return CleanupOp(c=c, req=req)

def __init__(self, *, c: Coordinator, req: ipc.CleanupRequest) -> None:
context = c.get_operation_context()
operation_context = c.get_operation_context()
if req.retention is None:
retention = ipc.Retention(
minimum_backups=c.config.retention.minimum_backups,
Expand All @@ -34,8 +34,10 @@ def __init__(self, *, c: Coordinator, req: ipc.CleanupRequest) -> None:
maximum_backups=coalesce(req.retention.maximum_backups, c.config.retention.maximum_backups),
keep_days=coalesce(req.retention.keep_days, c.config.retention.keep_days),
)
steps = c.get_plugin().get_cleanup_steps(context=context, retention=retention, explicit_delete=req.explicit_delete)
super().__init__(c=c, attempts=1, steps=steps)
steps = c.get_plugin().get_cleanup_steps(
context=operation_context, retention=retention, explicit_delete=req.explicit_delete
)
super().__init__(c=c, attempts=1, steps=steps, operation_context=operation_context)


def coalesce(a: int | None, b: int | None) -> int | None:
Expand Down
Loading

0 comments on commit d179687

Please sign in to comment.