Feature/adding config (#4)
* chore: changes for transactions

* chore: add Monitor config

---------

Co-authored-by: Amninder Kaur <amninder.kaur@kindredgroup.com>
akaur13 and Amninder Kaur authored Sep 11, 2023
1 parent 9916f09 commit b20560a
Showing 8 changed files with 317 additions and 159 deletions.
11 changes: 6 additions & 5 deletions .env
@@ -24,13 +24,14 @@ PG_PASSWORD=admin
PG_DATABASE=chronos
PG_POOL_SIZE=25


# CONFIG
RUST_LOG=info


#APP
DELAY_TIME=0
RANDOMNESS_DELAY=100
-DB_POLL_INTERVAL=5
-TIMING_ADVANCE=10
-FAIL_DETECT_INTERVAL=10
+MONITOR_DB_POLL=5
+TIMING_ADVANCE=0
+FAIL_DETECT_INTERVAL=500
+MAX_RETRIES=3
+PROCESSOR_DB_POLL=10
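
The renamed and added variables above correspond to the new ChronosConfig::from_env() lookups introduced in the Rust changes further down. As a rough sketch of how such a struct could read them — the monitor_db_poll, time_advance and fail_detect_interval field names are inferred from the call sites in this commit, while processor_db_poll, max_retries, the defaults and the parsing helper are assumptions rather than the actual implementation:

```rust
use std::env;

// Hypothetical sketch only; the real chronos_bin::utils::config::ChronosConfig may differ.
#[derive(Debug, Clone)]
pub struct ChronosConfig {
    pub monitor_db_poll: u64,
    pub processor_db_poll: u64,
    pub time_advance: u64,
    pub fail_detect_interval: u64,
    pub max_retries: u32,
}

impl ChronosConfig {
    pub fn from_env() -> Self {
        // Read an env var, falling back to a default when unset or unparsable.
        fn var_or<T: std::str::FromStr>(key: &str, default: T) -> T {
            env::var(key).ok().and_then(|v| v.parse().ok()).unwrap_or(default)
        }
        ChronosConfig {
            monitor_db_poll: var_or("MONITOR_DB_POLL", 5),
            processor_db_poll: var_or("PROCESSOR_DB_POLL", 10),
            time_advance: var_or("TIMING_ADVANCE", 0),
            fail_detect_interval: var_or("FAIL_DETECT_INTERVAL", 500),
            max_retries: var_or("MAX_RETRIES", 3),
        }
    }
}
```

Centralising the lookups this way keeps the poll intervals, timing advance and retry limit in one place instead of scattering std::env::var calls through the processor and monitor.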
6 changes: 4 additions & 2 deletions .env.example
@@ -30,6 +30,8 @@ RUST_LOG=debug
#APP
DELAY_TIME=0
RANDOMNESS_DELAY=100
-DB_POLL_INTERVAL=5
+MONITOR_DB_POLL=5
TIMING_ADVANCE=0
-FAIL_DETECT_INTERVAL=500
+FAIL_DETECT_INTERVAL=500
+MAX_RETRIES=3
+PROCESSOR_DB_POLL=10
2 changes: 1 addition & 1 deletion How-to.md
@@ -39,7 +39,7 @@ All the required configurations for Chronos can be passed in environment variables
| PG_POOL_SIZE|50|True
| DELAY_TIME|0|False
| RANDOMNESS_DELAY|100|False
-| DB_POLL_INTERVAL|5|False
+| MONITOR_DB_POLL|5|False
| TIMING_ADVANCE|0|False
| FAIL_DETECT_INTERVAL|500|False

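The third column in the table above marks whether a variable is mandatory. One plausible reading — illustrative only, not lifted from the Chronos code — is that required variables abort startup when missing, while optional ones fall back to the listed defaults:

```rust
use std::env;

// Required (True): fail fast when the variable is absent or malformed.
fn pg_pool_size() -> u32 {
    env::var("PG_POOL_SIZE")
        .expect("PG_POOL_SIZE is required")
        .parse()
        .expect("PG_POOL_SIZE must be a number")
}

// Optional (False): fall back to the default shown in the table.
fn monitor_db_poll() -> u64 {
    env::var("MONITOR_DB_POLL")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(5)
}
```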
149 changes: 96 additions & 53 deletions chronos_bin/src/message_processor.rs
@@ -1,5 +1,7 @@
use crate::kafka::producer::KafkaProducer;
use crate::postgres::pg::{GetReady, Pg, TableRow};
use crate::utils::config::ChronosConfig;
use crate::utils::delay_controller::DelayController;
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
@@ -13,80 +15,121 @@ pub struct MessageProcessor {

impl MessageProcessor {
pub async fn run(&self) {
println!("MessageProcessor ON!");
log::info!("MessageProcessor ON!");

//Get UUID for the node that deployed this thread
let node_id: String = std::env::var("NODE_ID").unwrap_or_else(|_| uuid::Uuid::new_v4().to_string());
// log::info!("node_id {}", node_id);
let mut delay_controller = DelayController::new(100);
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
// println!("MessageProcessor");
let deadline = Utc::now();
let uuid = Uuid::new_v4();
// tokio::time::sleep(Duration::from_millis(ChronosConfig::from_env().db_poll_interval)).await;
let deadline = Utc::now() - Duration::from_secs(ChronosConfig::from_env().time_advance);

let param = GetReady {
readied_at: deadline,
readied_by: uuid,
readied_by: Uuid::parse_str(&node_id).unwrap(),
deadline,
limit: 1000,
// limit: 1000,
// order: "asc",
};

let mut ready_params = Vec::new();
ready_params.push(param);
//retry loop
loop {
// thread::sleep(Duration::from_millis(100));
let max_retry_count = 3;
let mut retry_count = 0;

match &self.data_store.ready_to_fire(&ready_params).await {
Ok(publish_rows) => {
let mut ids: Vec<String> = Vec::with_capacity(publish_rows.len());
let mut publish_futures = Vec::with_capacity(publish_rows.len());
for row in publish_rows {
let updated_row = TableRow {
id: row.get("id"),
deadline: row.get("deadline"),
readied_at: row.get("readied_at"),
readied_by: row.get("readied_by"),
message_headers: row.get("message_headers"),
message_key: row.get("message_key"),
message_value: row.get("message_value"),
};
let mut node_id: Option<String> = None;
// let mut row_id: Option<String> = None;
match &self.data_store.ready_to_fire(&param).await {
Ok(publish_rows) => {
let rdy_to_pblsh_count = publish_rows.len();
if rdy_to_pblsh_count > 0 {
let mut ids: Vec<String> = Vec::with_capacity(rdy_to_pblsh_count);
let mut publish_futures = Vec::with_capacity(rdy_to_pblsh_count);
for row in publish_rows {
let updated_row = TableRow {
id: row.get("id"),
deadline: row.get("deadline"),
readied_at: row.get("readied_at"),
readied_by: row.get("readied_by"),
message_headers: row.get("message_headers"),
message_key: row.get("message_key"),
message_value: row.get("message_value"),
};

let headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(t) => t,
Err(_e) => {
println!("error occurred while parsing");
HashMap::new()
}
};
//TODO: handle empty headers
// println!("checking {:?}",headers);
let mut headers: HashMap<String, String> = match serde_json::from_str(&updated_row.message_headers.to_string()) {
Ok(t) => t,
Err(_e) => {
println!("error occurred while parsing");
HashMap::new()
}
};
//TODO: handle empty headers
// println!("checking {:?}",headers);

publish_futures.push(self.producer.publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
updated_row.id.to_string(),
))
}
let results = futures::future::join_all(publish_futures).await;
for result in results {
match result {
Ok(m) => {
ids.push(m);
node_id = Some(updated_row.readied_by.to_string());
// row_id = Some(updated_row.id.to_string());

headers.insert("readied_by".to_string(), node_id.unwrap());

publish_futures.push(self.producer.publish(
updated_row.message_value.to_string(),
Some(headers),
updated_row.message_key.to_string(),
updated_row.id.to_string(),
))
}
Err(e) => {
println!("publish failed {:?}", e);
// failure detection needs to pick
let results = futures::future::join_all(publish_futures).await;
for result in results {
match result {
Ok(m) => {
ids.push(m);
}
Err(e) => {
log::error!("Error: delayed message publish failed {:?}", e);
break;
// failure detection needs to pick
}
}
}

if !ids.is_empty() {
if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
println!("Error: error occurred in message processor delete_fired {}", outcome_error);
//add retry logic here
}
println!("delete ids {:?} and break", ids);
break;
}
log::debug!("number of rows published successfully and deleted from DB {}", ids.len());
} else {
log::debug!("no rows ready to fire for dealine {}", deadline);
break;
}
}

if !ids.is_empty() {
if let Err(outcome_error) = &self.data_store.delete_fired(&ids).await {
println!("error occurred in message processor delete_fired {}", outcome_error);
Err(e) => {
if e.contains("could not serialize access due to concurrent update") && retry_count < max_retry_count {
//retry goes here
eprintln!("retrying");
retry_count += 1;
if retry_count == max_retry_count {
log::error!(
"Error: max retry count {} reached by node {} for row ",
max_retry_count,
node_id.unwrap(),
// row_id.unwrap()
);
break;
}
}
log::error!("Error: error occurred in message processor while publishing {}", e);
break;
}
}
Err(e) => {
println!("error occurred in message processor {}", e);
}
}
delay_controller.sleep().await;
}
}
}
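
The processor's outer loop is now paced by DelayController from crate::utils::delay_controller, constructed with a 100 ms base and awaited at the end of each iteration. That module is not part of this diff, so the following is only a guess at its shape — a capped exponential backoff — and the cap, the doubling and the reset method are all assumptions:

```rust
use std::time::Duration;

// Hypothetical sketch; the real chronos_bin DelayController may behave differently.
pub struct DelayController {
    base_ms: u64,
    current_ms: u64,
    max_ms: u64,
}

impl DelayController {
    pub fn new(base_ms: u64) -> Self {
        Self { base_ms, current_ms: base_ms, max_ms: 30_000 }
    }

    /// Sleep for the current delay, then double it up to the cap,
    /// so an idle processor backs off instead of hammering the database.
    pub async fn sleep(&mut self) {
        tokio::time::sleep(Duration::from_millis(self.current_ms)).await;
        self.current_ms = (self.current_ms * 2).min(self.max_ms);
    }

    /// Drop back to the base delay, e.g. after rows were actually published.
    pub fn reset(&mut self) {
        self.current_ms = self.base_ms;
    }
}
```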
11 changes: 8 additions & 3 deletions chronos_bin/src/monitor.rs
@@ -1,5 +1,6 @@
use crate::postgres::pg::Pg;
-use chrono::{Duration as chrono_duration, Utc};
+use crate::utils::config::ChronosConfig;
+use chrono::Utc;
use std::sync::Arc;
use std::time::Duration;

@@ -12,9 +13,13 @@ impl FailureDetector {
pub async fn run(&self) {
println!("Monitoring On!");
loop {
-            let _ = tokio::time::sleep(Duration::from_secs(10)).await; // sleep for 10sec
+            let _ = tokio::time::sleep(Duration::from_secs(ChronosConfig::from_env().monitor_db_poll)).await; // sleep for 10sec

-            match &self.data_store.failed_to_fire(Utc::now() - chrono_duration::seconds(10)).await {
+            match &self
+                .data_store
+                .failed_to_fire(&(Utc::now() - Duration::from_secs(ChronosConfig::from_env().fail_detect_interval)))
+                .await
+            {
Ok(fetched_rows) => {
if !fetched_rows.is_empty() {
if let Err(e) = &self.data_store.reset_to_init(fetched_rows).await {
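In the monitor, the hard-coded 10-second sleep and 10-second lookback are replaced by MONITOR_DB_POLL and FAIL_DETECT_INTERVAL. With the defaults from .env the loop wakes every 5 seconds and resets rows that appear to have been readied more than 500 seconds ago but never fired. A minimal sketch of the cutoff arithmetic — chrono's DateTime can subtract a std::time::Duration directly, which is why the chrono_duration alias could be dropped:

```rust
use chrono::Utc;
use std::time::Duration;

fn main() {
    // e.g. FAIL_DETECT_INTERVAL=500 from .env, in seconds
    let fail_detect_interval: u64 = 500;
    // Rows readied before this instant are treated as stuck and reset to init.
    let cutoff = Utc::now() - Duration::from_secs(fail_detect_interval);
    println!("rows readied before {cutoff} will be reset");
}
```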