diff --git a/src/consumer_groups/register.rs b/src/consumer_groups/register.rs index d897e4b..9c4413d 100644 --- a/src/consumer_groups/register.rs +++ b/src/consumer_groups/register.rs @@ -274,5 +274,222 @@ impl Awaitable for ConsumerGroupsRegister { #[cfg(test)] mod test { - // TODO... + use std::{ + collections::{HashMap, HashSet}, + future::Future, + sync::Arc, + }; + + use prometheus::Registry; + use tokio::{ + sync::mpsc, + time::{sleep, Duration, Instant}, + }; + + use crate::consumer_groups::{emitter::ConsumerGroups, ConsumerGroupsRegister}; + use crate::kafka_types::{Group, GroupWithMembers, Member, MemberWithAssignment}; + + fn make_group_with_members( + g_name: &str, + m_prefix: &str, + count: usize, + ) -> (String, GroupWithMembers) { + let mut gwm = GroupWithMembers { + group: Group { + name: format!("{}", g_name), + state: format!("{}-state", g_name), + protocol: format!("{}-protocol", g_name), + protocol_type: format!("{}-protocol_type", g_name), + }, + members: HashMap::with_capacity(count), + }; + + for i in 0..count { + let m_id = format!("{}{}{}", g_name, m_prefix, i); + gwm.members.insert( + m_id.clone(), + MemberWithAssignment { + member: Member { + client_id: format!("{}-client_id", m_id), + client_host: format!("{}-client_host", m_id), + id: m_id, + }, + assignment: HashSet::new(), + }, + ); + } + + (g_name.to_string(), gwm) + } + + const BLOCK_ON_CONDITION_CHECK_FREQ: Duration = Duration::from_millis(10); + + async fn block_on(f: F, timeout: Duration) + where + F: Fn() -> Fut, + Fut: Future, + { + let start = Instant::now(); + loop { + if f().await { + break; + } + + assert!( + Instant::now().duration_since(start) < timeout, + "Timed out waiting on desired condition" + ); + sleep(BLOCK_ON_CONDITION_CHECK_FREQ).await; + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn should_forget_after() { + let (sx, rx) = mpsc::channel::(10); + let forget_after = Duration::from_secs(3); + let metrics_reg_arc = Arc::new(Registry::new()); + + let reg_under_test = ConsumerGroupsRegister::new(rx, forget_after, metrics_reg_arc); + + // Start with 3 groups + sx.send(ConsumerGroups { + groups: HashMap::from([ + make_group_with_members("g1", "m", 10), + make_group_with_members("g2", "m", 5), + make_group_with_members("g3", "m", 2), + ]), + }) + .await + .unwrap(); + + // Wait for 3 groups to be visible when querying Register + block_on(|| async { reg_under_test.get_groups_count().await == 3 }, Duration::from_secs(5)) + .await; + assert!(reg_under_test.contains_group("g1").await); + assert!(reg_under_test.contains_group("g2").await); + assert!(reg_under_test.contains_group("g3").await); + + // A new group is detected + sx.send(ConsumerGroups { + groups: HashMap::from([ + make_group_with_members("g1", "m", 10), + make_group_with_members("g2", "m", 5), + make_group_with_members("g3", "m", 2), + make_group_with_members("g4", "m", 15), + ]), + }) + .await + .unwrap(); + + // Confirm new group is visible + block_on(|| async { reg_under_test.get_groups_count().await == 4 }, Duration::from_secs(5)) + .await; + assert!(reg_under_test.contains_group("g4").await); + + // Wait 2 seconds, then remove 2 groups + sleep(Duration::from_secs(2)).await; + sx.send(ConsumerGroups { + groups: HashMap::from([ + make_group_with_members("g1", "m", 10), + make_group_with_members("g3", "m", 2), + ]), + }) + .await + .unwrap(); + + // Confirm all 4 groups are still visible, for a little while longer + block_on(|| async { reg_under_test.get_groups_count().await == 4 }, Duration::from_secs(5)) + .await; + assert!(reg_under_test.contains_group("g1").await); + assert!(reg_under_test.contains_group("g2").await); + assert!(reg_under_test.contains_group("g3").await); + assert!(reg_under_test.contains_group("g4").await); + + // Confirm that, 2 groups removed earlier are now finally gone + block_on( + || async { reg_under_test.get_groups_count().await == 2 }, + Duration::from_secs(20), + ) + .await; + assert!(reg_under_test.contains_group("g1").await); + assert!(reg_under_test.contains_group("g3").await); + } + + #[tokio::test(flavor = "multi_thread")] + async fn should_track_hash() { + let (sx, rx) = mpsc::channel::(10); + let forget_after = Duration::from_secs(3); + let metrics_reg_arc = Arc::new(Registry::new()); + + let reg = ConsumerGroupsRegister::new(rx, forget_after, metrics_reg_arc); + + // Start with no groups and grab the "empty" hash + sx.send(ConsumerGroups { + groups: HashMap::new(), + }) + .await + .unwrap(); + let initial_hash = reg.get_hash().await; + let mut curr_hash = initial_hash; + + // Start with 1 group + sx.send(ConsumerGroups { + groups: HashMap::from([make_group_with_members("g1", "m", 10)]), + }) + .await + .unwrap(); + block_on(|| async { reg.get_groups_count().await == 1 }, Duration::from_secs(5)).await; + let mut new_hash = reg.get_hash().await; + assert_ne!(curr_hash, new_hash); + curr_hash = new_hash; + + // Unchanged + sx.send(ConsumerGroups { + groups: HashMap::from([make_group_with_members("g1", "m", 10)]), + }) + .await + .unwrap(); + block_on(|| async { reg.get_groups_count().await == 1 }, Duration::from_secs(5)).await; + new_hash = reg.get_hash().await; + assert_eq!(curr_hash, new_hash); + + // Add a group + sx.send(ConsumerGroups { + groups: HashMap::from([ + make_group_with_members("g1", "m", 10), + make_group_with_members("g2", "m", 20), + ]), + }) + .await + .unwrap(); + block_on(|| async { reg.get_groups_count().await == 2 }, Duration::from_secs(5)).await; + new_hash = reg.get_hash().await; + assert_ne!(curr_hash, new_hash); + curr_hash = new_hash; + + // Wait 2 seconds, then remove 1 groups + sleep(Duration::from_secs(2)).await; + sx.send(ConsumerGroups { + groups: HashMap::from([make_group_with_members("g2", "m", 20)]), + }) + .await + .unwrap(); + + // Confirm 2 groups are still visible, for a little while longer, and hash hasn't yet changed + block_on(|| async { reg.get_groups_count().await == 2 }, Duration::from_secs(5)).await; + new_hash = reg.get_hash().await; + assert_eq!(curr_hash, new_hash); + + // Confirm that, eventually, only 1 group is left and hash gets updated + block_on(|| async { reg.get_groups_count().await == 1 }, Duration::from_secs(20)).await; + new_hash = reg.get_hash().await; + assert_ne!(curr_hash, new_hash); + curr_hash = new_hash; + + // Confirm that, eventually, all groups are removed and the hash goes back to the "empty" one + block_on(|| async { reg.get_groups_count().await == 0 }, Duration::from_secs(20)).await; + new_hash = reg.get_hash().await; + assert_ne!(curr_hash, new_hash); + assert_eq!(new_hash, initial_hash); + } }