Skip to content

Commit

Permalink
chore: update per review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gk-kindred committed Jul 27, 2023
1 parent 9bd4373 commit e8c47be
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,4 @@ REPLICATOR_SUFFIX_PRUNE_THRESHOLD=1000

STATEMAP_QUEUE_ENABLE_STATS=false
STATEMAP_QUEUE_CLEANUP_FREQUENCY_MS=10000
STATEMAP_INSTALLER_PARALLEL_COUNT=50
STATEMAP_INSTALLER_THREAD_POOL=50
10 changes: 6 additions & 4 deletions examples/cohort_banking/examples/cohort_banking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use talos_suffix::{core::SuffixConfig, Suffix};
use tokio::{signal, sync::Mutex, task::JoinHandle, try_join};

type ReplicatorTaskHandle = JoinHandle<Result<(), String>>;
type InstallerQueuTaskHandle = JoinHandle<Result<(), String>>;
type InstallerQueueTaskHandle = JoinHandle<Result<(), String>>;
type InstallationTaskHandle = JoinHandle<Result<(), String>>;
type HeartBeatReceiver = tokio::sync::watch::Receiver<u64>;

Expand Down Expand Up @@ -189,7 +189,7 @@ async fn start_replicator(
replicator_metrics: Option<i128>,
database: Arc<Database>,
channel_size: usize,
) -> (ReplicatorTaskHandle, InstallerQueuTaskHandle, InstallationTaskHandle, HeartBeatReceiver) {
) -> (ReplicatorTaskHandle, InstallerQueueTaskHandle, InstallationTaskHandle, HeartBeatReceiver) {
let mut kafka_config = KafkaConfig::from_env();
kafka_config.group_id = "talos-replicator-dev".to_string();
let kafka_consumer = KafkaConsumer::new(&kafka_config);
Expand Down Expand Up @@ -243,8 +243,10 @@ async fn start_replicator(
let future_installer_queue = statemap_queue_service(rx_install_req, rx_installation_feedback_req, tx_installation_req, get_snapshot_fn, queue_config);

// Installation Service
let parallel_thread_count = env_var_with_defaults!("STATEMAP_INSTALLER_PARALLEL_COUNT", Option::<u16>, 50);
let installer_config = StatemapInstallerConfig { parallel_thread_count };
let installer_thread_pool = env_var_with_defaults!("STATEMAP_INSTALLER_THREAD_POOL", Option::<u16>, 50);
let installer_config = StatemapInstallerConfig {
thread_pool: installer_thread_pool,
};
let future_installation = installation_service(
tx_install_resp,
Arc::new(installer),
Expand Down
4 changes: 2 additions & 2 deletions packages/cohort/src/bin/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ async fn main() {
queue_config,
);

let parallel_thread_count = env_var_with_defaults!("COHORT_INSTALLER_PARALLEL_COUNT", Option::<u16>, 50);
let installer_config = StatemapInstallerConfig { parallel_thread_count };
let thread_pool = env_var_with_defaults!("COHORT_INSTALLER_PARALLEL_COUNT", Option::<u16>, 50);
let installer_config = StatemapInstallerConfig { thread_pool };
let future_installation = installation_service(
replicator_tx,
Arc::new(pg_statemap_installer),
Expand Down
6 changes: 3 additions & 3 deletions packages/cohort/src/replicator/services/replicator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ where
total_items_processed += all_versions_picked.len();

let statemap_batch_cloned = statemaps_batch.clone();
let versions_not_send = all_versions_picked.into_iter().filter(|&v| {
!statemap_batch_cloned.iter().any(|sm_b| sm_b.0 != v)
let versions_not_sent = all_versions_picked.into_iter().filter(|&v| {
!statemap_batch_cloned.iter().any(|(ver, _)| ver != &v)
});


Expand All @@ -74,7 +74,7 @@ where
}

// These versions are decided but they are not send to Statemap installer as they are either aborted or don't have statemap
versions_not_send.for_each(|version| {
versions_not_sent.for_each(|version| {
replicator.suffix.set_item_installed(version);

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use log::{debug, error};
use tokio::sync::{mpsc, Semaphore};

pub struct StatemapInstallerConfig {
pub parallel_thread_count: Option<u16>,
pub thread_pool: Option<u16>,
}

pub async fn installation_service(
Expand All @@ -18,8 +18,7 @@ pub async fn installation_service(
statemap_installation_tx: mpsc::Sender<StatemapInstallationStatus>,
config: StatemapInstallerConfig,
) -> Result<(), String> {
// TODO: Pass the number of permits over an environment variable?
let permit_count = config.parallel_thread_count.unwrap_or(50) as usize;
let permit_count = config.thread_pool.unwrap_or(50) as usize;
let semaphore = Arc::new(Semaphore::new(permit_count));

loop {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub async fn statemap_queue_service(

// let index = statemap_queue.get_index_of(&key).unwrap();

if let Some(last_contiguous_install_item) = statemap_installer_queue.queue.iter().take_while(|item| item.1.state == StatemapInstallState::Installed).last(){
if let Some(last_contiguous_install_item) = statemap_installer_queue.queue.iter().take_while(|(_, statemap_installer_item)| statemap_installer_item.state == StatemapInstallState::Installed).last(){
statemap_installer_queue.update_snapshot(last_contiguous_install_item.1.version) ;
};

Expand Down
2 changes: 1 addition & 1 deletion packages/cohort/src/snapshot_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::state::postgres::database::{Database, SNAPSHOT_SINGLETON_ROW_ID};

pub static SNAPSHOT_UPDATE_QUERY: &str = r#"UPDATE cohort_snapshot SET "version" = ($1)::BIGINT WHERE id = $2 AND "version" < ($1)::BIGINT"#;

#[derive(Debug, Clone)]
// #[derive(Debug, Clone)]
pub struct SnapshotApi {}

impl SnapshotApi {
Expand Down

0 comments on commit e8c47be

Please sign in to comment.