diff --git a/astacus/coordinator/plugins/clickhouse/config.py b/astacus/coordinator/plugins/clickhouse/config.py index 739ba625..e25eb44c 100644 --- a/astacus/coordinator/plugins/clickhouse/config.py +++ b/astacus/coordinator/plugins/clickhouse/config.py @@ -9,6 +9,7 @@ from astacus.coordinator.plugins.zookeeper_config import ZooKeeperConfiguration from collections.abc import Mapping, Sequence from pathlib import Path +from typing import Literal import enum @@ -39,9 +40,19 @@ class DiskType(enum.Enum): object_storage = "object_storage" +class DirectCopyConfig(AstacusModel): + method: Literal["direct"] = "direct" + + +class LocalCopyConfig(AstacusModel): + method: Literal["local"] = "local" + temporary_directory: str + + class DiskObjectStorageConfiguration(AstacusModel): default_storage: str storages: Mapping[str, RohmuStorageConfig] + copy_config: DirectCopyConfig | LocalCopyConfig = DirectCopyConfig() class DiskConfiguration(AstacusModel): diff --git a/astacus/coordinator/plugins/clickhouse/disks.py b/astacus/coordinator/plugins/clickhouse/disks.py index 8070d4e9..cc78e9ed 100644 --- a/astacus/coordinator/plugins/clickhouse/disks.py +++ b/astacus/coordinator/plugins/clickhouse/disks.py @@ -41,9 +41,10 @@ def from_disk_config(cls, config: DiskConfiguration, storage_name: str | None = else: config_name = storage_name if storage_name is not None else config.object_storage.default_storage object_storage_config = config.object_storage.storages[config_name] + copy_config = config.object_storage.copy_config def create_storage() -> ObjectStorage: - return ThreadSafeRohmuStorage(config=object_storage_config) + return ThreadSafeRohmuStorage(config=object_storage_config, copy_config=copy_config) object_storage_factory = create_storage return Disk( diff --git a/astacus/coordinator/plugins/clickhouse/object_storage.py b/astacus/coordinator/plugins/clickhouse/object_storage.py index 15ea989b..8a69ee07 100644 --- a/astacus/coordinator/plugins/clickhouse/object_storage.py +++ b/astacus/coordinator/plugins/clickhouse/object_storage.py @@ -2,8 +2,10 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ +from .config import DirectCopyConfig, LocalCopyConfig from abc import ABC, abstractmethod from astacus.common.rohmustorage import RohmuStorageConfig +from astacus.common.statsd import StatsClient from collections.abc import Iterator, Sequence from rohmu import BaseTransfer from rohmu.errors import FileNotFoundFromStorageError @@ -14,6 +16,7 @@ import datetime import logging import rohmu +import tempfile import threading logger = logging.getLogger(__name__) @@ -43,13 +46,14 @@ def delete_item(self, key: str) -> None: ... @abstractmethod - def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str]) -> None: + def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None: ... class ThreadSafeRohmuStorage(ObjectStorage): - def __init__(self, config: RohmuStorageConfig) -> None: + def __init__(self, *, config: RohmuStorageConfig, copy_config: DirectCopyConfig | LocalCopyConfig) -> None: self.config = config + self.copy_config = copy_config self._storage = rohmu.get_transfer_from_model(config) self._storage_lock = threading.Lock() @@ -68,7 +72,7 @@ def delete_item(self, key: str) -> None: with self._storage_lock: self._storage.delete_key(key) - def copy_items_from(self, source: ObjectStorage, keys: Sequence[str]) -> None: + def copy_items_from(self, source: ObjectStorage, keys: Sequence[str], *, stats: StatsClient | None) -> None: # In theory this could deadlock if some other place was locking the same two storages # in the reverse order at the same time. Within the context of backups and restore, # it's quite unlikely to have a pair of storages be the source and target of each other. @@ -77,7 +81,25 @@ def copy_items_from(self, source: ObjectStorage, keys: Sequence[str]) -> None: raise NotImplementedError("Copying items is only supported from another ThreadSafeRohmuStorage") with source.get_storage() as source_storage: with self.get_storage() as target_storage: + self._copy_items_between(keys, source_storage=source_storage, target_storage=target_storage, stats=stats) + + def _copy_items_between( + self, + keys: Sequence[str], + *, + source_storage: BaseTransfer[Any], + target_storage: BaseTransfer[Any], + stats: StatsClient | None, + ) -> None: + match self.copy_config: + case DirectCopyConfig(): + logger.info("Copying the keys using the cloud APIs") target_storage.copy_files_from(source=source_storage, keys=keys) + case LocalCopyConfig(): + logger.info("Copying the keys by downloading from source/uploading to target") + _copy_via_local_filesystem( + keys, source=source_storage, target=target_storage, copy_config=self.copy_config, stats=stats + ) @contextlib.contextmanager def get_storage(self) -> Iterator[BaseTransfer[Any]]: @@ -85,6 +107,27 @@ def get_storage(self) -> Iterator[BaseTransfer[Any]]: yield self._storage +def _copy_via_local_filesystem( + keys: Sequence[str], + *, + source: BaseTransfer[Any], + target: BaseTransfer[Any], + copy_config: LocalCopyConfig, + stats: StatsClient | None, +) -> None: + keys_to_copy = len(keys) + for keys_copied, key in enumerate(keys): + with tempfile.TemporaryFile(dir=copy_config.temporary_directory) as temp_file: + metadata = source.get_contents_to_fileobj(key, temp_file) + target.store_file_object(key, temp_file, metadata) + if stats: + stats.gauge( + "astacus_restore_clickhouse_tiered_storage_keys_remaining", + keys_to_copy - keys_copied, + tags={"copy_method": copy_config.method}, + ) + + @dataclasses.dataclass(frozen=True) class MemoryObjectStorage(ObjectStorage): items: dict[str, ObjectStorageItem] = dataclasses.field(default_factory=dict) @@ -114,7 +157,7 @@ def delete_item(self, key: str) -> None: logger.info("deleting item: %r", key) self.items.pop(key) - def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str]) -> None: + def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None: keys_set = set(keys) for source_item in source.list_items(): if source_item.key in keys_set: diff --git a/astacus/coordinator/plugins/clickhouse/plugin.py b/astacus/coordinator/plugins/clickhouse/plugin.py index 6a5c6347..84ec762a 100644 --- a/astacus/coordinator/plugins/clickhouse/plugin.py +++ b/astacus/coordinator/plugins/clickhouse/plugin.py @@ -188,8 +188,8 @@ def get_restore_steps(self, *, context: OperationContext, req: RestoreRequest) - sync_timeout=self.sync_databases_timeout, ), MapNodesStep(partial_restore_nodes=req.partial_restore_nodes), - RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes), RestoreObjectStorageFilesStep(source_disks=source_disks, target_disks=disks), + RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes), AttachMergeTreePartsStep( clients=clients, disks=disks, diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py index 03dae684..fe676de6 100644 --- a/astacus/coordinator/plugins/clickhouse/steps.py +++ b/astacus/coordinator/plugins/clickhouse/steps.py @@ -791,7 +791,7 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None: try: if source_storage.get_config() != target_storage.get_config(): paths = [file.path for file in object_storage_files.files] - target_storage.copy_items_from(source_storage, paths) + target_storage.copy_items_from(source_storage, paths, stats=cluster.stats) finally: target_storage.close() finally: diff --git a/tests/unit/coordinator/plugins/clickhouse/test_steps.py b/tests/unit/coordinator/plugins/clickhouse/test_steps.py index 128700b7..a1f090b7 100644 --- a/tests/unit/coordinator/plugins/clickhouse/test_steps.py +++ b/tests/unit/coordinator/plugins/clickhouse/test_steps.py @@ -1027,7 +1027,7 @@ async def test_restore_object_storage_files() -> None: assert target_object_storage.list_items() == object_storage_items -async def test_restore_object_storage_files_does_nothing_is_storages_have_same_config() -> None: +async def test_restore_object_storage_files_does_nothing_if_storages_have_same_config() -> None: same_object_storage = mock.Mock(spec_set=ObjectStorage) source_disks = Disks(disks=[create_object_storage_disk("remote", same_object_storage)]) target_disks = Disks(disks=[create_object_storage_disk("remote", same_object_storage)])