cover local copy in restore integration test #231

Open · wants to merge 1 commit into main
10 changes: 7 additions & 3 deletions astacus/coordinator/plugins/clickhouse/config.py
@@ -9,7 +9,6 @@
 from astacus.coordinator.plugins.zookeeper_config import ZooKeeperConfiguration
 from collections.abc import Mapping, Sequence
 from pathlib import Path
-from typing import Literal

 import enum
@@ -40,12 +39,17 @@ class DiskType(enum.Enum):
     object_storage = "object_storage"


+class CopyMethod(enum.StrEnum):
+    direct = "direct"
+    local = "local"
+
+
 class DirectCopyConfig(AstacusModel):
-    method: Literal["direct"] = "direct"
+    method: CopyMethod = CopyMethod.direct


 class LocalCopyConfig(AstacusModel):
-    method: Literal["local"] = "local"
+    method: CopyMethod = CopyMethod.local
     temporary_directory: str
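A note on the Literal-to-StrEnum swap: enum.StrEnum (Python 3.11+) members are str subclasses, so configuration files that previously matched the "direct"/"local" literals should keep parsing unchanged, assuming AstacusModel is a pydantic-style model that coerces raw strings into enum members. A standalone sketch of the property this relies on (not Astacus code):

import enum


class CopyMethod(enum.StrEnum):
    direct = "direct"
    local = "local"


# Parsing a raw config value yields the enum member...
assert CopyMethod("local") is CopyMethod.local
# ...and members still behave as the plain strings they replace.
assert CopyMethod.direct == "direct"
assert f"method={CopyMethod.direct}" == "method=direct"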
19 changes: 18 additions & 1 deletion tests/integration/coordinator/plugins/clickhouse/conftest.py
@@ -14,9 +14,12 @@
 from astacus.coordinator.plugins.clickhouse.config import (
     ClickHouseConfiguration,
     ClickHouseNode,
+    CopyMethod,
+    DirectCopyConfig,
     DiskConfiguration,
     DiskObjectStorageConfiguration,
     DiskType,
+    LocalCopyConfig,
     ReplicatedDatabaseSettings,
 )
 from astacus.coordinator.plugins.clickhouse.plugin import ClickHousePlugin
@@ -320,10 +323,11 @@ async def create_astacus_cluster(
     clickhouse_cluster: ClickHouseServiceCluster,
     ports: Ports,
     minio_bucket: MinioBucket,
+    object_storage_copy_method: CopyMethod = CopyMethod.direct,
     restorable_source: RestorableSource | None = None,
 ) -> AsyncIterator[ServiceCluster]:
     configs = create_astacus_configs(
-        zookeeper, clickhouse_cluster, ports, Path(storage_path), minio_bucket, restorable_source
+        zookeeper, clickhouse_cluster, ports, Path(storage_path), minio_bucket, object_storage_copy_method, restorable_source
     )
     async with contextlib.AsyncExitStack() as stack:
         astacus_services_coro: Sequence[Awaitable] = [
@@ -506,12 +510,23 @@ def run_astacus_command(astacus_cluster: ServiceCluster, *args: str) -> None:
         raise AstacusCommandError(f"Command {all_args} on {astacus_url} failed")


+def create_object_storage_copy_config(
+    object_storage_copy_method: CopyMethod, tmp_path: Path
+) -> DirectCopyConfig | LocalCopyConfig:
+    match object_storage_copy_method:
+        case CopyMethod.direct:
+            return DirectCopyConfig()
+        case CopyMethod.local:
+            return LocalCopyConfig(temporary_directory=str(tmp_path))
+
+
 def create_astacus_configs(
     zookeeper: Service,
     clickhouse_cluster: ClickHouseServiceCluster,
     ports: Ports,
     storage_path: Path,
     minio_bucket: MinioBucket,
+    object_storage_copy_method: CopyMethod,
     restorable_source: RestorableSource | None = None,
 ) -> Sequence[GlobalConfig]:
     storage_tmp_path = storage_path / "tmp"
@@ -555,6 +570,7 @@ def create_astacus_configs(
             bucket_name=minio_bucket.name,
             prefix=restorable_source.clickhouse_object_storage_prefix,
         )
+    object_storage_copy_config = create_object_storage_copy_config(object_storage_copy_method, storage_tmp_path)
     return [
         GlobalConfig(
             coordinator=CoordinatorConfig(
@@ -599,6 +615,7 @@
                 object_storage=DiskObjectStorageConfiguration(
                     default_storage="default",
                     storages=disk_storages,
+                    copy_config=object_storage_copy_config,
                 ),
             ),
         ],
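A note on the new helper: create_object_storage_copy_config is an exhaustive match over CopyMethod with no fallback case, so a type checker can flag any enum member added later but left unhandled, rather than letting it silently default. A hypothetical usage sketch, assuming the helper and config classes above are importable and using a made-up scratch path:

from pathlib import Path

copy_config = create_object_storage_copy_config(CopyMethod.local, Path("/tmp/astacus-scratch"))
assert isinstance(copy_config, LocalCopyConfig)
assert copy_config.temporary_directory == "/tmp/astacus-scratch"

# Direct copy needs no scratch directory configuration:
assert isinstance(create_object_storage_copy_config(CopyMethod.direct, Path("/unused")), DirectCopyConfig)

The CopyMethod.direct default on create_astacus_cluster keeps existing call sites working without changes.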
54 changes: 48 additions & 6 deletions tests/integration/coordinator/plugins/clickhouse/test_plugin.py
@@ -13,6 +13,7 @@
     escape_sql_string,
     HttpClickHouseClient,
 )
+from astacus.coordinator.plugins.clickhouse.config import CopyMethod
 from astacus.coordinator.plugins.clickhouse.engines import TableEngine
 from astacus.coordinator.plugins.clickhouse.plugin import ClickHousePlugin
 from collections.abc import AsyncIterable, AsyncIterator, Sequence
@@ -28,10 +29,11 @@
     RestorableSource,
     run_astacus_command,
 )
-from typing import Final
+from typing import Final, Iterable
 from unittest import mock

 import base64
+import dataclasses
 import pytest
 import tempfile
@@ -110,6 +112,7 @@ async def restored_cluster_manager(
     stop_after_step: str | None,
     clickhouse_restore_command: ClickHouseCommand,
     minio_bucket: MinioBucket,
+    object_storage_copy_method: CopyMethod,
 ) -> AsyncIterator[Sequence[ClickHouseClient]]:
     restorable_source = RestorableSource(
         astacus_storage_path=restorable_cluster / "astacus_backup", clickhouse_object_storage_prefix="restorable/"
@@ -127,7 +130,13 @@
     with tempfile.TemporaryDirectory(prefix="storage_") as storage_path_str:
         storage_path = Path(storage_path_str)
         async with create_astacus_cluster(
-            storage_path, zookeeper, clickhouse_cluster, ports, minio_bucket, restorable_source
+            storage_path,
+            zookeeper,
+            clickhouse_cluster,
+            ports,
+            minio_bucket,
+            object_storage_copy_method,
+            restorable_source,
         ) as astacus_cluster:
             # To test if we can survive transient failures during an entire restore operation,
             # we first run a partial restore that stops after one of the restore steps,
@@ -148,17 +157,45 @@
             yield clients


-@pytest.fixture(scope="module", name="restored_cluster", params=[*get_restore_steps_names(), None])
+@dataclasses.dataclass(frozen=True, kw_only=True)
+class RestorationTestCase:
+    stop_after_step: str | None
+    object_storage_copy_method: CopyMethod
+
+    @property
+    def test_id(self) -> str:
+        return self.stop_after_step or self.object_storage_copy_method
+
+
+def restoration_test_cases() -> Iterable[RestorationTestCase]:
+    # It does not make sense to test both copy methods in the partial-failure scenarios: the post-effect
+    # of both methods is the same, data in the target tiered storage. Test only direct copy there.
+    partial_failure_cases = [
+        RestorationTestCase(stop_after_step=step, object_storage_copy_method=CopyMethod.direct)
+        for step in get_restore_steps_names()
+    ]
+    different_copy_method_cases = [
+        RestorationTestCase(stop_after_step=None, object_storage_copy_method=method) for method in CopyMethod
+    ]
+    return partial_failure_cases + different_copy_method_cases
+
+
+@pytest.fixture(scope="module", name="restored_cluster", params=restoration_test_cases(), ids=lambda p: p.test_id)
 async def fixture_restored_cluster(
     ports: Ports,
     request: SubRequest,
     restorable_cluster: Path,
     clickhouse_restore_command: ClickHouseCommand,
     minio_bucket: MinioBucket,
 ) -> AsyncIterable[Sequence[ClickHouseClient]]:
-    stop_after_step: str | None = request.param
+    test_case: RestorationTestCase = request.param
     async with restored_cluster_manager(
-        restorable_cluster, ports, stop_after_step, clickhouse_restore_command, minio_bucket
+        restorable_cluster,
+        ports,
+        test_case.stop_after_step,
+        clickhouse_restore_command,
+        minio_bucket,
+        test_case.object_storage_copy_method,
     ) as clients:
         yield clients
@@ -172,7 +209,12 @@ async def fixture_function_restored_cluster(
 ) -> AsyncIterable[Sequence[ClickHouseClient]]:
     async with restorable_cluster_manager(ports, clickhouse_command, function_minio_bucket) as restorable_cluster:
         async with restored_cluster_manager(
-            restorable_cluster, ports, None, clickhouse_restore_command, function_minio_bucket
+            restorable_cluster,
+            ports,
+            None,
+            clickhouse_restore_command,
+            function_minio_bucket,
+            CopyMethod.direct,  # the copy method is irrelevant when restoring materialized views
         ) as clients:
             yield clients
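One subtlety in test_id: when stop_after_step is None, the property falls through to the enum member, which still satisfies the declared str return type because CopyMethod is a StrEnum. A usage sketch against the definitions above, with "retrieve_backup" standing in as a hypothetical step name:

partial = RestorationTestCase(stop_after_step="retrieve_backup", object_storage_copy_method=CopyMethod.direct)
full = RestorationTestCase(stop_after_step=None, object_storage_copy_method=CopyMethod.local)
assert partial.test_id == "retrieve_backup"  # partial-failure cases are id'd by their stop step
assert full.test_id == "local"  # full restores are id'd by their copy method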
