Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update replicator retry logic #66

Merged
merged 11 commits into from
Aug 9, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use std::sync::Arc;

use async_trait::async_trait;
use cohort_banking::{
callbacks::statemap_installer::StatemapInstallerImpl,
callbacks::statemap_installer::BankStatemapInstaller,
state::postgres::{database::Database, database_config::DatabaseConfig},
};
use talos_certifier::{env_var, env_var_with_defaults, ports::MessageReciever};
use talos_certifier_adapters::{KafkaConfig, KafkaConsumer};
use talos_cohort_replicator::{talos_cohort_replicator, CohortReplicatorConfig, ReplicatorSnapshotProvider};

use cohort_banking::state::postgres::database::DatabaseError;
use tokio::{signal, try_join};
use tokio::signal;

pub static SNAPSHOT_SINGLETON_ROW_ID: &str = "SINGLETON";

Expand Down Expand Up @@ -61,7 +61,7 @@ async fn main() {
};
let database = Database::init_db(cfg_db).await.map_err(|e| e.to_string()).unwrap();

let pg_statemap_installer = StatemapInstallerImpl {
let pg_statemap_installer = BankStatemapInstaller {
database: Arc::clone(&database),
max_retry: env_var_with_defaults!("BANK_STATEMAP_INSTALLER_MAX_RETRY", u32, 3),
retry_wait_ms: env_var_with_defaults!("BANK_STATEMAP_INSTALL_RETRY_WAIT_MS", u64, 2),
Expand All @@ -80,11 +80,8 @@ async fn main() {

let snapshot_api = SnapshotApi { db: Arc::clone(&database) };

let (replicator_handle, statemap_queue_handle, statemap_installer_handle) =
talos_cohort_replicator(kafka_consumer, Arc::new(pg_statemap_installer), snapshot_api, config).await;

let all_async_services = tokio::spawn(async move {
let result = try_join!(replicator_handle, statemap_queue_handle, statemap_installer_handle);
let result = talos_cohort_replicator(kafka_consumer, Arc::new(pg_statemap_installer), snapshot_api, config).await;
log::info!("Result from the services ={result:?}");
});

Expand Down
146 changes: 90 additions & 56 deletions packages/cohort_banking/src/callbacks/statemap_installer.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,106 @@
use std::{sync::Arc, time::Duration};

use async_trait::async_trait;
use talos_cohort_replicator::{ReplicatorInstallStatus, ReplicatorInstaller, StatemapItem};
use deadpool_postgres::Transaction;
use talos_cohort_replicator::{ReplicatorInstaller, StatemapItem};
use tokio_postgres::types::ToSql;

use crate::{model::requests::TransferRequest, state::postgres::database::Database};

pub struct StatemapInstallerImpl {
/// SQL statement that applies a single transfer to the `bank_accounts` table.
///
/// Bind parameters:
/// - `$1` (TEXT): number of the account whose `amount` is increased by `$3`.
/// - `$2` (TEXT): number of the account whose `amount` is decreased by `$3`.
/// - `$3` (DECIMAL): transfer amount.
/// - `$4` (BIGINT): version written to every updated row.
///
/// Only rows whose current `version` is lower than `$4` are matched, so once a row has been
/// stamped with `$4` a repeat of the same statement no longer updates it.
///
/// NOTE(review): the caller binds `$1 = request.from` and `$2 = request.to`, which makes the
/// `from` account the one that is credited — confirm this direction is intended.
const BANK_ACCOUNTS_UPDATE_QUERY: &str = r#"
UPDATE bank_accounts ba SET
"amount" =
(CASE
WHEN ba."number" = ($1)::TEXT THEN ba."amount" + ($3)::DECIMAL
WHEN ba."number" = ($2)::TEXT THEN ba."amount" - ($3)::DECIMAL
END),
"version" = ($4)::BIGINT
WHERE ba."number" IN (($1)::TEXT, ($2)::TEXT)
AND ba."version" < ($4)::BIGINT
"#;

/// SQL statement that advances the version stored in the `cohort_snapshot` table.
///
/// Bind parameters:
/// - `$1` (BIGINT): the new snapshot version.
/// - `$2`: `id` of the snapshot row to update.
///
/// The `"version" < $1` guard means the stored version is only ever moved forward; when the
/// row already holds an equal or higher version the statement updates zero rows.
const SNAPSHOT_UPDATE_QUERY: &str = r#"UPDATE cohort_snapshot SET "version" = ($1)::BIGINT WHERE id = $2 AND "version" < ($1)::BIGINT"#;

fn is_retriable_error(error: &str) -> bool {
// TODO: improve retriable error detection when error moves from String to enum or struct.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, this is Postgres text. Somehow it should be abstracted depending on DB type.

error.contains("could not serialize access due to concurrent update")
}

/// `ReplicatorInstaller` implementation for the banking cohort: installs transfer statemaps
/// into `bank_accounts` and records the snapshot version in `cohort_snapshot`.
pub struct BankStatemapInstaller {
/// Shared database handle from which a connection is obtained for each install.
pub database: Arc<Database>,
/// Configured retry limit.
/// NOTE(review): the visible `install` retry loop does not appear to consult this value,
/// so retriable errors may be retried without bound — confirm.
pub max_retry: u32,
/// Time to sleep, in milliseconds, before retrying after a retriable error.
pub retry_wait_ms: u64,
}

#[async_trait]
impl ReplicatorInstaller for StatemapInstallerImpl {
async fn install(&self, statemap: Vec<StatemapItem>, snapshot_version: u64) -> Result<ReplicatorInstallStatus, String> {
let mut current_count = 0;

loop {
// from = 1
// to = 2
// amount = 3
// new_ver = 4

let mut cnn = self.database.get().await.map_err(|e| e.to_string())?;
let tx = cnn.transaction().await.map_err(|e| e.to_string())?;
impl BankStatemapInstaller {
async fn install_bank_transfer_statemap(tx: &Transaction<'_>, statemap: &[StatemapItem], _snapshot_version: u64) -> Result<u64, String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks

let sti = statemap[0].clone();

if !statemap.is_empty() {
let sti = statemap[0].clone();
let request: TransferRequest = serde_json::from_value(sti.payload.clone()).map_err(|e| e.to_string())?;
let params: &[&(dyn ToSql + Sync)] = &[&request.from, &request.to, &request.amount, &(sti.version as i64)];

let request: TransferRequest = serde_json::from_value(sti.payload.clone()).map_err(|e| e.to_string())?;
let updated_rows = tx.execute(BANK_ACCOUNTS_UPDATE_QUERY, params).await.map_err(|e| e.to_string())?;
Ok(updated_rows)
}

let sql = r#"
UPDATE bank_accounts ba SET
"amount" =
(CASE
WHEN ba."number" = ($1)::TEXT THEN ba."amount" + ($3)::DECIMAL
WHEN ba."number" = ($2)::TEXT THEN ba."amount" - ($3)::DECIMAL
END),
"version" = ($4)::BIGINT
WHERE ba."number" IN (($1)::TEXT, ($2)::TEXT)
AND ba."version" < ($4)::BIGINT
"#;
async fn update_snapshot(tx: &Transaction<'_>, snapshot_version: u64) -> Result<u64, String> {
let params: &[&(dyn ToSql + Sync)] = &[&(snapshot_version as i64), &"SINGLETON"];

let params: &[&(dyn ToSql + Sync)] = &[&request.from, &request.to, &request.amount, &(sti.version as i64)];
tx.execute(SNAPSHOT_UPDATE_QUERY, params).await.map_err(|e| e.to_string())
}
}

let updated_rows = tx.execute(sql, params).await.map_err(|e| e.to_string())?;
#[async_trait]
impl ReplicatorInstaller for BankStatemapInstaller {
/// Install statemaps and version for respective rows in the `bank_accounts` table and update the `snapshot_version` in `cohort_snapshot` table.
///
/// Certain errors like `could not serialize access due to concurrent update` in postgres are retriable.
/// - Updating `bank_accounts` workflow.
/// - On successful completion, we proceed to update the `snapshot_version` in `cohort_snapshot`.
/// - When there is a retryable error, go to the start of the loop to retry. `(Txn aborts and retries)`
/// - For non-retryable error, we return the `Error`.
/// - Updating `cohort_snapshot` workflow.
/// - On successful completion, we commit the transaction and return. `(Txn Commits and returns)`
/// - When there is a retryable error, go to the start of the loop to retry. `(Txn aborts and retries)`
/// - For non-retryable error, we return the `Error`.

async fn install(&self, statemap: Vec<StatemapItem>, snapshot_version: u64) -> Result<(), String> {
let mut cnn = self.database.get().await.map_err(|e| e.to_string())?;
loop {
let tx = cnn.transaction().await.map_err(|e| e.to_string())?;
if !statemap.is_empty() {
let updated_rows_res = BankStatemapInstaller::install_bank_transfer_statemap(&tx, &statemap, snapshot_version).await;

match updated_rows_res {
Ok(updated_rows) => {
if updated_rows == 0 {
log::debug!(
"No rows were updated when installing: {:?}. Snapshot will be set to: {}",
fmarek-kindred marked this conversation as resolved.
Show resolved Hide resolved
statemap,
snapshot_version
);
}

if updated_rows > 0 {
log::debug!("No rows were updated when installing: {:?}. Snapshot will be set to: {}", sti, snapshot_version);
log::info!(
"{} rows were updated when installing: {:?}. Snapshot will be set to: {}",
updated_rows,
statemap,
snapshot_version
);
}
Err(bank_transfer_db_error) => {
// Check if retry is allowed on the error.
if is_retriable_error(&bank_transfer_db_error) {
tokio::time::sleep(Duration::from_millis(self.retry_wait_ms)).await;
continue;
} else {
return Err(bank_transfer_db_error);
}
}
}

log::info!(
"{} rows were updated when installing: {:?}. Snapshot will be set to: {}",
updated_rows,
sti,
snapshot_version
);
}

let params: &[&(dyn ToSql + Sync)] = &[&(snapshot_version as i64), &"SINGLETON"];

let sql = r#"UPDATE cohort_snapshot SET "version" = ($1)::BIGINT WHERE id = $2 AND "version" < ($1)::BIGINT"#;
let result = tx.execute(sql, params).await;
let result = BankStatemapInstaller::update_snapshot(&tx, snapshot_version).await;

match result {
Ok(updated_rows) => {
Expand All @@ -79,23 +117,19 @@ impl ReplicatorInstaller for StatemapInstallerImpl {
.await
.map_err(|tx_error| format!("Commit error for statemap. Error: {}", tx_error))?;

return Ok(ReplicatorInstallStatus::Success);
return Ok(());
}
Err(error) => {
if error.to_string().contains("could not serialize access due to concurrent update") {
current_count += 1;

if current_count >= self.max_retry {
return Ok(ReplicatorInstallStatus::Gaveup(current_count));
}

// If serialize error and haven't reached the retry limit, sleep for 2 ms.
Err(error) =>
// Check if retry is allowed on the error.
{
if is_retriable_error(&error) {
tokio::time::sleep(Duration::from_millis(self.retry_wait_ms)).await;
continue;
} else {
return Err(error.to_string());
return Err(error);
}
}
}
};
}
}
}
3 changes: 1 addition & 2 deletions packages/cohort_sdk/src/cohort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ impl Cohort {
// Param1: The list of statemap items.
// Param2: Version to install.
// Returns error description. If the string is empty, it means there was no error installing
) -> Result<Self, ClientError>
where {
) -> Result<Self, ClientError> {
let agent_config: AgentConfig = config.clone().into();
let kafka_config: KafkaConfig = config.clone().into();

Expand Down
9 changes: 1 addition & 8 deletions packages/talos_cohort_replicator/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub struct StatemapInstallerHashmap {
#[derive(Debug)]
pub enum StatemapInstallationStatus {
Success(u64),
GaveUp(u64),
Error(u64, String),
}

Expand Down Expand Up @@ -64,15 +63,9 @@ impl StatemapItem {
}
}

pub type RetryCount = u32;
pub enum ReplicatorInstallStatus {
Success,
Gaveup(RetryCount),
}

#[async_trait]
pub trait ReplicatorInstaller {
async fn install(&self, sm: Vec<StatemapItem>, version: u64) -> Result<ReplicatorInstallStatus, String>;
async fn install(&self, sm: Vec<StatemapItem>, version: u64) -> Result<(), String>;
}
#[async_trait]
pub trait ReplicatorSnapshotProvider {
Expand Down
31 changes: 31 additions & 0 deletions packages/talos_cohort_replicator/src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use thiserror::Error as ThisError;

// enum ServiceErrorKind {
// // i/o errors
// DBError,
// EventStreamError,
// // internal channel errors
// ChannelError,
// // general system related errors
// SystemError,
// }

/// Error type returned by the replicator service.
///
/// Currently carries only a free-form `reason`; the commented-out fields are placeholders
/// for a richer, structured error.
/// Its `Display` output is `error reason=<reason> ` (note the trailing space in the format string).
#[derive(Debug, ThisError, PartialEq, Clone)]
#[error("error reason={reason} ")]
pub struct ServiceError {
// pub kind: SystemServiceErrorKind,
/// Human-readable description of what went wrong.
pub reason: String,
// pub data: Option<String>,
// pub service: String,
}

// enum ReplicatorInstallerError {
// DBConnectionError
// DB
// // DB Connection Error
// // DB Transaction Error
// // Data deserialization Error
// // Update table - Cohort related or snapshot ([table name], retry count)
// // Exhausted retry and no install

// }
3 changes: 2 additions & 1 deletion packages/talos_cohort_replicator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
mod core;
pub mod errors;
mod models;
mod services;
mod suffix;
mod talos_cohort_replicator;
pub mod utils;

pub use crate::core::{ReplicatorInstallStatus, ReplicatorInstaller, ReplicatorSnapshotProvider, StatemapItem};
pub use crate::core::{ReplicatorInstaller, ReplicatorSnapshotProvider, StatemapItem};
pub use talos_cohort_replicator::{talos_cohort_replicator, CohortReplicatorConfig};
#[cfg(test)]
mod tests;
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fmt::Debug, time::Duration};

use crate::{
core::{Replicator, ReplicatorChannel, StatemapItem},
errors::ServiceError,
models::ReplicatorCandidate,
suffix::ReplicatorSuffixTrait,
};
Expand All @@ -21,7 +22,7 @@ pub async fn replicator_service<S, M>(
mut replicator_rx: mpsc::Receiver<ReplicatorChannel>,
mut replicator: Replicator<ReplicatorCandidate, S, M>,
config: ReplicatorServiceConfig,
) -> Result<(), String>
) -> Result<(), ServiceError>
where
S: ReplicatorSuffixTrait<ReplicatorCandidate> + Debug,
M: MessageReciever<Message = ChannelMessage> + Send + Sync,
Expand Down Expand Up @@ -130,7 +131,6 @@ where
total_items_installed += 1;
time_last_item_installed_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();


}
ReplicatorChannel::InstallationFailure(_) => {
// panic!("[panic panic panic] Installation Failed and replicator will panic and stop");
Expand Down
Loading