Commit 1cba641

Operator Persistence - snapshots merging (#7796)

Co-authored-by: Sergey <sergey@pathway.com>
GitOrigin-RevId: 80ffb0d50c2e761646f7d36cf743a332ea687b59
2 people authored and Manul from Pathway committed Dec 17, 2024
1 parent 6ade4ac commit 1cba641
Showing 8 changed files with 765 additions and 159 deletions.
121 changes: 88 additions & 33 deletions integration_tests/wordcount/base.py
@@ -8,6 +8,7 @@
 import random
 import shutil
 import subprocess
+import threading
 import time
 import uuid
 import warnings
@@ -187,7 +188,7 @@ def check_output_correctness(
         ), f"Word: {word} Output count: {output_count} Input count: {input_word_counts.get(word)}"
 
     if not interrupted_run:
-        assert n_old_lines < DEFAULT_INPUT_SIZE / 10, (
+        assert latest_input_file is None or n_old_lines < DEFAULT_INPUT_SIZE / 10, (
             f"Output contains too many old lines: {n_old_lines} while 1/10 of the input size "
             + f"is {DEFAULT_INPUT_SIZE / 10}"
         )
@@ -335,7 +336,7 @@ def run_pw_program_suddenly_terminate(
         input_path=input_path,
         output_path=output_path,
         pstorage_path=pstorage_path,
-        mode=STATIC_MODE_NAME,
+        mode=STREAMING_MODE_NAME,
         pstorage_type=pstorage_type,
         persistence_mode=persistence_mode,
         first_port=first_port,
@@ -391,10 +392,15 @@ def generate_dictionary(dict_size: int) -> list[str]:
 DICTIONARY: list[str] = generate_dictionary(10000)
 
 
-def generate_input(file_name, input_size, commit_frequency):
+def generate_input(
+    file_name: pathlib.Path | str,
+    input_size: int,
+    commit_frequency: int,
+    dictionary: list[str],
+) -> None:
     with open(file_name, "w") as fw:
         for seq_line_id in range(input_size):
-            word = random.choice(DICTIONARY)
+            word = random.choice(dictionary)
             dataset_line_dict = {"word": word}
             dataset_line = json.dumps(dataset_line_dict)
             fw.write(dataset_line + "\n")
@@ -403,13 +409,20 @@ def generate_input(file_name, input_size, commit_frequency):
             pass
 
 
-def generate_next_input(inputs_path):
+def generate_next_input(
+    inputs_path: pathlib.Path,
+    *,
+    input_size: int | None = None,
+    dictionary: list[str] | None = None,
+    commit_frequency: int | None = None,
+) -> str:
     file_name = os.path.join(inputs_path, str(time.time()))
 
     generate_input(
         file_name=file_name,
-        input_size=DEFAULT_INPUT_SIZE,
-        commit_frequency=100000,
+        input_size=input_size or DEFAULT_INPUT_SIZE,
+        commit_frequency=commit_frequency or 100000,
+        dictionary=dictionary or DICTIONARY,
     )
 
     return file_name
@@ -454,7 +467,47 @@ def do_test_persistent_wordcount(
         print(f"Run {n_run}: finished")
 
 
-def do_test_failure_recovery_static(
+class InputGenerator:
+    def __init__(
+        self,
+        inputs_path: pathlib.Path,
+        input_size: int,
+        max_files: int,
+        waiting_time: float,
+        dictionary_size: int,
+        commit_frequency: int,
+    ) -> None:
+        self.inputs_path = inputs_path
+        self.input_size = input_size
+        self.max_files = max_files
+        self.waiting_time = waiting_time
+        self.should_stop = False
+        self.dictionary = generate_dictionary(dictionary_size)
+        self.commit_frequency = commit_frequency
+
+    def start(self) -> None:
+        def run() -> None:
+            for _ in range(self.max_files):
+                print(f"generating input of size {self.input_size}")
+                generate_next_input(
+                    self.inputs_path,
+                    input_size=self.input_size,
+                    dictionary=self.dictionary,
+                    commit_frequency=self.commit_frequency,
+                )
+                time.sleep(self.waiting_time)
+                if self.should_stop:
+                    break
+
+        self.thread = threading.Thread(target=run, daemon=True)
+        self.thread.start()
+
+    def stop(self) -> None:
+        self.should_stop = True
+        self.thread.join()
+
+
+def do_test_failure_recovery(
     *,
     n_backfilling_runs,
     n_threads,
@@ -471,11 +524,17 @@ def do_test_failure_recovery_static(
 
     with PStoragePath(pstorage_type, tmp_path) as pstorage_path:
         reset_runtime(inputs_path, output_path, pstorage_path, pstorage_type)
-        finished = False
-        input_file_name = generate_next_input(inputs_path)
-        for n_run in range(n_backfilling_runs):
-            print(f"Run {n_run}: generating input")
 
+        input_generator = InputGenerator(
+            inputs_path,
+            input_size=100_000,
+            max_files=n_backfilling_runs * 5,
+            waiting_time=min_work_time / 5,
+            dictionary_size=100_000,
+            commit_frequency=1_000_000,  # don't do manual commits
+        )
+        input_generator.start()
+        for n_run in range(n_backfilling_runs):
             print(f"Run {n_run}: running pathway program")
             run_pw_program_suddenly_terminate(
                 n_threads=n_threads,
@@ -490,26 +549,22 @@ def do_test_failure_recovery_static(
                 first_port=first_port,
             )
 
-            finished_in_this_run = check_output_correctness(
-                input_file_name, inputs_path, output_path, interrupted_run=True
+            check_output_correctness(
+                None, inputs_path, output_path, interrupted_run=True
             )
-            if finished_in_this_run:
-                finished = True
-
-        if finished:
-            print("The program finished during one of interrupted runs")
-        else:
-            elapsed = get_pw_program_run_time(
-                n_threads=n_threads,
-                n_processes=n_processes,
-                input_path=inputs_path,
-                output_path=output_path,
-                pstorage_path=pstorage_path,
-                mode=STATIC_MODE_NAME,
-                pstorage_type=pstorage_type,
-                persistence_mode=persistence_mode,
-                first_port=first_port,
-            )
-            print("Time elapsed for non-interrupted run:", elapsed)
-            print("Checking correctness at the end")
-            check_output_correctness(input_file_name, inputs_path, output_path)
+
+        input_generator.stop()
+        elapsed = get_pw_program_run_time(
+            n_threads=n_threads,
+            n_processes=n_processes,
+            input_path=inputs_path,
+            output_path=output_path,
+            pstorage_path=pstorage_path,
+            mode=STATIC_MODE_NAME,
+            pstorage_type=pstorage_type,
+            persistence_mode=persistence_mode,
+            first_port=first_port,
+        )
+        print("Time elapsed for non-interrupted run:", elapsed)
+        print("Checking correctness at the end")
+        check_output_correctness(None, inputs_path, output_path)
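
For orientation, here is a minimal sketch of how the InputGenerator added above is meant to be driven. The paths, sizes, and the "base" module import are illustrative assumptions, not values taken from the test:

    import pathlib
    import tempfile
    import time

    from base import InputGenerator  # assuming base.py is importable as `base`

    inputs_path = pathlib.Path(tempfile.mkdtemp())
    generator = InputGenerator(
        inputs_path,
        input_size=1_000,            # lines per generated file
        max_files=5,                 # generator thread exits after this many files
        waiting_time=0.5,            # seconds to sleep between files
        dictionary_size=1_000,       # distinct words to sample from
        commit_frequency=1_000_000,  # larger than input_size, so no manual commits
    )
    generator.start()  # producer runs on a daemon thread
    time.sleep(3.0)    # ...the program under test would run here...
    generator.stop()   # sets should_stop, then joins the thread

Note that stop() can block for up to waiting_time seconds, since the flag is only checked after each sleep; acceptable for a test helper.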
2 changes: 1 addition & 1 deletion integration_tests/wordcount/pw_wordcount.py
@@ -56,7 +56,7 @@ class InputSchema(pw.Schema):
     format="json",
     mode=args.mode,
     persistent_id="1",
-    autocommit_duration_ms=10,
+    autocommit_duration_ms=100,
 )
 result = words.groupby(words.word).reduce(
     words.word,
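
For context, the autocommit_duration_ms change above sits inside the script's input connector call. A plausible reconstruction is sketched below; the pw.io.fs.read function, the schema body, and the literal input path are assumptions — only the keyword arguments visible in the diff are confirmed:

    import pathway as pw

    class InputSchema(pw.Schema):
        word: str

    words = pw.io.fs.read(
        "./inputs",                  # input directory (illustrative)
        schema=InputSchema,
        format="json",
        mode="streaming",            # the script passes args.mode here
        persistent_id="1",
        autocommit_duration_ms=100,  # raised from 10 ms in this commit
    )

Raising the autocommit interval from 10 ms to 100 ms presumably reduces commit churn, giving the persistence layer fewer, larger atomic batches to snapshot.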
39 changes: 37 additions & 2 deletions integration_tests/wordcount/test_recovery.py
@@ -1,5 +1,6 @@
 # Copyright © 2024 Pathway
 
+import argparse
 import pathlib
 
 import pytest
@@ -9,7 +10,7 @@
     INPUT_PERSISTENCE_MODE_NAME,
     OPERATOR_PERSISTENCE_MODE_NAME,
     S3_STORAGE_NAME,
-    do_test_failure_recovery_static,
+    do_test_failure_recovery,
 )
 
 
@@ -46,7 +47,7 @@ def test_integration_failure_recovery(
     tmp_path: pathlib.Path,
     port: int,
 ):
-    do_test_failure_recovery_static(
+    do_test_failure_recovery(
         n_backfilling_runs=n_backfilling_runs,
         n_threads=n_threads,
         n_processes=n_processes,
@@ -57,3 +58,37 @@ def test_integration_failure_recovery(
         persistence_mode=persistence_mode,
         first_port=port,
     )
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Simple persistence test")
+    parser.add_argument("--n-backfilling-runs", type=int, default=3)
+    parser.add_argument("--n-threads", type=int, default=1, choices=[1, 2, 4])
+    parser.add_argument("--n-processes", type=int, default=1, choices=[1, 2, 4])
+    parser.add_argument("--min-work-time", type=float, default=5.0)
+    parser.add_argument("--max-work-time", type=float, default=15.0)
+    parser.add_argument(
+        "--pstorage-type",
+        type=str,
+        choices=["s3", "fs"],
+        default="fs",
+    )
+    parser.add_argument(
+        "--persistence_mode",
+        type=str,
+        choices=[INPUT_PERSISTENCE_MODE_NAME, OPERATOR_PERSISTENCE_MODE_NAME],
+        default=INPUT_PERSISTENCE_MODE_NAME,
+    )
+    args = parser.parse_args()
+
+    do_test_failure_recovery(
+        n_backfilling_runs=args.n_backfilling_runs,
+        n_threads=args.n_threads,
+        n_processes=args.n_processes,
+        tmp_path=pathlib.Path("./"),
+        min_work_time=args.min_work_time,
+        max_work_time=args.max_work_time,
+        pstorage_type=args.pstorage_type,
+        persistence_mode=args.persistence_mode,
+        first_port=5670,
+    )
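
With the new __main__ block, the recovery scenario can also be launched outside pytest. A hypothetical invocation, using only the flags defined above:

    python test_recovery.py --n-backfilling-runs 3 --pstorage-type fs

Note the mixed flag styles: --pstorage-type is hyphenated while --persistence_mode keeps an underscore, so each must be spelled exactly as registered with argparse.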
21 changes: 16 additions & 5 deletions src/persistence/config.rs
@@ -27,13 +27,15 @@ use crate::persistence::backends::{
     FilesystemKVStorage, MockKVStorage, PersistenceBackend, S3KVStorage,
 };
 use crate::persistence::cached_object_storage::CachedObjectStorage;
-use crate::persistence::operator_snapshot::{ConcreteSnapshotReader, MultiConcreteSnapshotReader};
+use crate::persistence::operator_snapshot::{
+    ConcreteSnapshotMerger, ConcreteSnapshotReader, ConcreteSnapshotWriter,
+    MultiConcreteSnapshotReader,
+};
+use crate::persistence::state::FinalizedTimeQuerier;
 use crate::persistence::state::MetadataAccessor;
 use crate::persistence::Error as PersistenceBackendError;
 use crate::persistence::{PersistentId, SharedSnapshotWriter};
 
-use super::operator_snapshot::ConcreteSnapshotWriter;
-
 const STREAMS_DIRECTORY_NAME: &str = "streams";
 
 pub type ConnectorWorkerPair = (PersistentId, usize);
@@ -431,12 +433,21 @@ impl PersistenceManagerConfig {
     pub fn create_operator_snapshot_writer<D, R>(
         &mut self,
         persistent_id: PersistentId,
-    ) -> Result<ConcreteSnapshotWriter<D, R>, PersistenceBackendError>
+    ) -> Result<(ConcreteSnapshotWriter<D, R>, ConcreteSnapshotMerger), PersistenceBackendError>
     where
         D: ExchangeData,
         R: ExchangeData + Semigroup,
     {
         let backend = self.get_writer_backend(persistent_id)?;
-        Ok(ConcreteSnapshotWriter::new(backend, self.snapshot_interval))
+        let merger_backend = self.get_writer_backend(persistent_id)?;
+        let writer = ConcreteSnapshotWriter::new(backend, self.snapshot_interval);
+        let metadata_backend = self.backend.create()?;
+        let time_querier = FinalizedTimeQuerier::new(metadata_backend, self.total_workers);
+        let merger = ConcreteSnapshotMerger::new::<D, R>(
+            merger_backend,
+            self.snapshot_interval,
+            time_querier,
+        );
+        Ok((writer, merger))
     }
 }
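
The upshot is that call sites now receive a pair instead of a bare writer, roughly: let (writer, merger) = config.create_operator_snapshot_writer::<D, R>(persistent_id)?;. Each persisted operator thus gains a ConcreteSnapshotMerger which, judging by the types involved, compacts written snapshot chunks up to the time that FinalizedTimeQuerier reports as finalized across all total_workers. The merger gets its own writer backend and a freshly created metadata backend, apparently so that background merging does not contend with the writer's handle.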
(Diffs for the remaining four changed files are not shown.)