Skip to content

Commit

Permalink
cover local copy in restore integration test
Browse files Browse the repository at this point in the history
Run the restore integration suite both with direct and local copy.
  • Loading branch information
dmitry-potepalov committed Aug 28, 2024
1 parent 8b2b964 commit 13eb453
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
10 changes: 7 additions & 3 deletions astacus/coordinator/plugins/clickhouse/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from astacus.coordinator.plugins.zookeeper_config import ZooKeeperConfiguration
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import Literal

import enum

Expand Down Expand Up @@ -40,12 +39,17 @@ class DiskType(enum.Enum):
object_storage = "object_storage"


class CopyMethod(enum.StrEnum):
direct = "direct"
local = "local"


class DirectCopyConfig(AstacusModel):
method: Literal["direct"] = "direct"
method: CopyMethod = CopyMethod.direct


class LocalCopyConfig(AstacusModel):
method: Literal["local"] = "local"
method: CopyMethod = CopyMethod.local
temporary_directory: str


Expand Down
19 changes: 18 additions & 1 deletion tests/integration/coordinator/plugins/clickhouse/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
from astacus.coordinator.plugins.clickhouse.config import (
ClickHouseConfiguration,
ClickHouseNode,
CopyMethod,
DirectCopyConfig,
DiskConfiguration,
DiskObjectStorageConfiguration,
DiskType,
LocalCopyConfig,
ReplicatedDatabaseSettings,
)
from astacus.coordinator.plugins.clickhouse.plugin import ClickHousePlugin
Expand Down Expand Up @@ -320,10 +323,11 @@ async def create_astacus_cluster(
clickhouse_cluster: ClickHouseServiceCluster,
ports: Ports,
minio_bucket: MinioBucket,
object_storage_copy_method: CopyMethod = CopyMethod.direct,
restorable_source: RestorableSource | None = None,
) -> AsyncIterator[ServiceCluster]:
configs = create_astacus_configs(
zookeeper, clickhouse_cluster, ports, Path(storage_path), minio_bucket, restorable_source
zookeeper, clickhouse_cluster, ports, Path(storage_path), minio_bucket, object_storage_copy_method, restorable_source
)
async with contextlib.AsyncExitStack() as stack:
astacus_services_coro: Sequence[Awaitable] = [
Expand Down Expand Up @@ -506,12 +510,23 @@ def run_astacus_command(astacus_cluster: ServiceCluster, *args: str) -> None:
raise AstacusCommandError(f"Command {all_args} on {astacus_url} failed")


def create_object_storage_copy_config(
object_storage_copy_method: CopyMethod, tmp_path: Path
) -> DirectCopyConfig | LocalCopyConfig:
match object_storage_copy_method:
case CopyMethod.direct:
return DirectCopyConfig()
case CopyMethod.local:
return LocalCopyConfig(temporary_directory=str(tmp_path))


def create_astacus_configs(
zookeeper: Service,
clickhouse_cluster: ClickHouseServiceCluster,
ports: Ports,
storage_path: Path,
minio_bucket: MinioBucket,
object_storage_copy_method: CopyMethod,
restorable_source: RestorableSource | None = None,
) -> Sequence[GlobalConfig]:
storage_tmp_path = storage_path / "tmp"
Expand Down Expand Up @@ -555,6 +570,7 @@ def create_astacus_configs(
bucket_name=minio_bucket.name,
prefix=restorable_source.clickhouse_object_storage_prefix,
)
object_storage_copy_config = create_object_storage_copy_config(object_storage_copy_method, storage_tmp_path)
return [
GlobalConfig(
coordinator=CoordinatorConfig(
Expand Down Expand Up @@ -599,6 +615,7 @@ def create_astacus_configs(
object_storage=DiskObjectStorageConfiguration(
default_storage="default",
storages=disk_storages,
copy_config=object_storage_copy_config,
),
),
],
Expand Down
25 changes: 22 additions & 3 deletions tests/integration/coordinator/plugins/clickhouse/test_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
escape_sql_string,
HttpClickHouseClient,
)
from astacus.coordinator.plugins.clickhouse.config import CopyMethod
from astacus.coordinator.plugins.clickhouse.engines import TableEngine
from astacus.coordinator.plugins.clickhouse.plugin import ClickHousePlugin
from collections.abc import AsyncIterable, AsyncIterator, Sequence
Expand Down Expand Up @@ -110,6 +111,7 @@ async def restored_cluster_manager(
stop_after_step: str | None,
clickhouse_restore_command: ClickHouseCommand,
minio_bucket: MinioBucket,
object_storage_copy_method: CopyMethod,
) -> AsyncIterator[Sequence[ClickHouseClient]]:
restorable_source = RestorableSource(
astacus_storage_path=restorable_cluster / "astacus_backup", clickhouse_object_storage_prefix="restorable/"
Expand All @@ -127,7 +129,13 @@ async def restored_cluster_manager(
with tempfile.TemporaryDirectory(prefix="storage_") as storage_path_str:
storage_path = Path(storage_path_str)
async with create_astacus_cluster(
storage_path, zookeeper, clickhouse_cluster, ports, minio_bucket, restorable_source
storage_path,
zookeeper,
clickhouse_cluster,
ports,
minio_bucket,
object_storage_copy_method,
restorable_source,
) as astacus_cluster:
# To test if we can survive transient failures during an entire restore operation,
# we first run a partial restore that stops after one of the restore steps,
Expand All @@ -148,17 +156,23 @@ async def restored_cluster_manager(
yield clients


@pytest.fixture(scope="module", name="object_storage_copy_method", params=CopyMethod)
def fixture_object_storage_copy_method(request: SubRequest) -> CopyMethod:
return request.param


@pytest.fixture(scope="module", name="restored_cluster", params=[*get_restore_steps_names(), None])
async def fixture_restored_cluster(
ports: Ports,
request: SubRequest,
restorable_cluster: Path,
clickhouse_restore_command: ClickHouseCommand,
minio_bucket: MinioBucket,
object_storage_copy_method: CopyMethod,
) -> AsyncIterable[Sequence[ClickHouseClient]]:
stop_after_step: str | None = request.param
async with restored_cluster_manager(
restorable_cluster, ports, stop_after_step, clickhouse_restore_command, minio_bucket
restorable_cluster, ports, stop_after_step, clickhouse_restore_command, minio_bucket, object_storage_copy_method
) as clients:
yield clients

Expand All @@ -172,7 +186,12 @@ async def fixture_function_restored_cluster(
) -> AsyncIterable[Sequence[ClickHouseClient]]:
async with restorable_cluster_manager(ports, clickhouse_command, function_minio_bucket) as restorable_cluster:
async with restored_cluster_manager(
restorable_cluster, ports, None, clickhouse_restore_command, function_minio_bucket
restorable_cluster,
ports,
None,
clickhouse_restore_command,
function_minio_bucket,
CopyMethod.direct, # is irrelevant in the context of materialized views restoration
) as clients:
yield clients

Expand Down

0 comments on commit 13eb453

Please sign in to comment.