Skip to content

Commit

Permalink
Merge pull request #228 from Aiven-Open/dmitry-potepalov-clickhouse-l…
Browse files Browse the repository at this point in the history
…ocal-object-storage-restore

clickhouse: add local copy strategy for cross-provider tiered storage restore
  • Loading branch information
sjamgade authored Aug 24, 2024
2 parents d179687 + 416dd66 commit 4c7d045
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 8 deletions.
11 changes: 11 additions & 0 deletions astacus/coordinator/plugins/clickhouse/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from astacus.coordinator.plugins.zookeeper_config import ZooKeeperConfiguration
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import Literal

import enum

Expand Down Expand Up @@ -39,9 +40,19 @@ class DiskType(enum.Enum):
object_storage = "object_storage"


class DirectCopyConfig(AstacusModel):
method: Literal["direct"] = "direct"


class LocalCopyConfig(AstacusModel):
method: Literal["local"] = "local"
temporary_directory: str


class DiskObjectStorageConfiguration(AstacusModel):
default_storage: str
storages: Mapping[str, RohmuStorageConfig]
copy_config: DirectCopyConfig | LocalCopyConfig = DirectCopyConfig()


class DiskConfiguration(AstacusModel):
Expand Down
3 changes: 2 additions & 1 deletion astacus/coordinator/plugins/clickhouse/disks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ def from_disk_config(cls, config: DiskConfiguration, storage_name: str | None =
else:
config_name = storage_name if storage_name is not None else config.object_storage.default_storage
object_storage_config = config.object_storage.storages[config_name]
copy_config = config.object_storage.copy_config

def create_storage() -> ObjectStorage:
return ThreadSafeRohmuStorage(config=object_storage_config)
return ThreadSafeRohmuStorage(config=object_storage_config, copy_config=copy_config)

object_storage_factory = create_storage
return Disk(
Expand Down
51 changes: 47 additions & 4 deletions astacus/coordinator/plugins/clickhouse/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from .config import DirectCopyConfig, LocalCopyConfig
from abc import ABC, abstractmethod
from astacus.common.rohmustorage import RohmuStorageConfig
from astacus.common.statsd import StatsClient
from collections.abc import Iterator, Sequence
from rohmu import BaseTransfer
from rohmu.errors import FileNotFoundFromStorageError
Expand All @@ -14,6 +16,7 @@
import datetime
import logging
import rohmu
import tempfile
import threading

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,13 +46,14 @@ def delete_item(self, key: str) -> None:
...

@abstractmethod
def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str]) -> None:
def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None:
...


class ThreadSafeRohmuStorage(ObjectStorage):
def __init__(self, config: RohmuStorageConfig) -> None:
def __init__(self, *, config: RohmuStorageConfig, copy_config: DirectCopyConfig | LocalCopyConfig) -> None:
self.config = config
self.copy_config = copy_config
self._storage = rohmu.get_transfer_from_model(config)
self._storage_lock = threading.Lock()

Expand All @@ -68,7 +72,7 @@ def delete_item(self, key: str) -> None:
with self._storage_lock:
self._storage.delete_key(key)

def copy_items_from(self, source: ObjectStorage, keys: Sequence[str]) -> None:
def copy_items_from(self, source: ObjectStorage, keys: Sequence[str], *, stats: StatsClient | None) -> None:
# In theory this could deadlock if some other place was locking the same two storages
# in the reverse order at the same time. Within the context of backups and restore,
# it's quite unlikely to have a pair of storages be the source and target of each other.
Expand All @@ -77,14 +81,53 @@ def copy_items_from(self, source: ObjectStorage, keys: Sequence[str]) -> None:
raise NotImplementedError("Copying items is only supported from another ThreadSafeRohmuStorage")
with source.get_storage() as source_storage:
with self.get_storage() as target_storage:
self._copy_items_between(keys, source_storage=source_storage, target_storage=target_storage, stats=stats)

def _copy_items_between(
self,
keys: Sequence[str],
*,
source_storage: BaseTransfer[Any],
target_storage: BaseTransfer[Any],
stats: StatsClient | None,
) -> None:
match self.copy_config:
case DirectCopyConfig():
logger.info("Copying the keys using the cloud APIs")
target_storage.copy_files_from(source=source_storage, keys=keys)
case LocalCopyConfig():
logger.info("Copying the keys by downloading from source/uploading to target")
_copy_via_local_filesystem(
keys, source=source_storage, target=target_storage, copy_config=self.copy_config, stats=stats
)

@contextlib.contextmanager
def get_storage(self) -> Iterator[BaseTransfer[Any]]:
with self._storage_lock:
yield self._storage


def _copy_via_local_filesystem(
keys: Sequence[str],
*,
source: BaseTransfer[Any],
target: BaseTransfer[Any],
copy_config: LocalCopyConfig,
stats: StatsClient | None,
) -> None:
keys_to_copy = len(keys)
for keys_copied, key in enumerate(keys):
with tempfile.TemporaryFile(dir=copy_config.temporary_directory) as temp_file:
metadata = source.get_contents_to_fileobj(key, temp_file)
target.store_file_object(key, temp_file, metadata)
if stats:
stats.gauge(
"astacus_restore_clickhouse_tiered_storage_keys_remaining",
keys_to_copy - keys_copied,
tags={"copy_method": copy_config.method},
)


@dataclasses.dataclass(frozen=True)
class MemoryObjectStorage(ObjectStorage):
items: dict[str, ObjectStorageItem] = dataclasses.field(default_factory=dict)
Expand Down Expand Up @@ -114,7 +157,7 @@ def delete_item(self, key: str) -> None:
logger.info("deleting item: %r", key)
self.items.pop(key)

def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str]) -> None:
def copy_items_from(self, source: "ObjectStorage", keys: Sequence[str], *, stats: StatsClient | None) -> None:
keys_set = set(keys)
for source_item in source.list_items():
if source_item.key in keys_set:
Expand Down
2 changes: 1 addition & 1 deletion astacus/coordinator/plugins/clickhouse/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ def get_restore_steps(self, *, context: OperationContext, req: RestoreRequest) -
sync_timeout=self.sync_databases_timeout,
),
MapNodesStep(partial_restore_nodes=req.partial_restore_nodes),
RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes),
RestoreObjectStorageFilesStep(source_disks=source_disks, target_disks=disks),
RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes),
AttachMergeTreePartsStep(
clients=clients,
disks=disks,
Expand Down
2 changes: 1 addition & 1 deletion astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
try:
if source_storage.get_config() != target_storage.get_config():
paths = [file.path for file in object_storage_files.files]
target_storage.copy_items_from(source_storage, paths)
target_storage.copy_items_from(source_storage, paths, stats=cluster.stats)
finally:
target_storage.close()
finally:
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ async def test_restore_object_storage_files() -> None:
assert target_object_storage.list_items() == object_storage_items


async def test_restore_object_storage_files_does_nothing_is_storages_have_same_config() -> None:
async def test_restore_object_storage_files_does_nothing_if_storages_have_same_config() -> None:
same_object_storage = mock.Mock(spec_set=ObjectStorage)
source_disks = Disks(disks=[create_object_storage_disk("remote", same_object_storage)])
target_disks = Disks(disks=[create_object_storage_disk("remote", same_object_storage)])
Expand Down

0 comments on commit 4c7d045

Please sign in to comment.