Skip to content

Commit

Permalink
feat: parallel statemap installs (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred authored Jul 27, 2023
1 parent 6f5f42f commit 9c43ba7
Show file tree
Hide file tree
Showing 32 changed files with 1,340 additions and 1,255 deletions.
12 changes: 12 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,15 @@ COHORT_PG_PORT=5432
COHORT_PG_USER=postgres
COHORT_PG_PASSWORD=admin
COHORT_PG_DATABASE=talos-sample-cohort-dev

# Replicator and Statemap Installer Services
REPLICATOR_KAFKA_COMMIT_FREQ_MS=10000
REPLICATOR_ENABLE_STATS=false

REPLICATOR_SUFFIX_CAPACITY=100000
REPLICATOR_SUFFIX_PRUNE_THRESHOLD=1000
# REPLICATOR_SUFFIX_PRUNE_THRESHOLD=100

STATEMAP_QUEUE_ENABLE_STATS=false
STATEMAP_QUEUE_CLEANUP_FREQUENCY_MS=10000
STATEMAP_INSTALLER_THREAD_POOL=50
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 75 additions & 33 deletions examples/cohort_banking/examples/cohort_banking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ use cohort::{
metrics::Stats,
model::requests::TransferRequest,
replicator::{
core::{Replicator, ReplicatorCandidate},
core::Replicator,
models::ReplicatorCandidate,
pg_replicator_installer::PgReplicatorStatemapInstaller,
services::{replicator_service::replicator_service, statemap_installer_service::installer_service},
services::{
replicator_service::{replicator_service, ReplicatorServiceConfig},
statemap_installer_service::{installation_service, StatemapInstallerConfig},
statemap_queue_service::{statemap_queue_service, StatemapQueueServiceConfig},
},
utils::get_snapshot_callback,
},
state::postgres::{data_access::PostgresApi, database::Database},
snapshot_api::SnapshotApi,
state::postgres::database::Database,
};
use examples_support::{
cohort::queue_workers::QueueProcessor,
Expand All @@ -20,13 +27,14 @@ use examples_support::{
use metrics::model::{MicroMetrics, MinMax};
use rand::Rng;
use rust_decimal::{prelude::FromPrimitive, Decimal};
use talos_certifier::ports::MessageReciever;
use talos_certifier::{env_var_with_defaults, ports::MessageReciever};
use talos_certifier_adapters::{KafkaConfig, KafkaConsumer};
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::{signal, sync::Mutex, task::JoinHandle, try_join};

type ReplicatorTaskHandle = JoinHandle<Result<(), String>>;
type InstallerTaskHandle = JoinHandle<Result<(), String>>;
type InstallerQueueTaskHandle = JoinHandle<Result<(), String>>;
type InstallationTaskHandle = JoinHandle<Result<(), String>>;
type HeartBeatReceiver = tokio::sync::watch::Receiver<u64>;

#[derive(Clone)]
Expand Down Expand Up @@ -71,7 +79,7 @@ async fn main() -> Result<(), String> {
);

// TODO: extract 100_000 into command line parameter - channel_size between replicator and installer tasks
let (h_replicator, h_installer, rx_heartbeat) = start_replicator(params.replicator_metrics, db_ref2, 100_000).await;
let (h_replicator, h_installer, h_installation, rx_heartbeat) = start_replicator(params.replicator_metrics, db_ref2, 100_000).await;

let metrics_data = Arc::new(Mutex::new(Vec::new()));
let metrics_data = Arc::clone(&metrics_data);
Expand All @@ -85,7 +93,7 @@ async fn main() -> Result<(), String> {
);

let all_async_services = tokio::spawn(async move {
let result = try_join!(h_generator, h_replicator, h_installer, h_cohort, h_metrics_collector);
let result = try_join!(h_generator, h_replicator, h_installer, h_installation, h_cohort, h_metrics_collector);
log::warn!("Result from the services ={result:?}");
});

Expand Down Expand Up @@ -115,6 +123,8 @@ async fn main() -> Result<(), String> {
Ok(())
}

// TODO: Fix and enable these lints
#[allow(unused_variables, unused_mut)]
fn start_queue_monitor(
queue: Arc<Receiver<TransferRequest>>,
mut rx_heartbeat: tokio::sync::watch::Receiver<u64>,
Expand All @@ -136,21 +146,21 @@ fn start_queue_monitor(
(*reference, reference.has_changed())
};

if queue.is_empty() && !is_count_changed {
// queue is empty and there are no signals from other workers, reduce window and try again
remaining_attempts -= 1;
log::warn!(
"Workers queue is empty and there is no activity signal from replicator. Finishing in: {} seconds...",
remaining_attempts * check_frequency.as_secs()
);
} else {
remaining_attempts = total_attempts;
log::warn!(
"Counts. Remaining: {}, processed by replicator and installer: {}",
queue.len(),
recent_heartbeat_value
);
}
// if queue.is_empty() && !is_count_changed {
// // queue is empty and there are no signals from other workers, reduce window and try again
// remaining_attempts -= 1;
// log::warn!(
// "Workers queue is empty and there is no activity signal from replicator. Finishing in: {} seconds...",
// remaining_attempts * check_frequency.as_secs()
// );
// } else {
// remaining_attempts = total_attempts;
// log::warn!(
// "Counts. Remaining: {}, processed by replicator and installer: {}",
// queue.len(),
// recent_heartbeat_value
// );
// }

tokio::time::sleep(check_frequency).await;
}
Expand Down Expand Up @@ -179,7 +189,7 @@ async fn start_replicator(
replicator_metrics: Option<i128>,
database: Arc<Database>,
channel_size: usize,
) -> (ReplicatorTaskHandle, InstallerTaskHandle, HeartBeatReceiver) {
) -> (ReplicatorTaskHandle, InstallerQueueTaskHandle, InstallationTaskHandle, HeartBeatReceiver) {
let mut kafka_config = KafkaConfig::from_env();
kafka_config.group_id = "talos-replicator-dev".to_string();
let kafka_consumer = KafkaConsumer::new(&kafka_config);
Expand All @@ -191,10 +201,12 @@ async fn start_replicator(
let (tx_install_req, rx_install_req) = tokio::sync::mpsc::channel(channel_size);
let (tx_install_resp, rx_install_resp) = tokio::sync::mpsc::channel(channel_size);

let manual_tx_api = PostgresApi { client: database.get().await };
let (tx_installation_feedback_req, rx_installation_feedback_req) = tokio::sync::mpsc::channel(channel_size);
let (tx_installation_req, rx_installation_req) = tokio::sync::mpsc::channel(channel_size);
// let manual_tx_api = PostgresApi { client: database.get().await };
let installer = PgReplicatorStatemapInstaller {
metrics_frequency: replicator_metrics,
pg: manual_tx_api,
pg: database.clone(),
metrics: MicroMetrics::new(1_000_000_000_f32, true),
m_total: MinMax::default(),
m1_tx: MinMax::default(),
Expand All @@ -204,20 +216,50 @@ async fn start_replicator(
m5_commit: MinMax::default(),
};

// Replicator Service
let replicator_config = ReplicatorServiceConfig {
commit_frequency_ms: env_var_with_defaults!("REPLICATOR_KAFKA_COMMIT_FREQ_MS", u64, 10_000),
enable_stats: env_var_with_defaults!("REPLICATOR_ENABLE_STATS", bool, true),
};
let suffix_config = SuffixConfig {
capacity: 10,
prune_start_threshold: Some(2000),
min_size_after_prune: None,
capacity: env_var_with_defaults!("REPLICATOR_SUFFIX_CAPACITY", usize, 100_000),
prune_start_threshold: env_var_with_defaults!("REPLICATOR_SUFFIX_PRUNE_THRESHOLD", Option::<usize>, 2_000),
min_size_after_prune: env_var_with_defaults!("REPLICATOR_SUFFIX_MIN_SIZE", Option::<usize>),
};

let talos_suffix: Suffix<ReplicatorCandidate> = Suffix::with_config(suffix_config);
let replicator_v1 = Replicator::new(kafka_consumer, talos_suffix);
let future_replicator = replicator_service(tx_install_req, rx_install_resp, replicator_v1);
let future_installer = installer_service(rx_install_req, tx_install_resp, installer);
let replicator = Replicator::new(kafka_consumer, talos_suffix);
let future_replicator = replicator_service(tx_install_req, rx_install_resp, replicator, replicator_config);

// Statemap Queue Service
let get_snapshot_fn = get_snapshot_callback(SnapshotApi::query(database.clone()));

let enable_stats = env_var_with_defaults!("STATEMAP_QUEUE_ENABLE_STATS", bool, true);
let queue_cleanup_frequency_ms = env_var_with_defaults!("STATEMAP_QUEUE_CLEANUP_FREQUENCY_MS", u64, 10_000);
let queue_config = StatemapQueueServiceConfig {
enable_stats,
queue_cleanup_frequency_ms,
};
let future_installer_queue = statemap_queue_service(rx_install_req, rx_installation_feedback_req, tx_installation_req, get_snapshot_fn, queue_config);

// Installation Service
let installer_thread_pool = env_var_with_defaults!("STATEMAP_INSTALLER_THREAD_POOL", Option::<u16>, 50);
let installer_config = StatemapInstallerConfig {
thread_pool: installer_thread_pool,
};
let future_installation = installation_service(
tx_install_resp,
Arc::new(installer),
rx_installation_req,
tx_installation_feedback_req,
installer_config,
);

let h_replicator = tokio::spawn(future_replicator);
let h_installer = tokio::spawn(future_installer);
let h_installer = tokio::spawn(future_installer_queue);
let h_installation = tokio::spawn(future_installation);

(h_replicator, h_installer, rx_heartbeat)
(h_replicator, h_installer, h_installation, rx_heartbeat)
}

fn create_transfer_request() -> TransferRequest {
Expand Down
9 changes: 9 additions & 0 deletions packages/cohort/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ rust_decimal = { version = "1.30.0", features = ["db-tokio-postgres", "serd
strum = { version = "0.24", features = ["derive"] }
uuid = { version = "1.2.2", features = ["v4"] }

# indexmap
indexmap = { version = "2.0.0", features = ["rayon"]}
ahash = "0.8.3"
# rayon = { version = "1.7.0"}


# tokio-util
tokio-util = "0.7.8"

# local package dependencies
metrics = { path = "../metrics" }
talos_agent = { path = "../talos_agent" }
Expand Down
67 changes: 53 additions & 14 deletions packages/cohort/src/bin/replicator.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
// $coverage:ignore-start

use std::sync::Arc;

use cohort::{
config_loader::ConfigLoader,
replicator::{
core::{Replicator, ReplicatorCandidate},
core::Replicator,
models::ReplicatorCandidate,
pg_replicator_installer::PgReplicatorStatemapInstaller,
services::{replicator_service::replicator_service, statemap_installer_service::installer_service},
services::{
replicator_service::{replicator_service, ReplicatorServiceConfig},
statemap_installer_service::{installation_service, StatemapInstallerConfig},
statemap_queue_service::{statemap_queue_service, StatemapQueueServiceConfig},
},
utils::get_snapshot_callback,
},
state::postgres::{data_access::PostgresApi, database::Database},
snapshot_api::SnapshotApi,
state::postgres::database::Database,
};
use log::{info, warn};
use metrics::model::{MicroMetrics, MinMax};
use talos_certifier::ports::MessageReciever;
use talos_certifier::{env_var_with_defaults, ports::MessageReciever};
use talos_certifier_adapters::{KafkaConfig, KafkaConsumer};
use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::{signal, sync::mpsc, try_join};
Expand All @@ -30,10 +39,14 @@ async fn main() {
kafka_consumer.subscribe().await.unwrap();

// c. Create suffix.
let replicator_config = ReplicatorServiceConfig {
commit_frequency_ms: env_var_with_defaults!("REPLICATOR_KAFKA_COMMIT_FREQ_MS", u64, 10_000),
enable_stats: env_var_with_defaults!("REPLICATOR_ENABLE_STATS", bool, true),
};
let suffix_config = SuffixConfig {
capacity: 10,
prune_start_threshold: Some(2000),
min_size_after_prune: None,
capacity: env_var_with_defaults!("REPLICATOR_SUFFIX_CAPACITY", usize, 100_000),
prune_start_threshold: env_var_with_defaults!("REPLICATOR_SUFFIX_PRUNE_THRESHOLD", Option::<usize>, 2_000),
min_size_after_prune: env_var_with_defaults!("REPLICATOR_SUFFIX_MIN_SIZE", Option::<usize>),
};
let suffix: Suffix<ReplicatorCandidate> = Suffix::with_config(suffix_config);

Expand All @@ -44,11 +57,11 @@ async fn main() {
// e. Create postgres statemap installer instance.
let cfg_db = ConfigLoader::load_db_config().unwrap();
let database = Database::init_db(cfg_db).await;
let manual_tx_api = PostgresApi { client: database.get().await };
// let manual_tx_api = PostgresApi { client: database.get().await };

let pg_statemap_installer = PgReplicatorStatemapInstaller {
metrics_frequency: None,
pg: manual_tx_api,
pg: database.clone(),
metrics: MicroMetrics::new(1_000_000_000_f32, true),
m_total: MinMax::default(),
m1_tx: MinMax::default(),
Expand All @@ -62,17 +75,43 @@ async fn main() {
// run_talos_replicator(&mut replicator, &mut pg_statemap_installer).await;
let (replicator_tx, replicator_rx) = mpsc::channel(3_000);
let (statemap_installer_tx, statemap_installer_rx) = mpsc::channel(3_000);
let (tx_installation_req, rx_installation_req) = mpsc::channel(3_000);
let (tx_installation_feedback_req, rx_installation_feedback_req) = mpsc::channel(3_000);

let pg_statemap_installer_service = installer_service(statemap_installer_rx, replicator_tx, pg_statemap_installer);
let replicator_service = replicator_service(statemap_installer_tx, replicator_rx, replicator, replicator_config);

let replicator_service = replicator_service(statemap_installer_tx, replicator_rx, replicator);
let get_snapshot_fn = get_snapshot_callback(SnapshotApi::query(database.clone()));
let enable_stats = env_var_with_defaults!("COHORT_SQ_ENABLE_STATS", bool, true);
let queue_cleanup_frequency_ms = env_var_with_defaults!("COHORT_SQ_QUEUE_CLEANUP_FREQUENCY_MS", u64, 10_000);
let queue_config = StatemapQueueServiceConfig {
enable_stats,
queue_cleanup_frequency_ms,
};
let future_installer_queue = statemap_queue_service(
statemap_installer_rx,
rx_installation_feedback_req,
tx_installation_req,
get_snapshot_fn,
queue_config,
);

let thread_pool = env_var_with_defaults!("COHORT_INSTALLER_PARALLEL_COUNT", Option::<u16>, 50);
let installer_config = StatemapInstallerConfig { thread_pool };
let future_installation = installation_service(
replicator_tx,
Arc::new(pg_statemap_installer),
rx_installation_req,
tx_installation_feedback_req,
installer_config,
);

let replicator_handle = tokio::spawn(replicator_service);
let installer_handle = tokio::spawn(pg_statemap_installer_service);
let h_installer = tokio::spawn(future_installer_queue);
let h_installation = tokio::spawn(future_installation);

let handle = tokio::spawn(async move {
// g. Run the 2 services.
let result = try_join!(replicator_handle, installer_handle,);
// g. Run the services.
let result = try_join!(replicator_handle, h_installer, h_installation);
// h. All the services run in infinite loops.
// We reach here only if there was an error in any of the services.
warn!("Result from the services ={result:?}");
Expand Down
1 change: 0 additions & 1 deletion packages/cohort/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub mod delay_controller;
pub mod metrics;
pub mod model;
pub mod replicator;
pub mod replicator2;
pub mod snapshot_api;
pub mod state; // TODO: move to /packages/src/core/
pub mod tx_batch_executor; // TODO: move to /packages/src/core/
Loading

0 comments on commit 9c43ba7

Please sign in to comment.