Skip to content

Commit

Permalink
feat: Introduce OpenTelemetry metrics to cohort sdk and banking example. (#65)
Browse files Browse the repository at this point in the history
* feat: Introduce OpenTelemetry metrics to cohort sdk and banking example.

* fix: Remove unused 'opentelemetry-prometheus' crate.

* feat: Remove replicator settings from cohort sdk

* feat: initial commit for replicator as standalone package

* feat: first working cut of replicator as a package with example

* chore: remove old cohort package

* feat: retry applied to banking statemap installer

* feat: use env variables in config

* feat: remove replicator dependency from cohort-sdk

* feat: initial commit for error handling

* feat: update retry logic of replicator + installer

* feat: remove give-up logic and retry infinitely for retriable errors

* chore: update inputs from review comments

* fix: Join monitor handle with rest of async services.

* feature: When Talos aborts a tx, poll for the snapshot until the conflict is resolved

* chore: Minor typo fix.

* feature: Implement timeout when waiting for snapshot.

* fix: Remove global storage.

* fix: Use version:u64 to represent conflict data.

* chore: Minor fixes.

* chore: Remove unused setting from .env.example

* chore: Set REPLICATOR_SUFFIX_PRUNE_THRESHOLD=1 in .env.example

* chore: Remove unused dependency 'talos_certifier_adapters' from cohort_sdk

---------

Co-authored-by: Gethyl Kurian <gethyl.kurian@kindredgroup.com>
  • Loading branch information
fmarek-kindred and gk-kindred authored Aug 16, 2023
1 parent e500dcc commit e279978
Show file tree
Hide file tree
Showing 30 changed files with 999 additions and 509 deletions.
1 change: 0 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ REPLICATOR_CHANNEL_SIZE=100000

REPLICATOR_SUFFIX_CAPACITY=100000
REPLICATOR_SUFFIX_PRUNE_THRESHOLD=1
# REPLICATOR_SUFFIX_PRUNE_THRESHOLD=100

STATEMAP_QUEUE_CLEANUP_FREQUENCY_MS=10000
STATEMAP_INSTALLER_THREAD_POOL=10
Expand Down
83 changes: 38 additions & 45 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions examples/agent_client/examples/agent_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn main() -> Result<(), String> {
async fn certify() -> Result<(), String> {
let params = get_params().await?;

let generated = async_channel::unbounded::<CertificationRequest>();
let generated = async_channel::unbounded::<(CertificationRequest, f64)>();
let tx_generated = Arc::new(generated.0);

// give this to worker threads
Expand Down Expand Up @@ -81,7 +81,7 @@ async fn certify() -> Result<(), String> {
Ok(())
}

fn init_workers(params: LaunchParams, queue: Arc<Receiver<CertificationRequest>>) -> JoinHandle<()> {
fn init_workers(params: LaunchParams, queue: Arc<Receiver<(CertificationRequest, f64)>>) -> JoinHandle<()> {
tokio::spawn(async move {
let agent = Arc::new(make_agent(params.clone()).await);

Expand All @@ -96,7 +96,7 @@ fn init_workers(params: LaunchParams, queue: Arc<Receiver<CertificationRequest>>
loop {
let queue = Arc::clone(&queue_ref);
let agent = Arc::clone(&agent_ref);
if let Ok(tx_req) = queue.recv().await {
if let Ok((tx_req, _)) = queue.recv().await {
if (agent.certify(tx_req).await).is_err() {
errors_count += 1
}
Expand Down Expand Up @@ -264,7 +264,7 @@ async fn make_agent(params: LaunchParams) -> impl TalosAgent {
agent
}

fn create_stop_controller(params: LaunchParams, queue: Arc<Receiver<CertificationRequest>>) -> JoinHandle<Result<(), String>> {
fn create_stop_controller(params: LaunchParams, queue: Arc<Receiver<(CertificationRequest, f64)>>) -> JoinHandle<Result<(), String>> {
tokio::spawn(async move {
let mut remaining_checks = params.stop_max_empty_checks;
loop {
Expand Down
8 changes: 4 additions & 4 deletions examples/cohort_banking_with_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ tokio = { workspace = true, features = ["full"] }

async-channel = { version = "1.8.0" }
deadpool-postgres = { version = "0.10" }
opentelemetry_api = { version = "0.19.0" }
opentelemetry_sdk = { version = "0.19.0", features = ["metrics", "rt-tokio"] }
opentelemetry = { version = "0.19.0" }
opentelemetry-prometheus = { version = "0.12.0", features = ["prometheus-encoding"] }
opentelemetry_api = { version = "0.20.0", features = ["metrics"] }
opentelemetry-stdout = { version = "0.1.0", features = ["metrics"] }
opentelemetry_sdk = { version = "0.20.0", features = ["metrics", "rt-tokio"] }
opentelemetry = { version = "0.20.0", features = ["metrics"] }
rand = { version = "0.8.5" }
rdkafka = { version = "0.29.0", features = ["sasl"] }
rdkafka-sys = { version = "4.3.0" }
Expand Down
Loading

0 comments on commit e279978

Please sign in to comment.