-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: update replicator retry logic #66
Merged
Merged
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
59980e6
feat: initial commit for replicator as standalone package
gk-kindred b1c3075
feat: first working cut of replicator as a package with example
gk-kindred 2aaee04
chore: remove old cohort package
gk-kindred 3878c4f
feat: retry applied to banking statemap installer
gk-kindred 16c7d2e
feat: use env variables in config
gk-kindred 2b409e3
feat: remove replicator dependency from cohort-sdk
gk-kindred 2862e6d
feat: initial commit for error handling
gk-kindred 09249ba
feat: update retry logic of replicator + installer
gk-kindred fefdbda
feat: remove give up logic and retry infinitly for retriable errors
gk-kindred 2b25481
chore: update inputs from review comments
gk-kindred b76dae0
chore: updates from review comments
gk-kindred File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,68 +1,106 @@ | ||
use std::{sync::Arc, time::Duration}; | ||
|
||
use async_trait::async_trait; | ||
use talos_cohort_replicator::{ReplicatorInstallStatus, ReplicatorInstaller, StatemapItem}; | ||
use deadpool_postgres::Transaction; | ||
use talos_cohort_replicator::{ReplicatorInstaller, StatemapItem}; | ||
use tokio_postgres::types::ToSql; | ||
|
||
use crate::{model::requests::TransferRequest, state::postgres::database::Database}; | ||
|
||
pub struct StatemapInstallerImpl { | ||
const BANK_ACCOUNTS_UPDATE_QUERY: &str = r#" | ||
UPDATE bank_accounts ba SET | ||
"amount" = | ||
(CASE | ||
WHEN ba."number" = ($1)::TEXT THEN ba."amount" + ($3)::DECIMAL | ||
WHEN ba."number" = ($2)::TEXT THEN ba."amount" - ($3)::DECIMAL | ||
END), | ||
"version" = ($4)::BIGINT | ||
WHERE ba."number" IN (($1)::TEXT, ($2)::TEXT) | ||
AND ba."version" < ($4)::BIGINT | ||
"#; | ||
|
||
const SNAPSHOT_UPDATE_QUERY: &str = r#"UPDATE cohort_snapshot SET "version" = ($1)::BIGINT WHERE id = $2 AND "version" < ($1)::BIGINT"#; | ||
|
||
fn is_retriable_error(error: &str) -> bool { | ||
// TODO: improve retriable error detection when error moves from String to enum or struct. | ||
error.contains("could not serialize access due to concurrent update") | ||
} | ||
|
||
pub struct BankStatemapInstaller { | ||
pub database: Arc<Database>, | ||
pub max_retry: u32, | ||
pub retry_wait_ms: u64, | ||
} | ||
|
||
#[async_trait] | ||
impl ReplicatorInstaller for StatemapInstallerImpl { | ||
async fn install(&self, statemap: Vec<StatemapItem>, snapshot_version: u64) -> Result<ReplicatorInstallStatus, String> { | ||
let mut current_count = 0; | ||
|
||
loop { | ||
// from = 1 | ||
// to = 2 | ||
// amount = 3 | ||
// new_ver = 4 | ||
|
||
let mut cnn = self.database.get().await.map_err(|e| e.to_string())?; | ||
let tx = cnn.transaction().await.map_err(|e| e.to_string())?; | ||
impl BankStatemapInstaller { | ||
async fn install_bank_transfer_statemap(tx: &Transaction<'_>, statemap: &[StatemapItem], _snapshot_version: u64) -> Result<u64, String> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks |
||
let sti = statemap[0].clone(); | ||
|
||
if !statemap.is_empty() { | ||
let sti = statemap[0].clone(); | ||
let request: TransferRequest = serde_json::from_value(sti.payload.clone()).map_err(|e| e.to_string())?; | ||
let params: &[&(dyn ToSql + Sync)] = &[&request.from, &request.to, &request.amount, &(sti.version as i64)]; | ||
|
||
let request: TransferRequest = serde_json::from_value(sti.payload.clone()).map_err(|e| e.to_string())?; | ||
let updated_rows = tx.execute(BANK_ACCOUNTS_UPDATE_QUERY, params).await.map_err(|e| e.to_string())?; | ||
Ok(updated_rows) | ||
} | ||
|
||
let sql = r#" | ||
UPDATE bank_accounts ba SET | ||
"amount" = | ||
(CASE | ||
WHEN ba."number" = ($1)::TEXT THEN ba."amount" + ($3)::DECIMAL | ||
WHEN ba."number" = ($2)::TEXT THEN ba."amount" - ($3)::DECIMAL | ||
END), | ||
"version" = ($4)::BIGINT | ||
WHERE ba."number" IN (($1)::TEXT, ($2)::TEXT) | ||
AND ba."version" < ($4)::BIGINT | ||
"#; | ||
async fn update_snapshot(tx: &Transaction<'_>, snapshot_version: u64) -> Result<u64, String> { | ||
let params: &[&(dyn ToSql + Sync)] = &[&(snapshot_version as i64), &"SINGLETON"]; | ||
|
||
let params: &[&(dyn ToSql + Sync)] = &[&request.from, &request.to, &request.amount, &(sti.version as i64)]; | ||
tx.execute(SNAPSHOT_UPDATE_QUERY, params).await.map_err(|e| e.to_string()) | ||
} | ||
} | ||
|
||
let updated_rows = tx.execute(sql, params).await.map_err(|e| e.to_string())?; | ||
#[async_trait] | ||
impl ReplicatorInstaller for BankStatemapInstaller { | ||
/// Install statemaps and version for respective rows in the `bank_accounts` table and update the `snapshot_version` in `cohort_snapshot` table. | ||
/// | ||
/// Certain errors like `could not serialize access due to concurrent update` in postgres are retriable. | ||
/// - Updating `bank_account` workflow. | ||
/// - On successful completion, we proceed to update the `snapshot_version` in `cohort_snapshot`. | ||
/// - When there is a retryable error, go to the start of the loop to retry. `(Txn aborts and retries)` | ||
/// - For non-retryable error, we return the `Error`. | ||
/// - Updating `cohort_snapshot` workflow. | ||
/// - On successful completion, we commit the transaction and return. `(Txn Commits and returns)` | ||
/// - When there is a retryable error, go to the start of the loop to retry. `(Txn aborts and retries)` | ||
/// - For non-retryable error, we return the `Error`. | ||
|
||
async fn install(&self, statemap: Vec<StatemapItem>, snapshot_version: u64) -> Result<(), String> { | ||
let mut cnn = self.database.get().await.map_err(|e| e.to_string())?; | ||
loop { | ||
let tx = cnn.transaction().await.map_err(|e| e.to_string())?; | ||
if !statemap.is_empty() { | ||
let updated_rows_res = BankStatemapInstaller::install_bank_transfer_statemap(&tx, &statemap, snapshot_version).await; | ||
|
||
match updated_rows_res { | ||
Ok(updated_rows) => { | ||
if updated_rows == 0 { | ||
log::debug!( | ||
"No rows were updated when installing: {:?}. Snapshot will be set to: {}", | ||
fmarek-kindred marked this conversation as resolved.
Show resolved
Hide resolved
|
||
statemap, | ||
snapshot_version | ||
); | ||
} | ||
|
||
if updated_rows > 0 { | ||
log::debug!("No rows were updated when installing: {:?}. Snapshot will be set to: {}", sti, snapshot_version); | ||
log::info!( | ||
"{} rows were updated when installing: {:?}. Snapshot will be set to: {}", | ||
updated_rows, | ||
statemap, | ||
snapshot_version | ||
); | ||
} | ||
Err(bank_transfer_db_error) => { | ||
// Check if retry is allowed on the error. | ||
if is_retriable_error(&bank_transfer_db_error) { | ||
tokio::time::sleep(Duration::from_millis(self.retry_wait_ms)).await; | ||
continue; | ||
} else { | ||
return Err(bank_transfer_db_error); | ||
} | ||
} | ||
} | ||
|
||
log::info!( | ||
"{} rows were updated when installing: {:?}. Snapshot will be set to: {}", | ||
updated_rows, | ||
sti, | ||
snapshot_version | ||
); | ||
} | ||
|
||
let params: &[&(dyn ToSql + Sync)] = &[&(snapshot_version as i64), &"SINGLETON"]; | ||
|
||
let sql = r#"UPDATE cohort_snapshot SET "version" = ($1)::BIGINT WHERE id = $2 AND "version" < ($1)::BIGINT"#; | ||
let result = tx.execute(sql, params).await; | ||
let result = BankStatemapInstaller::update_snapshot(&tx, snapshot_version).await; | ||
|
||
match result { | ||
Ok(updated_rows) => { | ||
|
@@ -79,23 +117,19 @@ impl ReplicatorInstaller for StatemapInstallerImpl { | |
.await | ||
.map_err(|tx_error| format!("Commit error for statemap. Error: {}", tx_error))?; | ||
|
||
return Ok(ReplicatorInstallStatus::Success); | ||
return Ok(()); | ||
} | ||
Err(error) => { | ||
if error.to_string().contains("could not serialize access due to concurrent update") { | ||
current_count += 1; | ||
|
||
if current_count >= self.max_retry { | ||
return Ok(ReplicatorInstallStatus::Gaveup(current_count)); | ||
} | ||
|
||
// If serialize error and haven't reached the retry limit, sleep for 2 ms. | ||
Err(error) => | ||
// Check if retry is allowed on the error. | ||
{ | ||
if is_retriable_error(&error) { | ||
tokio::time::sleep(Duration::from_millis(self.retry_wait_ms)).await; | ||
continue; | ||
} else { | ||
return Err(error.to_string()); | ||
return Err(error); | ||
} | ||
} | ||
} | ||
}; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
use thiserror::Error as ThisError; | ||
|
||
// enum ServiceErrorKind { | ||
// // i/o errors | ||
// DBError, | ||
// EventStreamError, | ||
// // internal channel errors | ||
// ChannelError, | ||
// // general system related errors | ||
// SystemError, | ||
// } | ||
|
||
#[derive(Debug, ThisError, PartialEq, Clone)] | ||
#[error("error reason={reason} ")] | ||
pub struct ServiceError { | ||
// pub kind: SystemServiceErrorKind, | ||
pub reason: String, | ||
// pub data: Option<String>, | ||
// pub service: String, | ||
} | ||
|
||
// enum ReplicatorInstallerError { | ||
// DBConnectionError | ||
// DB | ||
// // DB Connection Error | ||
// // DB Transaction Error | ||
// // Data deserialization Error | ||
// // Update table - Cohort related or snapshot ([table name], retry count) | ||
// // Exhausted retry and no install | ||
|
||
// } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,12 @@ | ||
mod core; | ||
pub mod errors; | ||
mod models; | ||
mod services; | ||
mod suffix; | ||
mod talos_cohort_replicator; | ||
pub mod utils; | ||
|
||
pub use crate::core::{ReplicatorInstallStatus, ReplicatorInstaller, ReplicatorSnapshotProvider, StatemapItem}; | ||
pub use crate::core::{ReplicatorInstaller, ReplicatorSnapshotProvider, StatemapItem}; | ||
pub use talos_cohort_replicator::{talos_cohort_replicator, CohortReplicatorConfig}; | ||
#[cfg(test)] | ||
mod tests; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, this is Postgres text. Somehow it should be abstracted depending on DB type.