diff --git a/.env.example b/.env.example index 5f597522..7b0f215c 100644 --- a/.env.example +++ b/.env.example @@ -60,7 +60,6 @@ REPLICATOR_CHANNEL_SIZE=100000 REPLICATOR_SUFFIX_CAPACITY=100000 REPLICATOR_SUFFIX_PRUNE_THRESHOLD=1 -# REPLICATOR_SUFFIX_PRUNE_THRESHOLD=100 STATEMAP_QUEUE_CLEANUP_FREQUENCY_MS=10000 STATEMAP_INSTALLER_THREAD_POOL=10 diff --git a/Cargo.lock b/Cargo.lock index 6ef7df28..61f881b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -445,7 +445,7 @@ dependencies = [ "log", "metrics", "opentelemetry", - "opentelemetry-prometheus", + "opentelemetry-stdout", "opentelemetry_api", "opentelemetry_sdk", "rand", @@ -494,6 +494,7 @@ dependencies = [ "env_logger", "futures", "log", + "metrics", "opentelemetry", "opentelemetry_api", "opentelemetry_sdk", @@ -505,8 +506,6 @@ dependencies = [ "strum 0.25.0", "talos_agent", "talos_certifier", - "talos_certifier_adapters", - "talos_suffix", "tokio", "uuid", ] @@ -808,12 +807,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - [[package]] name = "form_urlencoded" version = "1.2.0" @@ -1196,6 +1189,15 @@ dependencies = [ name = "metrics" version = "0.0.1" dependencies = [ + "env_logger", + "log", + "once_cell", + "opentelemetry", + "opentelemetry-stdout", + "opentelemetry_api", + "opentelemetry_sdk", + "serde", + "serde_json", "time 0.3.24", ] @@ -1336,35 +1338,38 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4b8347cc26099d3aeee044065ecc3ae11469796b4d65d065a23a584ed92a6f" +checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" dependencies = [ "opentelemetry_api", "opentelemetry_sdk", ] [[package]] -name = "opentelemetry-prometheus" -version = "0.12.0" +name = "opentelemetry-stdout" +version 
= "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a9f186f6293ebb693caddd0595e66b74a6068fa51048e26e0bf9c95478c639c" +checksum = "8bd550321bc0f9d3f6dcbfe5c75262789de5b3e2776da2cbcfd2392aa05db0c6" dependencies = [ - "opentelemetry", - "prometheus", - "protobuf", + "async-trait", + "opentelemetry_api", + "opentelemetry_sdk", + "ordered-float", + "serde", + "serde_json", ] [[package]] name = "opentelemetry_api" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed41783a5bf567688eb38372f2b7a8530f5a607a4b49d38dd7573236c23ca7e2" +checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" dependencies = [ - "fnv", "futures-channel", "futures-util", "indexmap 1.9.3", + "js-sys", "once_cell", "pin-project-lite", "thiserror", @@ -1373,26 +1378,35 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b3a2a91fdbfdd4d212c0dcc2ab540de2c2bcbbd90be17de7a7daf8822d010c1" +checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" dependencies = [ "async-trait", "crossbeam-channel", - "dashmap", - "fnv", "futures-channel", "futures-executor", "futures-util", "once_cell", "opentelemetry_api", + "ordered-float", "percent-encoding", "rand", + "regex", "thiserror", "tokio", "tokio-stream", ] +[[package]] +name = "ordered-float" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fc2dbde8f8a79f2102cc474ceb0ad68e3b80b85289ea62389b60e66777e4213" +dependencies = [ + "num-traits", +] + [[package]] name = "os_pipe" version = "1.1.4" @@ -1612,27 +1626,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "prometheus" -version = "0.13.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c" 
-dependencies = [ - "cfg-if", - "fnv", - "lazy_static", - "memchr", - "parking_lot", - "protobuf", - "thiserror", -] - -[[package]] -name = "protobuf" -version = "2.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" - [[package]] name = "ptr_meta" version = "0.1.4" diff --git a/examples/agent_client/examples/agent_client.rs b/examples/agent_client/examples/agent_client.rs index 327e0c50..814a411e 100644 --- a/examples/agent_client/examples/agent_client.rs +++ b/examples/agent_client/examples/agent_client.rs @@ -43,7 +43,7 @@ async fn main() -> Result<(), String> { async fn certify() -> Result<(), String> { let params = get_params().await?; - let generated = async_channel::unbounded::(); + let generated = async_channel::unbounded::<(CertificationRequest, f64)>(); let tx_generated = Arc::new(generated.0); // give this to worker threads @@ -81,7 +81,7 @@ async fn certify() -> Result<(), String> { Ok(()) } -fn init_workers(params: LaunchParams, queue: Arc>) -> JoinHandle<()> { +fn init_workers(params: LaunchParams, queue: Arc>) -> JoinHandle<()> { tokio::spawn(async move { let agent = Arc::new(make_agent(params.clone()).await); @@ -96,7 +96,7 @@ fn init_workers(params: LaunchParams, queue: Arc> loop { let queue = Arc::clone(&queue_ref); let agent = Arc::clone(&agent_ref); - if let Ok(tx_req) = queue.recv().await { + if let Ok((tx_req, _)) = queue.recv().await { if (agent.certify(tx_req).await).is_err() { errors_count += 1 } @@ -264,7 +264,7 @@ async fn make_agent(params: LaunchParams) -> impl TalosAgent { agent } -fn create_stop_controller(params: LaunchParams, queue: Arc>) -> JoinHandle> { +fn create_stop_controller(params: LaunchParams, queue: Arc>) -> JoinHandle> { tokio::spawn(async move { let mut remaining_checks = params.stop_max_empty_checks; loop { diff --git a/examples/cohort_banking_with_sdk/Cargo.toml b/examples/cohort_banking_with_sdk/Cargo.toml index 
d82bff86..ca1da297 100644 --- a/examples/cohort_banking_with_sdk/Cargo.toml +++ b/examples/cohort_banking_with_sdk/Cargo.toml @@ -23,10 +23,10 @@ tokio = { workspace = true, features = ["full"] } async-channel = { version = "1.8.0" } deadpool-postgres = { version = "0.10" } -opentelemetry_api = { version = "0.19.0" } -opentelemetry_sdk = { version = "0.19.0", features = ["metrics", "rt-tokio"] } -opentelemetry = { version = "0.19.0" } -opentelemetry-prometheus = { version = "0.12.0", features = ["prometheus-encoding"] } +opentelemetry_api = { version = "0.20.0", features = ["metrics"] } +opentelemetry-stdout = { version = "0.1.0", features = ["metrics"] } +opentelemetry_sdk = { version = "0.20.0", features = ["metrics", "rt-tokio"] } +opentelemetry = { version = "0.20.0", features = ["metrics"] } rand = { version = "0.8.5" } rdkafka = { version = "0.29.0", features = ["sasl"] } rdkafka-sys = { version = "4.3.0" } diff --git a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs index 30e71e8c..f8ef5847 100644 --- a/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs +++ b/examples/cohort_banking_with_sdk/examples/cohort_banking_with_sdk.rs @@ -1,24 +1,25 @@ -use std::str::FromStr; use std::{collections::HashMap, env, sync::Arc, time::Duration}; use async_channel::Receiver; use cohort_banking::{app::BankingApp, examples_support::queue_processor::QueueProcessor, model::requests::TransferRequest}; -use cohort_sdk::model::Config; +use cohort_sdk::model::{BackoffConfig, Config}; use examples_support::load_generator::models::Generator; use examples_support::load_generator::{generator::ControlledRateLoadGenerator, models::StopType}; -use metrics::model::MinMax; -use opentelemetry_api::KeyValue; -use opentelemetry_sdk::Resource; +use metrics::opentel::aggregation_selector::CustomHistogramSelector; +use metrics::opentel::printer::MetricsToStringPrinter; +use 
metrics::opentel::scaling::ScalingConfig; +use opentelemetry_api::global; +use opentelemetry_api::metrics::MetricsError; +use opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector; +use opentelemetry_sdk::metrics::{MeterProvider, PeriodicReader}; +use opentelemetry_sdk::runtime; +use opentelemetry_stdout::MetricsExporterBuilder; use rand::Rng; -use rust_decimal::{prelude::FromPrimitive, Decimal}; +use rust_decimal::prelude::FromPrimitive; use tokio::{signal, task::JoinHandle, try_join}; -use opentelemetry::global; use opentelemetry::global::shutdown_tracer_provider; -use opentelemetry::sdk::metrics::{controllers, processors, selectors}; - -use opentelemetry_prometheus::{Encoder, ExporterConfig, PrometheusExporter, TextEncoder}; #[derive(Clone)] struct LaunchParams { @@ -26,8 +27,28 @@ struct LaunchParams { target_rate: f32, threads: u64, accounts: u64, + scaling_config: HashMap, + metric_print_raw: bool, } +/// Connects to database, to kafka certification topic as talos agent and as cohort replicator. +/// Generates some number of banking transactions and passes them all to Talos for certification. +/// Once all transactions have been processed, it prints some output metrics to console. +/// Metric logging is set to WARN. If RUST_LOG is set to stricter than WARN then no metrics will be printed. +/// Prerequisite: +/// Talos, Kafka and DB must be running +/// Kafka topic must be empty. +/// DB should have snapshot table initialised with zero version +/// Banking database should have some accounts ready. +/// +/// Launch parameters: +/// --accounts - How many accounts are available in database. +/// --threads - How many threads to use. +/// --rate - In TPS, at what rate to generate transactions. +/// --volume - How many transactions to generate or "--volume 10-sec" for how long to generate transactions. +/// --metric_print_raw - When present, raw metric histograms will be printed. The value of this param is ignored. 
+/// --metric_scaling - The default scaling is 1.0; this parameter controls scaling factor for individual metric. +/// Format is: "--metric_scaling metric-name1=scaling-factor,metric-name2=scaling-factor,..." #[tokio::main] async fn main() -> Result<(), String> { env_logger::builder().format_timestamp_millis().init(); @@ -35,7 +56,7 @@ async fn main() -> Result<(), String> { let params = get_params().await?; - let (tx_queue, rx_queue) = async_channel::unbounded::(); + let (tx_queue, rx_queue) = async_channel::unbounded::<(TransferRequest, f64)>(); let rx_queue = Arc::new(rx_queue); let rx_queue_ref = Arc::clone(&rx_queue); @@ -51,11 +72,14 @@ async fn main() -> Result<(), String> { // // cohort configs // + backoff_on_conflict: BackoffConfig::new(1, 1500), + retry_backoff: BackoffConfig::new(20, 1500), retry_attempts_max: 10, - retry_backoff_max_ms: 1500, - retry_oo_backoff_max_ms: 1000, + retry_oo_backoff: BackoffConfig::new(20, 1000), retry_oo_attempts_max: 10, + snapshot_wait_timeout_ms: 10_000, + // // agent config values // @@ -87,40 +111,10 @@ async fn main() -> Result<(), String> { // should be mapped to rdkafka::config::RDKafkaLogLevel agent_log_level: 6, - // - // Kafka configs for Replicator - // - replicator_client_id: "cohort-banking".into(), - replicator_group_id: "cohort-banking-replicator".into(), - producer_config_overrides: HashMap::new(), - consumer_config_overrides: HashMap::new(), - - // - // Suffix config values - // - /// Initial capacity of the suffix - // suffix_size_max: 500_000, - suffix_size_max: 10, - /// - The suffix prune threshold from when we start checking if the suffix - /// should prune. - /// - Set to None if pruning is not required. - /// - Defaults to None. - // suffix_prune_at_size: Some(300_000), - suffix_prune_at_size: Some(2000), - /// Minimum size of suffix after prune. - /// - Defaults to None. 
- // suffix_size_min: Some(100_000), - suffix_size_min: None, - - // - // Replicator config values - // - replicator_buffer_size: 100_000, - // // Database config // - db_pool_size: 200, + db_pool_size: 100, db_user: "postgres".into(), db_password: "admin".into(), db_host: "127.0.0.1".into(), @@ -128,25 +122,23 @@ async fn main() -> Result<(), String> { db_database: "talos-sample-cohort-dev".into(), }; - let buckets = [ - 0.1, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 60.0, 70.0, 80.0, 90.0, 100.0, 200.0, 300.0, 400.0, 500.0, 1000.0, - 1500.0, 2000.0, 2500.0, 3000.0, 3500.0, 4000.0, 4500.0, 5000.0, 10000.0, - ]; - - let factory = processors::factory( - selectors::simple::histogram(buckets), - opentelemetry::sdk::export::metrics::aggregation::cumulative_temporality_selector(), - ); - - let controller = controllers::basic(factory) - .with_collect_period(Duration::from_secs(20)) - .with_resource(Resource::new([KeyValue::new("service_name", "banking_with_cohort_sdk")])) + let printer = MetricsToStringPrinter::new(params.threads, params.metric_print_raw, ScalingConfig { ratios: params.scaling_config }); + let (tx_metrics, rx_metrics) = tokio::sync::watch::channel("".to_string()); + let exporter = MetricsExporterBuilder::default() + .with_aggregation_selector(CustomHistogramSelector::new_with_4k_buckets()?) 
+ .with_temporality_selector(DefaultTemporalitySelector::new()) + .with_encoder(move |_writer, data| { + let report = printer.print(&data).map_err(MetricsError::Other)?; + tx_metrics.send(report).map_err(|e| MetricsError::Other(e.to_string()))?; + Ok(()) + }) .build(); - // this exporter can export into file - let exporter = opentelemetry_prometheus::exporter(controller) - .with_config(ExporterConfig::default().with_scope_info(true)) - .init(); + let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + + let meter_provider = MeterProvider::builder().with_reader(reader).build(); + let meter_provider_copy = meter_provider.clone(); + global::set_meter_provider(meter_provider); let meter = global::meter("banking_cohort"); let meter = Arc::new(meter); @@ -158,14 +150,10 @@ async fn main() -> Result<(), String> { let mut i = 1; let mut errors_count = 0; - let mut timeline = MinMax::default(); for task in tasks { - match task.await { - Err(e) => { - errors_count += 1; - log::error!("{:?}", e); - } - Ok(thread_timeline) => timeline.merge(thread_timeline), + if let Err(e) = task.await { + errors_count += 1; + log::error!("{:?}", e); } if i % 10 == 0 { log::warn!("Initiator thread {} of {} finished.", i, params.threads); @@ -173,12 +161,10 @@ async fn main() -> Result<(), String> { i += 1; } - - log::warn!("Duration: {}", Duration::from_nanos((timeline.max - timeline.min) as u64).as_secs_f32()); - log::info!("Finished. errors count: {}", errors_count); + log::info!("Finished. Errors count: {}", errors_count); }); - let h_stop: JoinHandle> = start_queue_monitor(rx_queue_ref); + let queue_monitor: JoinHandle> = start_queue_monitor(rx_queue_ref); let all_async_services = tokio::spawn(async move { let result = try_join!(h_generator, h_cohort); @@ -186,10 +172,7 @@ async fn main() -> Result<(), String> { }); tokio::select! 
{ - _ = h_stop => { - log::warn!("Stop manager is active..."); - } - + _ = queue_monitor => {} _ = all_async_services => {} // CTRL + C termination signal @@ -199,42 +182,19 @@ async fn main() -> Result<(), String> { } shutdown_tracer_provider(); - - print_prometheus_report_as_text(exporter, params.threads); - + let _ = meter_provider_copy.shutdown(); + let report = rx_metrics.borrow(); + log::warn!("{}", *report); Ok(()) } -fn start_queue_monitor(queue: Arc>) -> JoinHandle> { +fn start_queue_monitor(queue: Arc>) -> JoinHandle> { tokio::spawn(async move { let check_frequency = Duration::from_secs(10); - let total_attempts = 3; - - let mut remaining_attempts = total_attempts; loop { - if remaining_attempts == 0 { - // we consumed all attempts - break; - } - - if queue.is_empty() { - // queue is empty and there are no signals from other workers, reduce window and try again - remaining_attempts -= 1; - log::warn!( - "Workers queue is empty and there is no activity signal from replicator. Finishing in: {} seconds...", - remaining_attempts * check_frequency.as_secs() - ); - } else { - remaining_attempts = total_attempts; - log::warn!("Counts. 
Remaining: {}", queue.len(),); - } - + log::warn!("Remaining requests: {}", queue.len()); tokio::time::sleep(check_frequency).await; } - - queue.close(); - - Err("Signal from StopController".into()) }) } @@ -244,6 +204,8 @@ async fn get_params() -> Result { let mut accounts: Option = None; let mut target_rate: Option = None; let mut stop_type: Option = None; + let mut scaling_config: Option> = None; + let mut metric_print_raw = None; if args.len() >= 3 { let mut i = 1; @@ -270,6 +232,34 @@ async fn get_params() -> Result { let count: u64 = param_value.parse().unwrap(); stop_type = Some(StopType::LimitGeneratedTransactions { count }) } + } else if param_name.eq("--metric_print_raw") { + metric_print_raw = Some(true); + } else if param_name.eq("--metric_scaling") { + let param_value = &args[i + 1]; + let mut cfg: HashMap = HashMap::new(); + for spec in param_value.replace(' ', "").split(',') { + if let Some(i) = spec.find('=') { + let metric: String = spec[..i].into(); + let scale_factor_raw: String = spec[i + 1..].into(); + let scale_factor = match scale_factor_raw.parse::() { + Err(e) => { + log::error!( + "Unable to parse scaling factor for metric '{}'. No scaling will be applied. Pasing: '{}'. 
Error: {}.", + metric, + scale_factor_raw, + e + ); + 1_f32 + } + Ok(scale_factor) => scale_factor, + }; + cfg.insert(metric, scale_factor); + } + } + + if !cfg.is_empty() { + scaling_config = Some(cfg); + } } i += 2; @@ -288,149 +278,12 @@ async fn get_params() -> Result { stop_type: stop_type.unwrap(), threads: threads.unwrap(), accounts: accounts.unwrap(), + scaling_config: scaling_config.unwrap(), + metric_print_raw: metric_print_raw.is_some(), }) } } -fn print_prometheus_report_as_text(exporter: PrometheusExporter, threads: u64) { - let encoder = TextEncoder::new(); - let metric_families = exporter.registry().gather(); - let mut report_buffer = Vec::::new(); - encoder.encode(&metric_families, &mut report_buffer).unwrap(); - - let report: Vec<&str> = match std::str::from_utf8(&report_buffer) { - Ok(v) => v.split('\n').collect(), - Err(e) => panic!("Invalid UTF-8 sequence: {}", e), - }; - // for line in report.iter().filter(|v| v.starts_with("metric_")) { - // log::warn!("Printing results = {}", line); - // } - - print_histogram("Out of Order Install (DB work)", "metric_oo_install_duration", "ms", &report, threads, true); - print_histogram("Out of Order Install (sleeps)", "metric_oo_wait_duration", "ms", &report, threads, true); - print_histogram( - "Out of Order Install (full span)", - "metric_oo_install_and_wait_duration", - "ms", - &report, - threads, - true, - ); - print_histogram("Out of Order Install attempts used", "metric_oo_attempts", "attempts", &report, threads, false); - print_histogram("Talos roundtrip", "metric_talos", "ms", &report, threads, true); - print_histogram("Candidate roundtrip", "metric_duration", "ms", &report, threads, true); - - let aborts = extract_num_value::(&report, "metric_aborts_total"); - let commits = extract_num_value::(&report, "metric_commits_total"); - let oo_retries = extract_num_value::(&report, "metric_oo_retry_count_total"); - let oo_giveups = extract_num_value::(&report, "metric_oo_giveups_count_total"); - let 
oo_no_data_found = extract_num_value::(&report, "metric_oo_no_data_found_total"); - let oo_not_safe = extract_num_value::(&report, "metric_oo_not_safe_count_total"); - let talos_aborts = extract_num_value::(&report, "metric_talos_aborts_count_total"); - let agent_retries = extract_num_value::(&report, "metric_agent_retries_count_total"); - let agent_errors = extract_num_value::(&report, "metric_agent_errors_count_total"); - let db_errors = extract_num_value::(&report, "metric_db_errors_count_total"); - - log::warn!("Commits : {}", if let Some(v) = commits { v } else { 0 }); - log::warn!("Aborts : {}", if let Some(v) = aborts { v } else { 0 }); - - log::warn!("OO no data : {}", if let Some(v) = oo_no_data_found { v } else { 0 }); - log::warn!("OO not safe : {}", if let Some(v) = oo_not_safe { v } else { 0 }); - log::warn!("OO retries : {}", if let Some(v) = oo_retries { v } else { 0 }); - log::warn!("OO giveups : {}", if let Some(v) = oo_giveups { v } else { 0 }); - - log::warn!("Talos aborts : {}", if let Some(v) = talos_aborts { v } else { 0 }); - log::warn!("Agent retries : {}", if let Some(v) = agent_retries { v } else { 0 }); - log::warn!("Agent errors : {}", if let Some(v) = agent_errors { v } else { 0 }); - - log::warn!("DB errors : {}", if let Some(v) = db_errors { v } else { 0 }); -} - -fn print_histogram(name: &str, id: &str, unit: &str, report: &[&str], threads: u64, print_tps: bool) { - let histogram: Vec<(f64, u64)> = report - .iter() - .filter(|v| v.starts_with(format!("{}_bucket", id).as_str())) - .filter_map(|line| { - let bucket_label_start_index = line.find("le=\""); - bucket_label_start_index?; - - let line_remainder = &line[bucket_label_start_index.unwrap() + 4..]; - let bucket_label_end_index = line_remainder.find("\"}"); - bucket_label_end_index?; - - let bucket_label = &line_remainder[..bucket_label_end_index.unwrap()]; - let bucket_label_value = if bucket_label == "+Inf" { - f64::MAX - } else { - bucket_label.parse::().unwrap() - }; - let 
count_in_bucket = &line_remainder[bucket_label_end_index.unwrap() + 3..]; - Some((bucket_label_value, count_in_bucket.parse::().unwrap())) - }) - .collect(); - - let extracted_count = extract_num_value::(report, format!("{}_count", id).as_str()); - if let Some(total_count) = extracted_count { - log::warn!("---------------------------------------------------------"); - log::warn!("{}", name); - for (bucket, count_in_bucket) in histogram { - let percents_in_bucket = (100.0 * count_in_bucket as f64) / total_count as f64; - if bucket == f64::MAX { - log::warn!("< {:>8} {} : {:>9} : {:>6.2}%", "10000+", unit, count_in_bucket, percents_in_bucket); - } else { - log::warn!("< {:>8} {} : {:>9} : {:>6.2}%", bucket, unit, count_in_bucket, percents_in_bucket); - } - } - - let rslt_sum = extract_num_value::(report, format!("{}_sum", id).as_str()); - if let Some(sum) = rslt_sum { - if unit == "ms" { - if sum > 1000.0 { - log::warn!("Total (sec) : {:.1}", sum / 1_000_f64); - log::warn!("Total (sec) avg per th : {:.1}", sum / 1_000_f64 / threads as f64); - } else { - log::warn!("Total (ms) : {:.1}", sum); - log::warn!("Total (ms) avg per th : {:.1}", sum / threads as f64); - } - } else { - log::warn!("Total ({}) : {:.1}", unit, sum); - log::warn!("Total ({}) avg per th : {:.1}", unit, sum / threads as f64); - } - - log::warn!("Count : {}", total_count); - if print_tps { - log::warn!("Approx throughput (tps) : {:.1}", (total_count as f64) / sum * 1000.0 * threads as f64); - } - } - log::warn!("---------------------------------------------------------\n"); - } -} - -fn extract_num_value(report: &[&str], value: &str) -> Option { - let extracted_as_list: Vec = report - .iter() - .filter(|i| i.starts_with(value)) - .filter_map(|i| { - if let Some(pos) = i.find(' ') { - let parsed = i[pos + 1..].parse::(); - if let Ok(num) = parsed { - Some(num) - } else { - None - } - } else { - None - } - }) - .collect(); - - if extracted_as_list.len() == 1 { - Some(extracted_as_list[0].clone()) - } 
else { - None - } -} - struct TransferRequestGenerator { available_accounts: u64, generated: Vec<(u64, u64)>, @@ -468,7 +321,7 @@ impl Generator for TransferRequestGenerator { TransferRequest { from: format!("{:<04}", from), to: format!("{:<04}", to), - amount: Decimal::from_f32(1.0).unwrap(), + amount: rust_decimal::Decimal::from_f32(1.0).unwrap(), } } } diff --git a/examples/cohort_replicator_kafka_pg/examples/cohort_replicator_kafka_pg.rs b/examples/cohort_replicator_kafka_pg/examples/cohort_replicator_kafka_pg.rs index e0a3dc5a..4d6046b0 100644 --- a/examples/cohort_replicator_kafka_pg/examples/cohort_replicator_kafka_pg.rs +++ b/examples/cohort_replicator_kafka_pg/examples/cohort_replicator_kafka_pg.rs @@ -72,7 +72,7 @@ async fn main() { channel_size: env_var_with_defaults!("REPLICATOR_CHANNEL_SIZE", usize, 100_000), suffix_capacity: env_var_with_defaults!("REPLICATOR_SUFFIX_CAPACITY", usize, 10_000), suffix_prune_threshold: env_var_with_defaults!("REPLICATOR_SUFFIX_PRUNE_THRESHOLD", Option::, 1), - suffix_minimum_size_on_prune: env_var_with_defaults!("REPLICATOR_SUFFIX_PRUNE_THRESHOLD", Option::), + suffix_minimum_size_on_prune: env_var_with_defaults!("REPLICATOR_SUFFIX_MIN_SIZE", Option::), certifier_message_receiver_commit_freq_ms: env_var_with_defaults!("REPLICATOR_COMMIT_FREQ_MS", u64, 10_000), statemap_queue_cleanup_freq_ms: env_var_with_defaults!("STATEMAP_QUEUE_CLEANUP_FREQUENCY_MS", u64, 10_000), statemap_installer_threadpool: env_var_with_defaults!("STATEMAP_INSTALLER_THREAD_POOL", u64, 50), diff --git a/packages/cohort_banking/Cargo.toml b/packages/cohort_banking/Cargo.toml index 30ca7d69..e72ca870 100644 --- a/packages/cohort_banking/Cargo.toml +++ b/packages/cohort_banking/Cargo.toml @@ -20,9 +20,9 @@ deadpool-postgres = { version = "0.10" } async-channel = { version = "1.8.0" } futures = { version = "0.3.28" } -opentelemetry_api = { version = "0.19.0" } -opentelemetry_sdk = { version = "0.19.0", features = ["metrics", "rt-tokio"] } -opentelemetry 
= { version = "0.19.0" } +opentelemetry_api = { version = "0.20.0" } +opentelemetry_sdk = { version = "0.20.0", features = ["metrics", "rt-tokio"] } +opentelemetry = { version = "0.20.0" } rand = { version = "0.8.5" } strum = { version = "0.25", features = ["derive"] } uuid = { version = "1.2.2", features = ["v4"] } diff --git a/packages/cohort_banking/src/app.rs b/packages/cohort_banking/src/app.rs index 9fb4ca4b..4a7a35f5 100644 --- a/packages/cohort_banking/src/app.rs +++ b/packages/cohort_banking/src/app.rs @@ -5,10 +5,10 @@ use cohort_sdk::{ cohort::Cohort, model::{CandidateData, CertificationRequest, ClientErrorKind, Config}, }; + use opentelemetry_api::{ global, metrics::{Counter, Unit}, - Context, }; use talos_agent::messaging::api::Decision; @@ -56,6 +56,10 @@ impl BankingApp { pub async fn init(&mut self) -> Result<(), String> { let cohort_api = Cohort::create(self.config.clone()).await.map_err(|e| e.to_string())?; + // if no metrics are reported to meter then it will not be visible in the final report. 
+ self.counter_aborts.add(0, &[]); + self.counter_commits.add(0, &[]); + self.counter_oo_no_data_found.add(0, &[]); self.cohort_api = Some(cohort_api); @@ -107,14 +111,14 @@ impl Handler for BankingApp { .await { Ok(rsp) => { - let ca = Arc::clone(&self.counter_aborts); - let cc = Arc::clone(&self.counter_commits); + let c_aborts = Arc::clone(&self.counter_aborts); + let c_commits = Arc::clone(&self.counter_commits); let is_abort = rsp.decision == Decision::Aborted; tokio::spawn(async move { if is_abort { - ca.add(&Context::current(), 1, &[]); + c_aborts.add(1, &[]); } else { - cc.add(&Context::current(), 1, &[]); + c_commits.add(1, &[]); } }); diff --git a/packages/cohort_banking/src/callbacks/oo_installer.rs b/packages/cohort_banking/src/callbacks/oo_installer.rs index decd8a23..3730e74a 100644 --- a/packages/cohort_banking/src/callbacks/oo_installer.rs +++ b/packages/cohort_banking/src/callbacks/oo_installer.rs @@ -6,7 +6,7 @@ use std::{ use async_trait::async_trait; use cohort_sdk::model::callbacks::{OutOfOrderInstallOutcome, OutOfOrderInstaller}; -use opentelemetry_api::{metrics::Counter, Context}; +use opentelemetry_api::metrics::Counter; use tokio::task::JoinHandle; use tokio_postgres::types::ToSql; @@ -156,7 +156,7 @@ impl OutOfOrderInstallerImpl { ); let c = Arc::clone(&self.counter_oo_no_data_found); tokio::spawn(async move { - c.add(&Context::current(), 1, &[]); + c.add(1, &[]); }); return Ok(OutOfOrderInstallOutcome::InstalledAlready); } diff --git a/packages/cohort_banking/src/callbacks/statemap_installer.rs b/packages/cohort_banking/src/callbacks/statemap_installer.rs index fa397307..f23c8d22 100644 --- a/packages/cohort_banking/src/callbacks/statemap_installer.rs +++ b/packages/cohort_banking/src/callbacks/statemap_installer.rs @@ -63,7 +63,6 @@ impl ReplicatorInstaller for BankStatemapInstaller { /// - On successful completion, we commit the transaction and return. 
`(Txn Commits and returns)` /// - When there is a retryable error, go to the start of the loop to retry. `(Txn aborts and retries)` /// - For non-retryable error, we return the `Error`. - async fn install(&self, statemap: Vec, snapshot_version: u64) -> Result<(), String> { let mut cnn = self.database.get().await.map_err(|e| e.to_string())?; loop { diff --git a/packages/cohort_banking/src/examples_support/queue_processor.rs b/packages/cohort_banking/src/examples_support/queue_processor.rs index 2ac44a61..16064cc5 100644 --- a/packages/cohort_banking/src/examples_support/queue_processor.rs +++ b/packages/cohort_banking/src/examples_support/queue_processor.rs @@ -1,13 +1,9 @@ use std::{ sync::Arc, - time::{Instant, SystemTime, UNIX_EPOCH}, + time::{SystemTime, UNIX_EPOCH}, }; -use metrics::model::MinMax; -use opentelemetry_api::{ - metrics::{Meter, Unit}, - Context, -}; +use opentelemetry_api::metrics::{Meter, Unit}; use tokio::task::JoinHandle; use async_trait::async_trait; @@ -21,37 +17,57 @@ pub trait Handler: Sync + Send { impl QueueProcessor { pub async fn process + 'static>( - queue: Arc>, + queue: Arc>, meter: Arc, threads: u64, item_handler: Arc, - ) -> Vec> { + ) -> Vec> { let item_handler = Arc::new(item_handler); - let mut tasks = Vec::>::new(); + let mut tasks = Vec::>::new(); for thread_number in 1..=threads { let queue_ref = Arc::clone(&queue); let item_handler = Arc::clone(&item_handler); let meter = Arc::clone(&meter); - let task_h: JoinHandle = tokio::spawn(async move { - let mut timeline = MinMax::default(); - let histogram = Arc::new(meter.f64_histogram("metric_duration").with_unit(Unit::new("ms")).init()); + + let task_h: JoinHandle<()> = tokio::spawn(async move { let counter = Arc::new(meter.u64_counter("metric_count").with_unit(Unit::new("tx")).init()); + let histogram = Arc::new(meter.f64_histogram("metric_candidate_roundtrip").with_unit(Unit::new("ms")).init()); + let histogram_sys = 
Arc::new(meter.f64_histogram("metric_candidate_roundtrip_sys").with_unit(Unit::new("ms")).init()); + let histogram_throughput = Arc::new(meter.f64_histogram("metric_throughput").with_unit(Unit::new("ms")).init()); let mut handled_count = 0; + let mut errors_count = 0; loop { let histogram_ref = Arc::clone(&histogram); + let histogram_sys_ref = Arc::clone(&histogram_sys); + let histogram_throughput_ref = Arc::clone(&histogram_throughput); + let counter_ref = Arc::clone(&counter); + match queue_ref.recv().await { - Err(_) => break, - Ok(item) => { - timeline.add(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as i128); + Err(_) => { + errors_count += 1; + break; + } + Ok((item, scheduled_at_ms)) => { handled_count += 1; - let span_1 = Instant::now(); + + let started_at_ms = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as f64 / 1_000_000_f64; let result = item_handler.handle(item).await; - let span_1_val = span_1.elapsed().as_nanos() as f64 / 1_000_000_f64; + + let processing_finished_ms = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as f64 / 1_000_000_f64; tokio::spawn(async move { - histogram_ref.record(&Context::current(), span_1_val, &[]); + histogram_ref.record((processing_finished_ms - started_at_ms) * 100.0, &[]); + histogram_sys_ref.record((processing_finished_ms - scheduled_at_ms) * 10.0, &[]); + + // Record start and stop times of each transaction. + // This histogram will track min and max values, giving us the total duration of the test, excluding test specific code. + // The count value in this histogram will be 2x inflated, which will need to be accounted for. 
+ histogram_throughput_ref.record(scheduled_at_ms, &[]); + histogram_throughput_ref.record(processing_finished_ms, &[]); + + counter_ref.add(1, &[]); }); if let Err(e) = result { @@ -67,14 +83,12 @@ impl QueueProcessor { } } - timeline.add(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as i128); - - tokio::spawn(async move { - counter.add(&Context::current(), handled_count, &[]); - }); - log::debug!("Thread {:>2} stopped. Processed items: {}.", thread_number, handled_count); - - timeline + log::debug!( + "Thread {:>2} stopped. Processed items: {}. Errors: {}", + thread_number, + handled_count, + errors_count + ); }); tasks.push(task_h); } diff --git a/packages/cohort_sdk/Cargo.toml b/packages/cohort_sdk/Cargo.toml index ad0f29d6..a78c6998 100644 --- a/packages/cohort_sdk/Cargo.toml +++ b/packages/cohort_sdk/Cargo.toml @@ -9,20 +9,18 @@ async-trait = { workspace = true } env_logger = { workspace = true } log = { workspace = true } futures = { version = "0.3.28" } -opentelemetry_api = { version = "0.19.0" } -opentelemetry_sdk = { version = "0.19.0", features = ["metrics", "rt-tokio"] } -opentelemetry = { version = "0.19.0" } +opentelemetry_api = { version = "0.20.0" } +opentelemetry_sdk = { version = "0.20.0", features = ["metrics", "rt-tokio"] } +opentelemetry = { version = "0.20.0" } rand = { version = "0.8.5" } rdkafka = { version = "0.33.0", features = ["sasl"] } rdkafka-sys = { version = "4.3.0" } serde = { workspace = true } serde_json = { workspace = true } strum = { version = "0.25", features = ["derive"] } +metrics = { path = "../metrics" } talos_agent = { path = "../talos_agent" } -talos_suffix = { path = "../talos_suffix" } -# talos_cohort_replicator = { path = "../talos_cohort_replicator" } talos_certifier = { path = "../talos_certifier" } -talos_certifier_adapters = { path = "../talos_certifier_adapters" } uuid = { version = "1.2.2", features = ["v4"] } tokio = { workspace = true, features = ["full"] } diff --git 
a/packages/cohort_sdk/src/cohort.rs b/packages/cohort_sdk/src/cohort.rs index 97ba87fc..63ef98f8 100644 --- a/packages/cohort_sdk/src/cohort.rs +++ b/packages/cohort_sdk/src/cohort.rs @@ -6,7 +6,6 @@ use std::{ use opentelemetry_api::{ global, metrics::{Counter, Histogram, Unit}, - Context, }; use talos_agent::{ agent::{ @@ -26,7 +25,7 @@ use crate::{ delay_controller::DelayController, model::{ self, - callbacks::{ItemStateProvider, OutOfOrderInstallOutcome, OutOfOrderInstaller}, + callbacks::{CapturedState, ItemStateProvider, OutOfOrderInstallOutcome, OutOfOrderInstaller}, internal::CertificationAttemptOutcome, CertificationResponse, ClientError, Config, ResponseMetadata, }, @@ -39,7 +38,6 @@ pub struct Cohort { config: Config, talos_agent: Box, agent_services: AgentServices, - // replicator_services: ReplicatorServices, oo_retry_counter: Arc>, oo_giveups_counter: Arc>, oo_not_safe_counter: Arc>, @@ -49,7 +47,7 @@ pub struct Cohort { oo_wait_histogram: Arc>, talos_histogram: Arc>, talos_aborts_counter: Arc>, - agent_retries_counter: Arc>, + agent_retries_histogram: Arc>, agent_errors_counter: Arc>, db_errors_counter: Arc>, } @@ -105,27 +103,6 @@ impl Cohort { let agent_services = agent.start(rx_certify, rx_cancel, tx_decision, rx_decision, publisher, consumer); - // - // Code below is to start replicator from master branch... 
- // - - // // - // // start replicator - // // - - // let suffix = CohortSuffix::with_config(config.clone().into()); - // let kafka_consumer = KafkaConsumer::new(&talos_kafka_config); - // kafka_consumer.subscribe().await.unwrap(); - // let replicator = CohortReplicator::new(kafka_consumer, suffix); - - // let (tx_install_req, rx_statemaps_ch) = mpsc::channel(config.replicator_buffer_size); - // let (tx_install_result_ch, rx_install_result) = tokio::sync::mpsc::channel(config.replicator_buffer_size); - // let replicator_handle = tokio::spawn(ReplicatorService2::start_replicator(replicator, tx_install_req, rx_install_result)); - // let replicator_impl = ReplicatorInstallerImpl { - // installer_impl: statemap_installer, - // }; - // let installer_handle = tokio::spawn(ReplicatorService2::start_installer(rx_statemaps_ch, tx_install_result_ch, replicator_impl)); - let meter = global::meter("cohort_sdk"); let oo_install_histogram = meter.f64_histogram("metric_oo_install_duration").with_unit(Unit::new("ms")).init(); let oo_attempts_histogram = meter.u64_histogram("metric_oo_attempts").with_unit(Unit::new("tx")).init(); @@ -137,17 +114,20 @@ impl Cohort { let oo_not_safe_counter = meter.u64_counter("metric_oo_not_safe_count").with_unit(Unit::new("tx")).init(); let talos_aborts_counter = meter.u64_counter("metric_talos_aborts_count").with_unit(Unit::new("tx")).init(); let agent_errors_counter = meter.u64_counter("metric_agent_errors_count").with_unit(Unit::new("tx")).init(); - let agent_retries_counter = meter.u64_counter("metric_agent_retries_count").with_unit(Unit::new("tx")).init(); - let db_errors_counter = meter.u64_counter("metric_db_errors_counter").with_unit(Unit::new("tx")).init(); + let agent_retries_histogram = meter.u64_histogram("metric_agent_retries").with_unit(Unit::new("tx")).init(); + let db_errors_counter = meter.u64_counter("metric_db_errors_count").with_unit(Unit::new("tx")).init(); + + oo_retry_counter.add(0, &[]); + oo_giveups_counter.add(0, &[]); + 
oo_not_safe_counter.add(0, &[]); + talos_aborts_counter.add(0, &[]); + agent_errors_counter.add(0, &[]); + db_errors_counter.add(0, &[]); Ok(Self { config, talos_agent: Box::new(agent), agent_services, - // replicator_services: ReplicatorServices { - // replicator_handle, - // installer_handle, - // }, oo_install_histogram: Arc::new(oo_install_histogram), oo_install_and_wait_histogram: Arc::new(oo_install_and_wait_histogram), oo_wait_histogram: Arc::new(oo_wait_histogram), @@ -156,8 +136,8 @@ impl Cohort { oo_not_safe_counter: Arc::new(oo_not_safe_counter), oo_attempts_histogram: Arc::new(oo_attempts_histogram), talos_histogram: Arc::new(talos_histogram), + agent_retries_histogram: Arc::new(agent_retries_histogram), talos_aborts_counter: Arc::new(talos_aborts_counter), - agent_retries_counter: Arc::new(agent_retries_counter), agent_errors_counter: Arc::new(agent_errors_counter), db_errors_counter: Arc::new(db_errors_counter), }) @@ -179,7 +159,7 @@ impl Cohort { let h_talos = Arc::clone(&self.talos_histogram); tokio::spawn(async move { - h_talos.record(&Context::current(), span_1_val, &[]); + h_talos.record(span_1_val * 100.0, &[]); }); if response.decision == Decision::Aborted { @@ -190,7 +170,7 @@ impl Cohort { let safepoint = response.safepoint.unwrap(); let new_version = response.version; - let mut controller = DelayController::new(20, self.config.retry_oo_backoff_max_ms); + let mut controller = DelayController::new(self.config.retry_oo_backoff.min_ms, self.config.retry_oo_backoff.max_ms); let mut attempt = 0; let span_2 = Instant::now(); @@ -207,8 +187,7 @@ impl Cohort { let h_install = Arc::clone(&self.oo_install_histogram); tokio::spawn(async move { - let ctx = &Context::current(); - h_install.record(ctx, span_3_val, &[]); + h_install.record(span_3_val * 100.0, &[]); }); let error = match install_result { @@ -254,23 +233,21 @@ impl Cohort { let c_retry = Arc::clone(&self.oo_retry_counter); tokio::spawn(async move { - let ctx = &Context::current(); - if 
is_not_save > 0 { - c_not_safe.add(ctx, is_not_save, &[]); + c_not_safe.add(is_not_save, &[]); } if total_sleep > 0 { - h_total_sleep.record(ctx, total_sleep as f64, &[]); + h_total_sleep.record(total_sleep as f64 * 100.0, &[]); } if giveups > 0 { - c_giveups.add(ctx, giveups, &[]); + c_giveups.add(giveups, &[]); } if attempt > 1 { - c_retry.add(ctx, attempt - 1, &[]); + c_retry.add(attempt - 1, &[]); } - h_attempts.record(ctx, attempt, &[]); - h_span_2.record(ctx, span_2_val, &[]); + h_attempts.record(attempt, &[]); + h_span_2.record(span_2_val * 100.0, &[]); }); result } @@ -282,109 +259,126 @@ impl Cohort { let started_at = Instant::now(); let mut attempts = 0; - // let mut delay_controller = Box::new(DelayController::new(20, self.config.retry_backoff_max_ms)); - let mut delay_controller = DelayController::new(20, self.config.retry_backoff_max_ms); + let mut delay_controller = DelayController::new(self.config.retry_backoff.min_ms, self.config.retry_backoff.max_ms); let mut talos_aborts = 0_u64; let mut agent_errors = 0_u64; let mut db_errors = 0_u64; + let mut recent_conflict: Option = None; + let mut recent_abort: Option = None; + let result = loop { // One of these will be sent to client if we failed - let recent_error: Option; - let recent_response: Option; + let result: Option>; attempts += 1; - let is_success = match self.send_to_talos_attempt(request.clone(), state_provider).await { + let is_success = match self.send_to_talos_attempt(request.clone(), state_provider, recent_conflict).await { CertificationAttemptOutcome::Success { mut response } => { response.metadata.duration_ms = started_at.elapsed().as_millis() as u64; response.metadata.attempts = attempts; - recent_error = None; - recent_response = Some(response); + result = Some(Ok(response)); true } CertificationAttemptOutcome::Aborted { mut response } => { talos_aborts += 1; response.metadata.duration_ms = started_at.elapsed().as_millis() as u64; response.metadata.attempts = attempts; - recent_error = 
None; - recent_response = Some(response); + recent_conflict = response.conflict; + recent_abort = Some(response.clone()); + // result = recent_abort.map(|a| Ok(a)); + result = Some(Ok(response)); + false + } + CertificationAttemptOutcome::SnapshotTimeout { waited, conflict } => { + log::error!("Timeout wating for snapshot: {:?}. Waited: {:.2} sec", conflict, waited.as_secs_f32()); + result = recent_abort.clone().map(Ok); false } CertificationAttemptOutcome::AgentError { error } => { - recent_error = Some(ClientError::from(error)); - recent_response = None; + result = Some(Err(ClientError::from(error))); agent_errors += 1; false } CertificationAttemptOutcome::DataError { reason } => { - recent_error = Some(ClientError { + result = Some(Err(ClientError { kind: model::ClientErrorKind::Persistence, reason, cause: None, - }); - recent_response = None; + })); db_errors += 1; false } }; - if is_success { - break Ok(recent_response.unwrap()); + let rslt_response = result.unwrap(); + if is_success || self.config.retry_attempts_max <= attempts { + break rslt_response; } - if self.config.retry_attempts_max <= attempts { - if let Some(response) = recent_response { - break Ok(response); - } else if let Some(error) = recent_error { - break Err(error); + match rslt_response { + Ok(response) => { + log::debug!( + "Unsuccessful transaction: {:?}. Response: {:?} This might retry. Attempts: {}", + request.candidate.statemap, + response.decision, + attempts + ); + } + Err(error) => { + log::debug!( + "Unsuccessful transaction with error: {:?}. {} This might retry. Attempts: {}", + request.candidate.statemap, + error, + attempts + ); } - } else if let Some(response) = recent_response { - log::debug!( - "Unsuccessful transaction: {:?}. Response: {:?} This might retry. Attempts: {}", - request.candidate.statemap, - response.decision, - attempts - ); - } else if let Some(error) = recent_error { - log::debug!( - "Unsuccessful transaction with error: {:?}. {} This might retry. 
Attempts: {}", - request.candidate.statemap, - error, - attempts - ); } delay_controller.sleep().await; }; let c_talos_aborts = Arc::clone(&self.talos_aborts_counter); - let c_agent_retries = Arc::clone(&self.agent_retries_counter); let c_agent_errors = Arc::clone(&self.agent_errors_counter); let c_db_errors = Arc::clone(&self.db_errors_counter); + let h_agent_retries = Arc::clone(&self.agent_retries_histogram); - if agent_errors > 0 || db_errors > 0 || attempts > 1 || talos_aborts > 0 { + if agent_errors > 0 || db_errors > 0 || talos_aborts > 0 || attempts > 0 { tokio::spawn(async move { - let ctx = &Context::current(); - c_talos_aborts.add(ctx, talos_aborts, &[]); - c_agent_retries.add(ctx, attempts, &[]); - c_agent_errors.add(ctx, agent_errors, &[]); - c_db_errors.add(ctx, db_errors, &[]); + c_talos_aborts.add(talos_aborts, &[]); + c_agent_errors.add(agent_errors, &[]); + c_db_errors.add(db_errors, &[]); + h_agent_retries.record(attempts, &[]); }); } result } - async fn send_to_talos_attempt(&self, request: model::CertificationRequest, state_provider: &S) -> CertificationAttemptOutcome + async fn send_to_talos_attempt( + &self, + request: model::CertificationRequest, + state_provider: &S, + previous_conflict: Option, + ) -> CertificationAttemptOutcome where S: ItemStateProvider, { - let result_local_state = state_provider.get_state().await; - if let Err(reason) = result_local_state { - return CertificationAttemptOutcome::DataError { reason }; - } + let timeout = if request.timeout_ms > 0 { + Duration::from_millis(request.timeout_ms) + } else { + Duration::from_millis(self.config.snapshot_wait_timeout_ms) + }; - let local_state = result_local_state.unwrap(); + let local_state: CapturedState = match self.await_for_snapshot(state_provider, previous_conflict, timeout).await { + Err(SnapshotPollErrorType::FetchError { reason }) => return CertificationAttemptOutcome::DataError { reason }, + Err(SnapshotPollErrorType::Timeout { waited }) => { + return 
CertificationAttemptOutcome::SnapshotTimeout { + waited, + conflict: previous_conflict.unwrap(), + } + } + Ok(local_state) => local_state, + }; log::debug!("loaded state: {}, {:?}", local_state.snapshot_version, local_state.items); @@ -416,6 +410,7 @@ impl Cohort { safepoint: agent_response.safepoint, version: agent_response.version, metadata: ResponseMetadata { duration_ms: 0, attempts: 0 }, + conflict: agent_response.conflict.map(|cm| cm.version), }; if response.decision == Decision::Aborted { @@ -428,6 +423,38 @@ impl Cohort { } } + async fn await_for_snapshot(&self, state_provider: &S, previous_conflict: Option, timeout: Duration) -> Result + where + S: ItemStateProvider, + { + match previous_conflict { + None => state_provider.get_state().await.map_err(|reason| SnapshotPollErrorType::FetchError { reason }), + Some(conflict) => { + let mut delay_controller = DelayController::new(self.config.backoff_on_conflict.min_ms, self.config.backoff_on_conflict.max_ms); + let poll_started_at = Instant::now(); + loop { + let result_local_state = state_provider.get_state().await; + match result_local_state { + Err(reason) => return Err(SnapshotPollErrorType::FetchError { reason }), + Ok(current_state) => { + if current_state.snapshot_version < conflict { + // not safe yet + let waited = poll_started_at.elapsed(); + if waited >= timeout { + return Err(SnapshotPollErrorType::Timeout { waited }); + } + delay_controller.sleep().await; + continue; + } else { + break Ok(current_state); + } + } + } + } + } + } + } + fn select_snapshot_and_readvers(cpt_snapshot: u64, cpt_versions: Vec) -> (u64, Vec) { if cpt_versions.is_empty() { log::debug!( @@ -463,11 +490,11 @@ impl Cohort { } pub async fn shutdown(&self) { - // TODO implement graceful shutdown with timeout? Wait for channels to be drained and then exit. 
- // while self.channel_tx_certify.capacity() != MAX { wait() } self.agent_services.decision_reader.abort(); self.agent_services.state_manager.abort(); - // self.replicator_services.replicator_handle.abort(); - // self.replicator_services.installer_handle.abort(); } } +pub enum SnapshotPollErrorType { + Timeout { waited: Duration }, + FetchError { reason: String }, +} diff --git a/packages/cohort_sdk/src/model/internal.rs b/packages/cohort_sdk/src/model/internal.rs index 838563af..ea32b708 100644 --- a/packages/cohort_sdk/src/model/internal.rs +++ b/packages/cohort_sdk/src/model/internal.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use talos_agent::agent::errors::AgentError; use super::CertificationResponse; @@ -7,4 +9,5 @@ pub(crate) enum CertificationAttemptOutcome { Aborted { response: CertificationResponse }, AgentError { error: AgentError }, DataError { reason: String }, + SnapshotTimeout { waited: Duration, conflict: u64 }, } diff --git a/packages/cohort_sdk/src/model/mod.rs b/packages/cohort_sdk/src/model/mod.rs index 5e19fbc0..90872f46 100644 --- a/packages/cohort_sdk/src/model/mod.rs +++ b/packages/cohort_sdk/src/model/mod.rs @@ -9,11 +9,8 @@ use talos_agent::{ api::{AgentConfig, KafkaConfig, TalosType}, messaging::api::Decision, }; -use talos_certifier_adapters::kafka::config::KafkaConfig as TalosKafkaConfig; -use talos_suffix::core::SuffixConfig; use tokio::task::JoinHandle; -// #[napi] #[derive(Clone)] pub struct CandidateData { pub readset: Vec, @@ -22,29 +19,29 @@ pub struct CandidateData { // The "snapshot" is intentionally messing here. 
We will compute it ourselves before feeding this data to Talos } -// #[napi] #[derive(Clone)] pub struct CertificationRequest { pub candidate: CandidateData, pub timeout_ms: u64, } -// #[napi] +#[derive(Clone)] pub struct CertificationResponse { pub xid: String, pub decision: Decision, pub version: u64, pub safepoint: Option, + pub conflict: Option, pub metadata: ResponseMetadata, } +#[derive(Clone)] pub struct ResponseMetadata { pub attempts: u64, pub duration_ms: u64, } #[derive(strum::Display)] -// #[napi] // this is napi friendly copy of talos_agent::agent::errors::AgentErrorKind pub enum ClientErrorKind { Certification, @@ -56,7 +53,6 @@ pub enum ClientErrorKind { OutOfOrderSnapshotTimeout, } -// #[napi] pub struct ClientError { pub kind: ClientErrorKind, pub reason: String, @@ -64,16 +60,31 @@ pub struct ClientError { } #[derive(Clone)] -// #[napi] +pub struct BackoffConfig { + pub min_ms: u64, + pub max_ms: u64, +} + +impl BackoffConfig { + pub fn new(min_ms: u64, max_ms: u64) -> Self { + Self { min_ms, max_ms } + } +} + +#[derive(Clone)] pub struct Config { // // cohort configs // + pub backoff_on_conflict: BackoffConfig, + pub retry_backoff: BackoffConfig, + pub retry_attempts_max: u64, - pub retry_backoff_max_ms: u64, - pub retry_oo_backoff_max_ms: u64, + pub retry_oo_backoff: BackoffConfig, pub retry_oo_attempts_max: u64, + pub snapshot_wait_timeout_ms: u64, + // // agent config values // @@ -105,33 +116,6 @@ pub struct Config { // should be mapped to rdkafka::config::RDKafkaLogLevel pub agent_log_level: u64, - // - // Kafka configs for Replicator - // - pub replicator_client_id: String, - pub replicator_group_id: String, - pub producer_config_overrides: HashMap<&'static str, &'static str>, - pub consumer_config_overrides: HashMap<&'static str, &'static str>, - - // - // Suffix config values - // - /// Initial capacity of the suffix - pub suffix_size_max: usize, - /// - The suffix prune threshold from when we start checking if the suffix - /// should 
prune. - /// - Set to None if pruning is not required. - /// - Defaults to None. - pub suffix_prune_at_size: Option, - /// Minimum size of suffix after prune. - /// - Defaults to None. - pub suffix_size_min: Option, - - // - // Replicator config values - // - pub replicator_buffer_size: usize, - // // Database config // @@ -158,16 +142,6 @@ impl From for AgentConfig { } } -impl From for SuffixConfig { - fn from(val: Config) -> Self { - SuffixConfig { - capacity: val.suffix_size_max, - prune_start_threshold: val.suffix_prune_at_size, - min_size_after_prune: val.suffix_size_min, - } - } -} - impl From for KafkaConfig { fn from(val: Config) -> Self { KafkaConfig { @@ -186,23 +160,6 @@ impl From for KafkaConfig { } } -impl From for TalosKafkaConfig { - fn from(val: Config) -> Self { - TalosKafkaConfig { - brokers: val.brokers.split(',').map(|i| i.to_string()).collect(), - topic: val.topic, - // TODO: not sure how napi will handle Option<> fields, if it can process them then we dont need to use this mapping. - username: val.kafka_username.unwrap_or_else(|| "".into()), - // TODO: not sure how napi will handle Option<> fields, if it can process them then we dont need to use this mapping. 
- password: val.kafka_password.unwrap_or_else(|| "".into()), - client_id: val.replicator_client_id, - group_id: val.replicator_group_id, - producer_config_overrides: val.producer_config_overrides, - consumer_config_overrides: val.consumer_config_overrides, - } - } -} - impl From for ClientError { fn from(agent_error: AgentError) -> Self { let (kind, reason) = match agent_error.kind { diff --git a/packages/examples_support/src/load_generator/generator.rs b/packages/examples_support/src/load_generator/generator.rs index 059a37bd..36ad3af4 100644 --- a/packages/examples_support/src/load_generator/generator.rs +++ b/packages/examples_support/src/load_generator/generator.rs @@ -10,7 +10,7 @@ use super::models::Generator; pub struct ControlledRateLoadGenerator {} impl ControlledRateLoadGenerator { - pub async fn generate(stop_type: StopType, target_rate: f32, mut generator_impl: G, tx_output: Arc>) -> Result<(), String> + pub async fn generate(stop_type: StopType, target_rate: f32, mut generator_impl: G, tx_output: Arc>) -> Result<(), String> where G: Generator + Sized + 'static, { @@ -75,7 +75,8 @@ impl ControlledRateLoadGenerator { } let new_item: T = generator_impl.generate(); - let _ = tx_output.send(new_item).await; + let now_ms = OffsetDateTime::now_utc().unix_timestamp_nanos() as f64 / 1_000_000_f64; + let _ = tx_output.send((new_item, now_ms)).await; generated_count += 1; let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); @@ -93,6 +94,8 @@ impl ControlledRateLoadGenerator { } } + tx_output.close(); + let now = OffsetDateTime::now_utc().unix_timestamp_nanos(); let duration = Duration::from_nanos((now - started_at) as u64).as_secs_f32(); log::warn!( diff --git a/packages/metrics/Cargo.toml b/packages/metrics/Cargo.toml index ed74845d..348e4f2b 100644 --- a/packages/metrics/Cargo.toml +++ b/packages/metrics/Cargo.toml @@ -5,4 +5,16 @@ edition = "2021" [dependencies] -time = { version = "0.3.17" } \ No newline at end of file +once_cell = { version = 
"1.18.0"} +opentelemetry_api = { version = "0.20.0", features = ["metrics"] } +opentelemetry-stdout = { version = "0.1.0", features = ["metrics"] } +opentelemetry_sdk = { version = "0.20.0", features = ["metrics", "rt-tokio"] } +opentelemetry = { version = "0.20.0", features = ["metrics"] } + +serde = { workspace = true } +serde_json = { workspace = true } + +env_logger = { workspace = true } +log = { workspace = true } + +time = { version = "0.3.17" } diff --git a/packages/metrics/src/lib.rs b/packages/metrics/src/lib.rs index 65880be0..5fc7db15 100644 --- a/packages/metrics/src/lib.rs +++ b/packages/metrics/src/lib.rs @@ -1 +1,2 @@ pub mod model; +pub mod opentel; diff --git a/packages/metrics/src/opentel/aggregation_selector.rs b/packages/metrics/src/opentel/aggregation_selector.rs new file mode 100644 index 00000000..31bc177d --- /dev/null +++ b/packages/metrics/src/opentel/aggregation_selector.rs @@ -0,0 +1,56 @@ +use opentelemetry_sdk::metrics::{reader::AggregationSelector, Aggregation, InstrumentKind}; + +use super::buckets::BUCKETS_4K; + +#[derive(Debug)] +pub struct CustomHistogramSelector { + buckets: Vec, +} + +impl Default for CustomHistogramSelector { + fn default() -> Self { + CustomHistogramSelector { + buckets: vec![0.0, 10.0, 100.0, 500.0, 1_000.0, 10_000.0], + } + } +} + +impl CustomHistogramSelector { + pub fn new_with_4k_buckets() -> Result { + let mut buckets: Vec = Vec::new(); + for b in BUCKETS_4K { + buckets.push(b as f64) + } + + Ok(Self { buckets }) + } + pub fn new2(csv_buckets: &str) -> Result { + let buckets_iter = csv_buckets.split(','); + + let mut buckets: Vec = Vec::new(); + for txt in buckets_iter { + let cleaned = txt.replace(' ', ""); + let parsed = cleaned + .parse::() + .map_err(|e| format!("Cannot convert this '{}' to f64. 
Error: {}", cleaned, e))?; + buckets.push(parsed); + } + + Ok(Self { buckets }) + } +} + +impl AggregationSelector for CustomHistogramSelector { + fn aggregation(&self, kind: InstrumentKind) -> Aggregation { + match kind { + InstrumentKind::Counter | InstrumentKind::UpDownCounter | InstrumentKind::ObservableCounter | InstrumentKind::ObservableUpDownCounter => { + Aggregation::Sum + } + InstrumentKind::ObservableGauge => Aggregation::LastValue, + InstrumentKind::Histogram => Aggregation::ExplicitBucketHistogram { + boundaries: self.buckets.clone(), + record_min_max: true, + }, + } + } +} diff --git a/packages/metrics/src/opentel/buckets.rs b/packages/metrics/src/opentel/buckets.rs new file mode 100644 index 00000000..164e1bb0 --- /dev/null +++ b/packages/metrics/src/opentel/buckets.rs @@ -0,0 +1,178 @@ +// pub static BUCKETS_4K: [u32; 2] = [0, 1]; +pub static BUCKETS_4K: [u32; 4096] = [ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, + 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, + 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, + 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, + 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, + 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, + 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, + 
239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, + 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, + 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331, + 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, + 363, 364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376, 377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389, 390, 391, 392, 393, + 394, 395, 396, 397, 398, 399, 400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, + 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449, 450, 451, 452, 453, 454, 455, + 456, 457, 458, 459, 460, 461, 462, 463, 464, 465, 466, 467, 468, 469, 470, 471, 472, 473, 474, 475, 476, 477, 478, 479, 480, 481, 482, 483, 484, 485, 486, + 487, 488, 489, 490, 491, 492, 493, 494, 495, 496, 497, 498, 499, 500, 501, 502, 503, 504, 505, 506, 507, 508, 509, 510, 511, 512, 513, 514, 515, 516, 517, + 518, 519, 520, 521, 522, 523, 524, 525, 526, 527, 528, 529, 530, 531, 532, 533, 534, 535, 536, 537, 538, 539, 540, 541, 542, 543, 544, 545, 546, 547, 548, + 549, 550, 551, 552, 553, 554, 555, 556, 557, 558, 559, 560, 561, 562, 563, 564, 565, 566, 567, 568, 569, 570, 571, 572, 573, 574, 575, 576, 577, 578, 579, + 580, 581, 582, 583, 584, 585, 586, 587, 588, 589, 590, 591, 592, 593, 594, 595, 596, 597, 598, 599, 600, 601, 602, 603, 604, 605, 606, 607, 608, 609, 610, + 611, 612, 613, 614, 615, 616, 617, 618, 619, 620, 621, 622, 623, 624, 625, 626, 627, 628, 629, 630, 631, 632, 633, 
634, 635, 636, 637, 638, 639, 640, 641, + 642, 643, 644, 645, 646, 647, 648, 649, 650, 651, 652, 653, 654, 655, 656, 657, 658, 659, 660, 661, 662, 663, 664, 665, 666, 667, 668, 669, 670, 671, 672, + 673, 674, 675, 676, 677, 678, 679, 680, 681, 682, 683, 684, 685, 686, 687, 688, 689, 690, 691, 692, 693, 694, 695, 696, 697, 698, 700, 702, 704, 706, 708, + 710, 712, 714, 716, 718, 720, 722, 724, 726, 728, 730, 732, 734, 736, 738, 740, 742, 744, 746, 748, 750, 752, 754, 756, 758, 760, 762, 764, 766, 768, 770, + 772, 774, 776, 778, 780, 782, 784, 786, 788, 790, 792, 794, 796, 798, 800, 802, 804, 806, 808, 810, 812, 814, 816, 818, 820, 822, 824, 826, 828, 830, 832, + 834, 836, 838, 840, 842, 844, 846, 848, 850, 852, 854, 856, 858, 860, 862, 864, 866, 868, 870, 872, 874, 876, 878, 880, 882, 884, 886, 888, 890, 892, 894, + 896, 898, 900, 902, 904, 906, 908, 910, 912, 914, 916, 918, 920, 922, 924, 926, 928, 930, 932, 934, 936, 938, 940, 942, 944, 946, 948, 950, 952, 954, 956, + 958, 960, 962, 964, 966, 968, 970, 972, 974, 976, 978, 980, 982, 984, 986, 988, 990, 992, 994, 996, 998, 1000, 1002, 1004, 1006, 1008, 1010, 1012, 1014, + 1016, 1018, 1020, 1022, 1024, 1026, 1028, 1030, 1032, 1034, 1036, 1038, 1040, 1042, 1044, 1046, 1048, 1050, 1052, 1054, 1056, 1058, 1060, 1062, 1064, 1066, + 1068, 1070, 1072, 1074, 1076, 1078, 1080, 1082, 1084, 1086, 1088, 1090, 1092, 1094, 1096, 1098, 1100, 1102, 1104, 1106, 1108, 1110, 1112, 1114, 1116, 1118, + 1120, 1122, 1124, 1126, 1128, 1130, 1132, 1134, 1136, 1138, 1140, 1142, 1144, 1146, 1148, 1150, 1152, 1154, 1156, 1158, 1160, 1162, 1164, 1167, 1170, 1173, + 1176, 1179, 1182, 1185, 1188, 1191, 1194, 1197, 1200, 1203, 1206, 1209, 1212, 1215, 1218, 1221, 1224, 1227, 1230, 1233, 1236, 1239, 1242, 1245, 1248, 1251, + 1254, 1257, 1260, 1263, 1266, 1269, 1272, 1275, 1278, 1281, 1284, 1287, 1290, 1293, 1296, 1299, 1302, 1305, 1308, 1311, 1314, 1317, 1320, 1323, 1326, 1329, + 1332, 1335, 1338, 1341, 1344, 1347, 1350, 1353, 1356, 1359, 1362, 
1365, 1368, 1371, 1374, 1377, 1380, 1383, 1386, 1389, 1392, 1395, 1398, 1401, 1404, 1407, + 1410, 1413, 1416, 1419, 1422, 1425, 1428, 1431, 1434, 1437, 1440, 1443, 1446, 1449, 1452, 1455, 1458, 1461, 1464, 1467, 1470, 1473, 1476, 1479, 1482, 1485, + 1488, 1491, 1494, 1497, 1500, 1503, 1506, 1509, 1512, 1515, 1518, 1521, 1524, 1527, 1530, 1533, 1536, 1539, 1542, 1545, 1548, 1551, 1554, 1557, 1560, 1563, + 1566, 1569, 1572, 1575, 1578, 1581, 1584, 1587, 1590, 1593, 1596, 1599, 1602, 1605, 1608, 1611, 1614, 1617, 1620, 1623, 1626, 1629, 1633, 1637, 1641, 1645, + 1649, 1653, 1657, 1661, 1665, 1669, 1673, 1677, 1681, 1685, 1689, 1693, 1697, 1701, 1705, 1709, 1713, 1717, 1721, 1725, 1729, 1733, 1737, 1741, 1745, 1749, + 1753, 1757, 1761, 1765, 1769, 1773, 1777, 1781, 1785, 1789, 1793, 1797, 1801, 1805, 1809, 1813, 1817, 1821, 1825, 1829, 1833, 1837, 1841, 1845, 1849, 1853, + 1857, 1861, 1865, 1869, 1873, 1877, 1881, 1885, 1889, 1893, 1897, 1901, 1905, 1909, 1913, 1917, 1921, 1925, 1929, 1933, 1937, 1941, 1945, 1949, 1953, 1957, + 1961, 1965, 1969, 1973, 1977, 1981, 1985, 1989, 1993, 1997, 2001, 2005, 2009, 2013, 2017, 2021, 2025, 2029, 2033, 2037, 2041, 2045, 2049, 2053, 2057, 2061, + 2065, 2069, 2073, 2077, 2081, 2085, 2089, 2093, 2098, 2103, 2108, 2113, 2118, 2123, 2128, 2133, 2138, 2143, 2148, 2153, 2158, 2163, 2168, 2173, 2178, 2183, + 2188, 2193, 2198, 2203, 2208, 2213, 2218, 2223, 2228, 2233, 2238, 2243, 2248, 2253, 2258, 2263, 2268, 2273, 2278, 2283, 2288, 2293, 2298, 2303, 2308, 2313, + 2318, 2323, 2328, 2333, 2338, 2343, 2348, 2353, 2358, 2363, 2368, 2373, 2378, 2383, 2388, 2393, 2398, 2403, 2408, 2413, 2418, 2423, 2428, 2433, 2438, 2443, + 2448, 2453, 2458, 2463, 2468, 2473, 2478, 2483, 2488, 2493, 2498, 2503, 2508, 2513, 2518, 2523, 2528, 2533, 2538, 2543, 2548, 2553, 2558, 2564, 2570, 2576, + 2582, 2588, 2594, 2600, 2606, 2612, 2618, 2624, 2630, 2636, 2642, 2648, 2654, 2660, 2666, 2672, 2678, 2684, 2690, 2696, 2702, 2708, 2714, 2720, 2726, 2732, + 2738, 2744, 
2750, 2756, 2762, 2768, 2774, 2780, 2786, 2792, 2798, 2804, 2810, 2816, 2822, 2828, 2834, 2840, 2846, 2852, 2858, 2864, 2870, 2876, 2882, 2888, + 2894, 2900, 2906, 2912, 2918, 2924, 2930, 2936, 2942, 2948, 2954, 2960, 2966, 2972, 2978, 2984, 2990, 2996, 3002, 3008, 3014, 3020, 3026, 3033, 3040, 3047, + 3054, 3061, 3068, 3075, 3082, 3089, 3096, 3103, 3110, 3117, 3124, 3131, 3138, 3145, 3152, 3159, 3166, 3173, 3180, 3187, 3194, 3201, 3208, 3215, 3222, 3229, + 3236, 3243, 3250, 3257, 3264, 3271, 3278, 3285, 3292, 3299, 3306, 3313, 3320, 3327, 3334, 3341, 3348, 3355, 3362, 3369, 3376, 3383, 3390, 3397, 3404, 3411, + 3418, 3425, 3432, 3439, 3446, 3453, 3460, 3467, 3474, 3481, 3488, 3496, 3504, 3512, 3520, 3528, 3536, 3544, 3552, 3560, 3568, 3576, 3584, 3592, 3600, 3608, + 3616, 3624, 3632, 3640, 3648, 3656, 3664, 3672, 3680, 3688, 3696, 3704, 3712, 3720, 3728, 3736, 3744, 3752, 3760, 3768, 3776, 3784, 3792, 3800, 3808, 3816, + 3824, 3832, 3840, 3848, 3856, 3864, 3872, 3880, 3888, 3896, 3904, 3912, 3920, 3928, 3936, 3944, 3952, 3960, 3969, 3978, 3987, 3996, 4005, 4014, 4023, 4032, + 4041, 4050, 4059, 4068, 4077, 4086, 4095, 4104, 4113, 4122, 4131, 4140, 4149, 4158, 4167, 4176, 4185, 4194, 4203, 4212, 4221, 4230, 4239, 4248, 4257, 4266, + 4275, 4284, 4293, 4302, 4311, 4320, 4329, 4338, 4347, 4356, 4365, 4374, 4383, 4392, 4401, 4410, 4419, 4429, 4439, 4449, 4459, 4469, 4479, 4489, 4499, 4509, + 4519, 4529, 4539, 4549, 4559, 4569, 4579, 4589, 4599, 4609, 4619, 4629, 4639, 4649, 4659, 4669, 4679, 4689, 4699, 4709, 4719, 4729, 4739, 4749, 4759, 4769, + 4779, 4789, 4799, 4809, 4819, 4829, 4839, 4849, 4859, 4869, 4879, 4889, 4900, 4911, 4922, 4933, 4944, 4955, 4966, 4977, 4988, 4999, 5010, 5021, 5032, 5043, + 5054, 5065, 5076, 5087, 5098, 5109, 5120, 5131, 5142, 5153, 5164, 5175, 5186, 5197, 5208, 5219, 5230, 5241, 5252, 5263, 5274, 5285, 5296, 5307, 5318, 5329, + 5340, 5351, 5363, 5375, 5387, 5399, 5411, 5423, 5435, 5447, 5459, 5471, 5483, 5495, 5507, 5519, 5531, 5543, 5555, 
5567, 5579, 5591, 5603, 5615, 5627, 5639, + 5651, 5663, 5675, 5687, 5699, 5711, 5723, 5735, 5747, 5759, 5771, 5783, 5795, 5807, 5819, 5832, 5845, 5858, 5871, 5884, 5897, 5910, 5923, 5936, 5949, 5962, + 5975, 5988, 6001, 6014, 6027, 6040, 6053, 6066, 6079, 6092, 6105, 6118, 6131, 6144, 6157, 6170, 6183, 6196, 6209, 6222, 6235, 6248, 6261, 6274, 6287, 6301, + 6315, 6329, 6343, 6357, 6371, 6385, 6399, 6413, 6427, 6441, 6455, 6469, 6483, 6497, 6511, 6525, 6539, 6553, 6567, 6581, 6595, 6609, 6623, 6637, 6651, 6665, + 6679, 6693, 6707, 6721, 6735, 6749, 6764, 6779, 6794, 6809, 6824, 6839, 6854, 6869, 6884, 6899, 6914, 6929, 6944, 6959, 6974, 6989, 7004, 7019, 7034, 7049, + 7064, 7079, 7094, 7109, 7124, 7139, 7154, 7169, 7184, 7199, 7214, 7230, 7246, 7262, 7278, 7294, 7310, 7326, 7342, 7358, 7374, 7390, 7406, 7422, 7438, 7454, + 7470, 7486, 7502, 7518, 7534, 7550, 7566, 7582, 7598, 7614, 7630, 7646, 7662, 7678, 7695, 7712, 7729, 7746, 7763, 7780, 7797, 7814, 7831, 7848, 7865, 7882, + 7899, 7916, 7933, 7950, 7967, 7984, 8001, 8018, 8035, 8052, 8069, 8086, 8103, 8120, 8137, 8154, 8172, 8190, 8208, 8226, 8244, 8262, 8280, 8298, 8316, 8334, + 8352, 8370, 8388, 8406, 8424, 8442, 8460, 8478, 8496, 8514, 8532, 8550, 8568, 8586, 8604, 8623, 8642, 8661, 8680, 8699, 8718, 8737, 8756, 8775, 8794, 8813, + 8832, 8851, 8870, 8889, 8908, 8927, 8946, 8965, 8984, 9003, 9022, 9041, 9060, 9079, 9099, 9119, 9139, 9159, 9179, 9199, 9219, 9239, 9259, 9279, 9299, 9319, + 9339, 9359, 9379, 9399, 9419, 9439, 9459, 9479, 9499, 9519, 9539, 9560, 9581, 9602, 9623, 9644, 9665, 9686, 9707, 9728, 9749, 9770, 9791, 9812, 9833, 9854, + 9875, 9896, 9917, 9938, 9959, 9980, 10001, 10023, 10045, 10067, 10089, 10111, 10133, 10155, 10177, 10199, 10221, 10243, 10265, 10287, 10309, 10331, 10353, + 10375, 10397, 10419, 10441, 10463, 10485, 10508, 10531, 10554, 10577, 10600, 10623, 10646, 10669, 10692, 10715, 10738, 10761, 10784, 10807, 10830, 10853, + 10876, 10899, 10922, 10945, 10969, 10993, 11017, 11041, 11065, 
11089, 11113, 11137, 11161, 11185, 11209, 11233, 11257, 11281, 11305, 11329, 11353, 11377, + 11401, 11426, 11451, 11476, 11501, 11526, 11551, 11576, 11601, 11626, 11651, 11676, 11701, 11726, 11751, 11776, 11801, 11826, 11851, 11876, 11902, 11928, + 11954, 11980, 12006, 12032, 12058, 12084, 12110, 12136, 12162, 12188, 12214, 12240, 12266, 12292, 12318, 12344, 12371, 12398, 12425, 12452, 12479, 12506, + 12533, 12560, 12587, 12614, 12641, 12668, 12695, 12722, 12749, 12776, 12803, 12831, 12859, 12887, 12915, 12943, 12971, 12999, 13027, 13055, 13083, 13111, + 13139, 13167, 13195, 13223, 13251, 13279, 13308, 13337, 13366, 13395, 13424, 13453, 13482, 13511, 13540, 13569, 13598, 13627, 13656, 13685, 13714, 13743, + 13773, 13803, 13833, 13863, 13893, 13923, 13953, 13983, 14013, 14043, 14073, 14103, 14133, 14163, 14193, 14224, 14255, 14286, 14317, 14348, 14379, 14410, + 14441, 14472, 14503, 14534, 14565, 14596, 14627, 14658, 14690, 14722, 14754, 14786, 14818, 14850, 14882, 14914, 14946, 14978, 15010, 15042, 15074, 15106, + 15138, 15171, 15204, 15237, 15270, 15303, 15336, 15369, 15402, 15435, 15468, 15501, 15534, 15567, 15600, 15634, 15668, 15702, 15736, 15770, 15804, 15838, + 15872, 15906, 15940, 15974, 16008, 16042, 16076, 16111, 16146, 16181, 16216, 16251, 16286, 16321, 16356, 16391, 16426, 16461, 16496, 16531, 16567, 16603, + 16639, 16675, 16711, 16747, 16783, 16819, 16855, 16891, 16927, 16963, 16999, 17036, 17073, 17110, 17147, 17184, 17221, 17258, 17295, 17332, 17369, 17406, + 17443, 17481, 17519, 17557, 17595, 17633, 17671, 17709, 17747, 17785, 17823, 17861, 17899, 17937, 17976, 18015, 18054, 18093, 18132, 18171, 18210, 18249, + 18288, 18327, 18366, 18405, 18445, 18485, 18525, 18565, 18605, 18645, 18685, 18725, 18765, 18805, 18845, 18886, 18927, 18968, 19009, 19050, 19091, 19132, + 19173, 19214, 19255, 19296, 19337, 19379, 19421, 19463, 19505, 19547, 19589, 19631, 19673, 19715, 19757, 19799, 19842, 19885, 19928, 19971, 20014, 20057, + 20100, 20143, 20186, 20229, 20272, 
20316, 20360, 20404, 20448, 20492, 20536, 20580, 20624, 20668, 20712, 20757, 20802, 20847, 20892, 20937, 20982, 21027, + 21072, 21117, 21162, 21208, 21254, 21300, 21346, 21392, 21438, 21484, 21530, 21576, 21622, 21668, 21715, 21762, 21809, 21856, 21903, 21950, 21997, 22044, + 22091, 22139, 22187, 22235, 22283, 22331, 22379, 22427, 22475, 22523, 22571, 22620, 22669, 22718, 22767, 22816, 22865, 22914, 22963, 23012, 23061, 23111, + 23161, 23211, 23261, 23311, 23361, 23411, 23461, 23511, 23562, 23613, 23664, 23715, 23766, 23817, 23868, 23919, 23970, 24022, 24074, 24126, 24178, 24230, + 24282, 24334, 24386, 24438, 24491, 24544, 24597, 24650, 24703, 24756, 24809, 24862, 24915, 24969, 25023, 25077, 25131, 25185, 25239, 25293, 25347, 25402, + 25457, 25512, 25567, 25622, 25677, 25732, 25787, 25842, 25898, 25954, 26010, 26066, 26122, 26178, 26234, 26290, 26347, 26404, 26461, 26518, 26575, 26632, + 26689, 26746, 26804, 26862, 26920, 26978, 27036, 27094, 27152, 27210, 27269, 27328, 27387, 27446, 27505, 27564, 27623, 27682, 27742, 27802, 27862, 27922, + 27982, 28042, 28102, 28162, 28223, 28284, 28345, 28406, 28467, 28528, 28589, 28650, 28712, 28774, 28836, 28898, 28960, 29022, 29084, 29147, 29210, 29273, + 29336, 29399, 29462, 29525, 29588, 29652, 29716, 29780, 29844, 29908, 29972, 30036, 30101, 30166, 30231, 30296, 30361, 30426, 30491, 30557, 30623, 30689, + 30755, 30821, 30887, 30953, 31020, 31087, 31154, 31221, 31288, 31355, 31422, 31490, 31558, 31626, 31694, 31762, 31830, 31898, 31967, 32036, 32105, 32174, + 32243, 32312, 32381, 32451, 32521, 32591, 32661, 32731, 32801, 32872, 32943, 33014, 33085, 33156, 33227, 33298, 33370, 33442, 33514, 33586, 33658, 33730, + 33803, 33876, 33949, 34022, 34095, 34168, 34241, 34315, 34389, 34463, 34537, 34611, 34685, 34760, 34835, 34910, 34985, 35060, 35135, 35211, 35287, 35363, + 35439, 35515, 35591, 35668, 35745, 35822, 35899, 35976, 36053, 36131, 36209, 36287, 36365, 36443, 36521, 36600, 36679, 36758, 36837, 36916, 36995, 37075, + 37155, 
37235, 37315, 37395, 37475, 37556, 37637, 37718, 37799, 37880, 37961, 38043, 38125, 38207, 38289, 38371, 38454, 38537, 38620, 38703, 38786, 38869, + 38953, 39037, 39121, 39205, 39289, 39373, 39458, 39543, 39628, 39713, 39798, 39884, 39970, 40056, 40142, 40228, 40315, 40402, 40489, 40576, 40663, 40750, + 40838, 40926, 41014, 41102, 41190, 41279, 41368, 41457, 41546, 41635, 41725, 41815, 41905, 41995, 42085, 42175, 42266, 42357, 42448, 42539, 42630, 42722, + 42814, 42906, 42998, 43090, 43183, 43276, 43369, 43462, 43555, 43649, 43743, 43837, 43931, 44025, 44120, 44215, 44310, 44405, 44500, 44596, 44692, 44788, + 44884, 44981, 45078, 45175, 45272, 45369, 45467, 45565, 45663, 45761, 45859, 45958, 46057, 46156, 46255, 46354, 46454, 46554, 46654, 46754, 46855, 46956, + 47057, 47158, 47259, 47361, 47463, 47565, 47667, 47770, 47873, 47976, 48079, 48182, 48286, 48390, 48494, 48598, 48703, 48808, 48913, 49018, 49123, 49229, + 49335, 49441, 49547, 49654, 49761, 49868, 49975, 50082, 50190, 50298, 50406, 50514, 50623, 50732, 50841, 50950, 51060, 51170, 51280, 51390, 51501, 51612, + 51723, 51834, 51945, 52057, 52169, 52281, 52393, 52506, 52619, 52732, 52845, 52959, 53073, 53187, 53301, 53416, 53531, 53646, 53761, 53877, 53993, 54109, + 54225, 54342, 54459, 54576, 54693, 54811, 54929, 55047, 55165, 55284, 55403, 55522, 55641, 55761, 55881, 56001, 56121, 56242, 56363, 56484, 56605, 56727, + 56849, 56971, 57094, 57217, 57340, 57463, 57587, 57711, 57835, 57959, 58084, 58209, 58334, 58459, 58585, 58711, 58837, 58964, 59091, 59218, 59345, 59473, + 59601, 59729, 59857, 59986, 60115, 60244, 60374, 60504, 60634, 60764, 60895, 61026, 61157, 61289, 61421, 61553, 61685, 61818, 61951, 62084, 62218, 62352, + 62486, 62620, 62755, 62890, 63025, 63161, 63297, 63433, 63569, 63706, 63843, 63980, 64118, 64256, 64394, 64532, 64671, 64810, 64949, 65089, 65229, 65369, + 65510, 65651, 65792, 65933, 66075, 66217, 66359, 66502, 66645, 66788, 66932, 67076, 67220, 67365, 67510, 67655, 67800, 67946, 68092, 
68238, 68385, 68532, + 68679, 68827, 68975, 69123, 69272, 69421, 69570, 69720, 69870, 70020, 70171, 70322, 70473, 70625, 70777, 70929, 71082, 71235, 71388, 71542, 71696, 71850, + 72005, 72160, 72315, 72471, 72627, 72783, 72940, 73097, 73254, 73412, 73570, 73728, 73887, 74046, 74205, 74365, 74525, 74685, 74846, 75007, 75168, 75330, + 75492, 75654, 75817, 75980, 76143, 76307, 76471, 76635, 76800, 76965, 77131, 77297, 77463, 77630, 77797, 77964, 78132, 78300, 78468, 78637, 78806, 78975, + 79145, 79315, 79486, 79657, 79828, 80000, 80172, 80344, 80517, 80690, 80864, 81038, 81212, 81387, 81562, 81737, 81913, 82089, 82266, 82443, 82620, 82798, + 82976, 83154, 83333, 83512, 83692, 83872, 84052, 84233, 84414, 84596, 84778, 84960, 85143, 85326, 85509, 85693, 85877, 86062, 86247, 86432, 86618, 86804, + 86991, 87178, 87365, 87553, 87741, 87930, 88119, 88308, 88498, 88688, 88879, 89070, 89262, 89454, 89646, 89839, 90032, 90226, 90420, 90614, 90809, 91004, + 91200, 91396, 91593, 91790, 91987, 92185, 92383, 92582, 92781, 92981, 93181, 93381, 93582, 93783, 93985, 94187, 94390, 94593, 94796, 95000, 95204, 95409, + 95614, 95820, 96026, 96232, 96439, 96646, 96854, 97062, 97271, 97480, 97690, 97900, 98111, 98322, 98533, 98745, 98957, 99170, 99383, 99597, 99811, 100026, + 100241, 100457, 100673, 100889, 101106, 101323, 101541, 101759, 101978, 102197, 102417, 102637, 102858, 103079, 103301, 103523, 103746, 103969, 104193, + 104417, 104642, 104867, 105093, 105319, 105545, 105772, 105999, 106227, 106455, 106684, 106913, 107143, 107373, 107604, 107835, 108067, 108299, 108532, + 108765, 108999, 109233, 109468, 109703, 109939, 110175, 110412, 110649, 110887, 111125, 111364, 111603, 111843, 112084, 112325, 112567, 112809, 113052, + 113295, 113539, 113783, 114028, 114273, 114519, 114765, 115012, 115259, 115507, 115755, 116004, 116253, 116503, 116754, 117005, 117257, 117509, 117762, + 118015, 118269, 118523, 118778, 119033, 119289, 119546, 119803, 120061, 120319, 120578, 120837, 121097, 121357, 
121618, 121880, 122142, 122405, 122668, + 122932, 123196, 123461, 123726, 123992, 124259, 124526, 124794, 125062, 125331, 125601, 125871, 126142, 126413, 126685, 126957, 127230, 127504, 127778, + 128053, 128328, 128604, 128881, 129158, 129436, 129714, 129993, 130273, 130553, 130834, 131115, 131397, 131680, 131963, 132247, 132531, 132816, 133102, + 133388, 133675, 133962, 134250, 134539, 134828, 135118, 135409, 135700, 135992, 136284, 136577, 136871, 137165, 137460, 137756, 138052, 138349, 138646, + 138944, 139243, 139542, 139842, 140143, 140444, 140746, 141049, 141352, 141656, 141961, 142266, 142572, 142879, 143186, 143494, 143803, 144112, 144422, + 144733, 145044, 145356, 145669, 145982, 146296, 146611, 146926, 147242, 147559, 147876, 148194, 148513, 148832, 149152, 149473, 149794, 150116, 150439, + 150762, 151086, 151411, 151737, 152063, 152390, 152718, 153046, 153375, 153705, 154036, 154367, 154699, 155032, 155365, 155699, 156034, 156370, 156706, + 157043, 157381, 157719, 158058, 158398, 158739, 159080, 159422, 159765, 160109, 160453, 160798, 161144, 161491, 161838, 162186, 162535, 162885, 163235, + 163586, 163938, 164291, 164644, 164998, 165353, 165709, 166065, 166422, 166780, 167139, 167498, 167858, 168219, 168581, 168944, 169307, 169671, 170036, + 170402, 170768, 171135, 171503, 171872, 172242, 172612, 172983, 173355, 173728, 174102, 174476, 174851, 175227, 175604, 175982, 176360, 176739, 177119, + 177500, 177882, 178265, 178648, 179032, 179417, 179803, 180190, 180577, 180965, 181354, 181744, 182135, 182527, 182919, 183312, 183706, 184101, 184497, + 184894, 185292, 185690, 186089, 186489, 186890, 187292, 187695, 188099, 188503, 188908, 189314, 189721, 190129, 190538, 190948, 191359, 191770, 192182, + 192595, 193009, 193424, 193840, 194257, 194675, 195094, 195514, 195934, 196355, 196777, 197200, 197624, 198049, 198475, 198902, 199330, 199759, 200189, + 200619, 201050, 201482, 201915, 202349, 202784, 203220, 203657, 204095, 204534, 204974, 205415, 205857, 
206300, 206744, 207189, 207635, 208081, 208528, + 208976, 209425, 209875, 210326, 210778, 211231, 211685, 212140, 212596, 213053, 213511, 213970, 214430, 214891, 215353, 215816, 216280, 216745, 217211, + 217678, 218146, 218615, 219085, 219556, 220028, 220501, 220975, 221450, 221926, 222403, 222881, 223360, 223840, 224321, 224803, 225286, 225770, 226255, + 226742, 227230, 227719, 228209, 228700, 229192, 229685, 230179, 230674, 231170, 231667, 232165, 232664, 233164, 233665, 234167, 234671, 235176, 235682, + 236189, 236697, 237206, 237716, 238227, 238739, 239252, 239766, 240282, 240799, 241317, 241836, 242356, 242877, 243399, 243922, 244447, 244973, 245500, + 246028, 246557, 247087, 247618, 248150, 248684, 249219, 249755, 250292, 250830, 251369, 251910, 252452, 252995, 253539, 254084, 254630, 255178, 255727, + 256277, 256828, 257380, 257933, 258488, 259044, 259601, 260159, 260718, 261279, 261841, 262404, 262968, 263533, 264100, 264668, 265237, 265807, 266379, + 266952, 267526, 268101, 268678, 269256, 269835, 270415, 270996, 271579, 272163, 272748, 273335, 273923, 274512, 275102, 275694, 276287, 276881, 277476, + 278073, 278671, 279270, 279871, 280473, 281076, 281680, 282286, 282893, 283501, 284111, 284722, 285334, 285948, 286563, 287179, 287797, 288416, 289036, + 289658, 290281, 290905, 291531, 292158, 292786, 293416, 294047, 294679, 295313, 295948, 296584, 297222, 297861, 298502, 299144, 299787, 300432, 301078, + 301725, 302374, 303024, 303676, 304329, 304983, 305639, 306296, 306955, 307615, 308276, 308939, 309603, 310269, 310936, 311605, 312275, 312947, 313620, + 314294, 314970, 315647, 316326, 317006, 317688, 318371, 319056, 319742, 320430, 321119, 321810, 322502, 323195, 323890, 324586, 325284, 325983, 326684, + 327386, 328090, 328796, 329503, 330212, 330922, 331634, 332347, 333062, 333778, 334496, 335215, 335936, 336658, 337382, 338107, 338834, 339563, 340293, + 341025, 341758, 342493, 343229, 343967, 344707, 345448, 346191, 346935, 347681, 348429, 349178, 
349929, 350681, 351435, 352191, 352948, 353707, 354468, + 355230, 355994, 356760, 357527, 358296, 359066, 359838, 360612, 361387, 362164, 362943, 363723, 364505, 365289, 366074, 366861, 367650, 368441, 369233, + 370027, 370823, 371620, 372419, 373220, 374023, 374827, 375633, 376441, 377250, 378061, 378874, 379689, 380505, 381323, 382143, 382965, 383789, 384614, + 385441, 386270, 387101, 387933, 388767, 389603, 390441, 391281, 392122, 392965, 393810, 394657, 395506, 396356, 397208, 398062, 398918, 399776, 400636, + 401498, 402361, 403226, 404093, 404962, 405833, 406706, 407581, 408457, 409335, 410215, 411097, 411981, 412867, 413755, 414645, 415537, 416431, 417326, + 418223, 419122, 420023, 420926, 421831, 422738, 423647, 424558, 425471, 426386, 427303, 428222, 429143, 430066, 430991, 431918, 432847, 433778, 434711, + 435646, 436583, 437522, 438463, 439406, 440351, 441298, 442247, 443198, 444151, 445106, 446063, 447022, 447983, 448946, 449911, 450878, 451848, 452820, + 453794, 454770, 455748, 456728, 457710, 458694, 459680, 460668, 461659, 462652, 463647, 464644, 465643, 466644, 467647, 468653, 469661, 470671, 471683, + 472697, 473713, 474732, 475753, 476776, 477801, 478828, 479858, 480890, 481924, 482960, 483999, 485040, 486083, 487128, 488175, 489225, 490277, 491331, + 492388, 493447, 494508, 495571, 496637, 497705, 498775, 499848, 500923, 502000, 503079, 504161, 505245, 506331, 507420, 508511, 509604, 510700, 511798, + 512899, 514002, 515107, 516215, 517325, 518437, 519552, 520669, 521789, 522911, 524035, 525162, 526291, 527423, 528557, 529694, 530833, 531974, 533118, + 534264, 535413, 536564, 537718, 538874, 540033, 541194, 542358, 543524, 544693, 545864, 547038, 548214, 549393, 550574, 551758, 552944, 554133, 555325, + 556519, 557716, 558915, 560117, 561321, 562528, 563738, 564950, 566165, 567382, 568602, 569825, 571050, 572278, 573509, 574742, 575978, 577217, 578458, + 579702, 580949, 582198, 583450, 584705, 585962, 587222, 588485, 589750, 591018, 592289, 
593563, 594839, 596118, 597400, 598685, 599972, 601262, 602555, + 603851, 605149, 606450, 607754, 609061, 610371, 611684, 612999, 614317, 615638, 616962, 618289, 619619, 620951, 622286, 623624, 624965, 626309, 627656, + 629006, 630359, 631714, 633072, 634433, 635797, 637164, 638534, 639907, 641283, 642662, 644044, 645429, 646817, 648208, 649602, 650999, 652399, 653802, + 655208, 656617, 658029, 659444, 660862, 662283, 663707, 665134, 666564, 667997, 669433, 670873, 672316, 673762, 675211, 676663, 678118, 679576, 681037, + 682501, 683969, 685440, 686914, 688391, 689871, 691354, 692841, 694331, 695824, 697320, 698819, 700322, 701828, 703337, 704849, 706365, 707884, 709406, + 710931, 712460, 713992, 715527, 717066, 718608, 720153, 721702, 723254, 724809, 726368, 727930, 729495, 731064, 732636, 734211, 735790, 737372, 738958, + 740547, 742139, 743735, 745334, 746937, 748543, 750153, 751766, 753383, 755003, 756627, 758254, 759885, 761519, 763157, 764798, 766443, 768091, 769743, + 771398, 773057, 774719, 776385, 778054, 779727, 781404, 783084, 784768, 786456, 788147, 789842, 791540, 793242, 794948, 796657, 798370, 800087, 801807, + 803531, 805259, 806991, 808726, 810465, 812208, 813955, 815705, 817459, 819217, 820979, 822744, 824513, 826286, 828063, 829844, 831628, 833416, 835208, + 837004, 838804, 840608, 842416, 844227, 846042, 847861, 849684, 851511, 853342, 855177, 857016, 858859, 860706, 862557, 864412, 866271, 868134, 870001, + 871872, 873747, 875626, 877509, 879396, 881287, 883182, 885081, 886984, 888891, 890802, 892718, 894638, 896562, 898490, 900422, 902358, 904298, 906243, + 908192, 910145, 912102, 914063, 916029, 917999, 919973, 921951, 923934, 925921, 927912, 929907, 931907, 933911, 935919, 937932, 939949, 941970, 943996, + 946026, 948060, 950099, 952142, 954189, 956241, 958297, 960358, 962423, 964493, 966567, 968645, 970728, 972815, 974907, 977003, 979104, 981209, 983319, + 985433, 987552, 989676, 991804, 993937, 996074, 998216, 1000363, 1002514, 1004670, 
1006830, 1008995, 1011165, 1013339, 1015518, 1017702, 1019890, 1022083, + 1024281, 1026484, 1028691, 1030903, 1033120, 1035342, 1037568, 1039799, 1042035, 1044276, 1046522, 1048772, +]; diff --git a/packages/metrics/src/opentel/mod.rs b/packages/metrics/src/opentel/mod.rs new file mode 100644 index 00000000..d19eb4bb --- /dev/null +++ b/packages/metrics/src/opentel/mod.rs @@ -0,0 +1,5 @@ +pub mod aggregation_selector; +pub mod buckets; +pub mod model; +pub mod printer; +pub mod scaling; diff --git a/packages/metrics/src/opentel/model.rs b/packages/metrics/src/opentel/model.rs new file mode 100644 index 00000000..759602b3 --- /dev/null +++ b/packages/metrics/src/opentel/model.rs @@ -0,0 +1,175 @@ +use std::time::Duration; + +use serde::Deserialize; + +/// +/// Data structures in this module are 99% compatible with opentelemetry JSON schema. +/// We just did a tiny rename where it made sense from consumer point of view. +/// This package was required to introduce a typed metrics model to our app. The Rust structs of opentelemetry model are +/// not exposed to public, they are only available as exported JSON (via serde). This data model is representation of that +/// JSON schema. 
+/// +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MetricsDataContainer { + pub resource_metrics: ResourceMetrics, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResourceMetrics { + pub resource: Resource, + pub scope_metrics: Vec, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Resource { + pub attributes: Vec, +} + +#[derive(Deserialize, Eq, PartialEq, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Attribute { + pub key: String, + pub value: Option, +} + +#[derive(Deserialize, Eq, PartialEq, Debug)] +#[serde(rename_all = "camelCase")] +pub struct StringValue { + #[serde(alias = "stringValue")] + pub value: String, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ScopeMetrics { + pub scope: Scope, + pub metrics: Vec, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Scope { + pub name: String, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Metric { + pub name: String, + pub unit: String, + pub histogram: Option, + #[serde(alias = "sum")] + pub count: Option, +} + +impl Metric { + pub fn is_time_unit(&self) -> bool { + self.unit == "millis" + || self.unit == "ms" + || self.unit == "sec" + || self.unit == "s" + || self.unit == "ns" + || self.unit == "mcs" + || self.unit == "μs" + || self.unit == "h" + || self.unit == "hr" + || self.unit == "hrs" + } + + pub fn to_seconds(&self, value: f64) -> Option { + if !self.is_time_unit() { + None + } else if self.unit == "hrs" || self.unit == "hr" || self.unit == "h" { + Some(value * 60_f64) + } else if self.unit == "millis" || self.unit == "ms" { + Some(value / 1_000_f64) + } else if self.unit == "μs" || self.unit == "mcs" { + Some(value / 1_000_000_f64) + } else if self.unit == "ns" { + Some(value / 1_000_000_000_f64) + } else { + Some(value) + } + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Counter { + pub value: 
u64, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CountContainer { + pub data_points: Vec, +} + +impl CountContainer { + pub fn counter(&self) -> &Counter { + &self.data_points[0] + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct HistogramContainer { + pub data_points: Vec, +} + +impl HistogramContainer { + pub fn histogram(&self) -> &Histogram { + &self.data_points[0] + } +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Histogram { + pub count: u64, + pub min: f64, + pub max: f64, + pub sum: f64, + pub start_time_unix_nano: u64, + pub time_unix_nano: u64, + #[serde(alias = "explicitBounds")] + pub buckets: Vec, + #[serde(alias = "bucketCounts")] + pub counts: Vec, +} + +impl Histogram { + pub fn started(&self) -> Duration { + Duration::from_nanos(self.start_time_unix_nano) + } + pub fn time(&self) -> Duration { + Duration::from_nanos(self.time_unix_nano) + } + pub fn compact(&self) -> Vec { + let mut hist: Vec = Vec::new(); + let mut sum = 0_u64; + for (i, label) in self.buckets.iter().enumerate() { + let count = self.counts[i]; + sum += count; + let row = BucketData { + label: *label, + count, + sum, + percentage: (100.0 * sum as f64) / self.count as f64, + }; + hist.push(row); + } + + hist + } +} + +pub struct BucketData { + pub label: f64, + pub count: u64, + pub sum: u64, + pub percentage: f64, +} diff --git a/packages/metrics/src/opentel/printer.rs b/packages/metrics/src/opentel/printer.rs new file mode 100644 index 00000000..86104c15 --- /dev/null +++ b/packages/metrics/src/opentel/printer.rs @@ -0,0 +1,173 @@ +use std::collections::HashSet; + +use opentelemetry_stdout::MetricsData; + +use super::{ + model::{Attribute, MetricsDataContainer}, + scaling::ScalingConfig, +}; + +pub struct MetricsToStringPrinter { + pub print_raw_histogram: bool, + parallel_threads: u64, + resource_filter: Option>, + scope_filter: Option>, + metrics_filter: Option>, + scaling_config: 
ScalingConfig, +} + +/// Prints metrics as formatted text +impl MetricsToStringPrinter { + pub fn new(threads: u64, print_raw_histogram: bool, scaling_config: ScalingConfig) -> Self { + Self { + print_raw_histogram, + parallel_threads: threads, + resource_filter: None, + scope_filter: None, + metrics_filter: None, + scaling_config, + } + } + + pub fn new_with_filters( + threads: u64, + resource_filter: Option>, + scope_filter: Option>, + metrics_filter: Option>, + scaling_config: ScalingConfig, + ) -> Self { + Self { + print_raw_histogram: false, + parallel_threads: threads, + resource_filter, + scope_filter, + metrics_filter, + scaling_config, + } + } + + #[allow(clippy::single_char_add_str)] + pub fn print(&self, metrics: &MetricsData) -> Result { + let mut out: String = "".to_owned(); + let serde_value = serde_json::to_value(metrics).map_err(|e| e.to_string())?; + let container = serde_json::from_value::(serde_value).map_err(|e| e.to_string())?; + let percentile_labels = vec![25.0, 50.0, 75.0, 90.0, 95.0, 98.0, 99.0, 99.9, 99.99, 100.0]; + + if let Some(ref filter) = self.resource_filter { + let mut filter_passed = filter.is_empty(); + for attr in filter.iter() { + if container.resource_metrics.resource.attributes.iter().any(|a| a.eq(attr)) { + filter_passed = true; + break; + } + } + if !filter_passed { + return Ok("".into()); + } + } + + for scope_metrics in container.resource_metrics.scope_metrics.iter() { + if let Some(ref filter) = self.scope_filter { + let filter_passed = filter.is_empty() || filter.contains(scope_metrics.scope.name.as_str()); + if !filter_passed { + continue; + } + } + + for metric in scope_metrics.metrics.iter() { + // apply metric name filter + if let Some(ref filter) = self.metrics_filter { + let filter_passed = filter.is_empty() || filter.contains(metric.name.as_str()); + if !filter_passed { + continue; + } + } + + if let Some(hist_c) = &metric.histogram { + let hist = hist_c.histogram(); + + if metric.name == "metric_throughput" { + 
// this is special case metric used to track total time span of the test + out.push_str("\n---------------------------------------"); + out.push_str(format!("\n{:>11} : {}", "Metric name", "throughput").as_str()); + + let dur_sec = metric.to_seconds(hist.max - hist.min).unwrap(); + let value = (hist.count as f64 / 2_f64) / dur_sec; + out.push_str(format!("\n{:>11} : {:.2} TPS", "Value", value).as_str()); + if !self.print_raw_histogram { + continue; + } + } + + let buckets = hist.compact(); + let scale_factor = self.scaling_config.get_scale_factor(metric.name.as_str()); + + out.push_str("\n---------------------------------------"); + out.push_str(format!("\nHistogram: \"{}\"\n", metric.name).as_str()); + + if self.print_raw_histogram { + for bucket in buckets.iter() { + if bucket.count == 0 { + continue; + } + out.push_str( + format!( + "\n{:>5.0}: {:>7} | {:>7} | {:>5.2}%", + bucket.label / scale_factor as f64, + bucket.count, + bucket.sum, + bucket.percentage + ) + .as_str(), + ); + } + } + + let mut bucket_index = 0_usize; + for percentile_label in percentile_labels.iter() { + for bucket in buckets.iter().skip(bucket_index) { + bucket_index += 1; + if *percentile_label <= bucket.percentage { + bucket_index -= 1; + let scaled_label = bucket.label / scale_factor as f64; + out.push_str( + format!( + "\n{:>6.2}% | {:>7.2} {} | samples: {:>7}", + percentile_label, scaled_label, metric.unit, bucket.sum + ) + .as_str(), + ); + break; + } + } + } + + out.push_str("\n"); + out.push_str(format!("\n{:>10} : {:.2} {}", "Min", hist.min / scale_factor as f64, metric.unit).as_str()); + out.push_str(format!("\n{:>10} : {:.2} {}", "Max", hist.max / scale_factor as f64, metric.unit).as_str()); + out.push_str(format!("\n{:>10} : {}", "Count", hist.count).as_str()); + if metric.is_time_unit() { + if let Some(duration_sec) = metric.to_seconds(hist.sum / self.parallel_threads as f64 / scale_factor as f64) { + out.push_str(format!("\n{:>10} : {:.2} sec (avg per thread)", "Duration", 
duration_sec).as_str()); + out.push_str(format!("\n{:>10} : {:.2} tps (avg per thread)", "Throughput", hist.count as f64 / duration_sec).as_str()); + } else { + out.push_str(format!("\n{:>10} : ERROR. Unable to determine time unit from this: '{}')", "Duration", metric.unit).as_str()); + out.push_str("\nThroughput : N/A"); + } + } else { + out.push_str(format!("\n{:>10} : {:.2} {}", "Sum", hist.sum, metric.unit).as_str()); + } + } else if let Some(count_container) = &metric.count { + out.push_str("\n---------------------------------------"); + let counter = count_container.counter(); + out.push_str(format!("\n{:>11} : {}", "Metric name", metric.name).as_str()); + out.push_str(format!("\n{:>11} : {}", "Value", counter.value).as_str()); + } else { + out.push_str("\nNo data"); + } + } + } + + Ok(out) + } +} diff --git a/packages/metrics/src/opentel/scaling.rs b/packages/metrics/src/opentel/scaling.rs new file mode 100644 index 00000000..d4568a5f --- /dev/null +++ b/packages/metrics/src/opentel/scaling.rs @@ -0,0 +1,13 @@ +use std::collections::HashMap; + +/// The config for metrics scaling as map where metric name is mapped to its scaling factor. 
+#[derive(Default)] +pub struct ScalingConfig { + pub ratios: HashMap, +} + +impl ScalingConfig { + pub fn get_scale_factor(&self, metric_name: &str) -> f32 { + *self.ratios.get(metric_name).unwrap_or(&1_f32) + } +} diff --git a/packages/talos_agent/src/agent/core.rs b/packages/talos_agent/src/agent/core.rs index 2ebe7fbf..638b02a8 100644 --- a/packages/talos_agent/src/agent/core.rs +++ b/packages/talos_agent/src/agent/core.rs @@ -279,6 +279,7 @@ mod tests { decision: Committed, version: 1, safepoint: None, + conflict: None, }) }); (tx_response, rx_response) diff --git a/packages/talos_agent/src/agent/decision_reader.rs b/packages/talos_agent/src/agent/decision_reader.rs index b6c10860..1582f06e 100644 --- a/packages/talos_agent/src/agent/decision_reader.rs +++ b/packages/talos_agent/src/agent/decision_reader.rs @@ -211,6 +211,7 @@ mod tests { suffix_start: 0_u64, version: 0_u64, safepoint: None, + conflicts: None, metrics: None, } } diff --git a/packages/talos_agent/src/agent/state_manager.rs b/packages/talos_agent/src/agent/state_manager.rs index 5b746cf5..960271d5 100644 --- a/packages/talos_agent/src/agent/state_manager.rs +++ b/packages/talos_agent/src/agent/state_manager.rs @@ -180,6 +180,16 @@ impl + 'static> StateManager { decision: message.decision.clone(), version: message.version, safepoint: message.safepoint, + conflict: match message.conflicts { + None => None, + Some(ref list) => { + if list.is_empty() { + None + } else { + Some(list[0].clone()) + } + } + }, }; let error_message = format!( @@ -234,6 +244,7 @@ mod tests_waiting_client { decision: Committed, version: 1, safepoint: None, + conflict: None, }; let client = WaitingClient::new(Arc::new(Box::new(sender))); @@ -250,6 +261,7 @@ mod tests_waiting_client { decision: Committed, version: 1, safepoint: None, + conflict: None, }; let response = response_sample.clone(); @@ -527,6 +539,7 @@ mod tests { suffix_start: 2, version: 2, safepoint: None, + conflicts: None, metrics: None, }; @@ -554,6 +567,7 
@@ mod tests { suffix_start: 2, version: 2, safepoint: None, + conflicts: None, metrics: None, }; @@ -623,6 +637,7 @@ mod tests { suffix_start: 2, version: 2, safepoint: None, + conflicts: None, metrics: Some(TxProcessingTimeline { candidate_published: candidate_time_at as i128, candidate_received: candidate_received_at as i128, diff --git a/packages/talos_agent/src/api.rs b/packages/talos_agent/src/api.rs index 82515eab..f72a9e71 100644 --- a/packages/talos_agent/src/api.rs +++ b/packages/talos_agent/src/api.rs @@ -1,5 +1,5 @@ use crate::agent::errors::AgentError; -use crate::messaging::api::Decision; +use crate::messaging::api::{ConflictMessage, Decision}; use crate::metrics::model::MetricsReport; use async_trait::async_trait; use rdkafka::config::RDKafkaLogLevel; @@ -39,6 +39,7 @@ pub struct CertificationResponse { pub decision: Decision, pub version: u64, pub safepoint: Option, + pub conflict: Option, } #[derive(Clone, Debug)] diff --git a/packages/talos_agent/src/messaging/api.rs b/packages/talos_agent/src/messaging/api.rs index 29cc5082..a4dda59d 100644 --- a/packages/talos_agent/src/messaging/api.rs +++ b/packages/talos_agent/src/messaging/api.rs @@ -54,8 +54,17 @@ pub enum Decision { Aborted, } +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +pub struct ConflictMessage { + pub xid: String, + pub version: u64, + pub readset: Vec, + pub readvers: Vec, + pub writeset: Vec, +} + /// Kafka message which will be published to Talos queue -#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] #[serde(rename_all = "camelCase", tag = "_typ")] pub struct DecisionMessage { pub xid: String, @@ -67,9 +76,8 @@ pub struct DecisionMessage { #[serde(skip_deserializing)] pub suffix_start: u64, pub version: u64, - #[serde(skip_serializing_if = "Option::is_none")] pub safepoint: Option, - #[serde(skip_serializing_if = "Option::is_none")] + pub conflicts: Option>, pub metrics: Option, } diff --git 
a/packages/talos_agent/src/messaging/kafka.rs b/packages/talos_agent/src/messaging/kafka.rs index aa5c996b..dda3d119 100644 --- a/packages/talos_agent/src/messaging/kafka.rs +++ b/packages/talos_agent/src/messaging/kafka.rs @@ -299,6 +299,7 @@ fn parse_payload_as_candidate(raw_payload: &Result<&str, Utf8Error>, decision: D suffix_start: 0, version: 0, safepoint: None, + conflicts: None, metrics: Some(metrics), }) }