Skip to content

Commit

Permalink
Provide ConsumerGroupsRegister with tests for key functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
detro committed Feb 19, 2024
1 parent 7d4c8ae commit 8c589e1
Showing 1 changed file with 218 additions and 1 deletion.
219 changes: 218 additions & 1 deletion src/consumer_groups/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,5 +274,222 @@ impl Awaitable for ConsumerGroupsRegister {

#[cfg(test)]
mod test {
// TODO...
use std::{
collections::{HashMap, HashSet},
future::Future,
sync::Arc,
};

use prometheus::Registry;
use tokio::{
sync::mpsc,
time::{sleep, Duration, Instant},
};

use crate::consumer_groups::{emitter::ConsumerGroups, ConsumerGroupsRegister};
use crate::kafka_types::{Group, GroupWithMembers, Member, MemberWithAssignment};

fn make_group_with_members(
g_name: &str,
m_prefix: &str,
count: usize,
) -> (String, GroupWithMembers) {
let mut gwm = GroupWithMembers {
group: Group {
name: format!("{}", g_name),
state: format!("{}-state", g_name),
protocol: format!("{}-protocol", g_name),
protocol_type: format!("{}-protocol_type", g_name),
},
members: HashMap::with_capacity(count),
};

for i in 0..count {
let m_id = format!("{}{}{}", g_name, m_prefix, i);
gwm.members.insert(
m_id.clone(),
MemberWithAssignment {
member: Member {
client_id: format!("{}-client_id", m_id),
client_host: format!("{}-client_host", m_id),
id: m_id,
},
assignment: HashSet::new(),
},
);
}

(g_name.to_string(), gwm)
}

const BLOCK_ON_CONDITION_CHECK_FREQ: Duration = Duration::from_millis(10);

async fn block_on<F, Fut>(f: F, timeout: Duration)
where
F: Fn() -> Fut,
Fut: Future<Output = bool>,
{
let start = Instant::now();
loop {
if f().await {
break;
}

assert!(
Instant::now().duration_since(start) < timeout,
"Timed out waiting on desired condition"
);
sleep(BLOCK_ON_CONDITION_CHECK_FREQ).await;
}
}

#[tokio::test(flavor = "multi_thread")]
async fn should_forget_after() {
let (sx, rx) = mpsc::channel::<ConsumerGroups>(10);
let forget_after = Duration::from_secs(3);
let metrics_reg_arc = Arc::new(Registry::new());

let reg_under_test = ConsumerGroupsRegister::new(rx, forget_after, metrics_reg_arc);

// Start with 3 groups
sx.send(ConsumerGroups {
groups: HashMap::from([
make_group_with_members("g1", "m", 10),
make_group_with_members("g2", "m", 5),
make_group_with_members("g3", "m", 2),
]),
})
.await
.unwrap();

// Wait for 3 groups to be visible when querying Register
block_on(|| async { reg_under_test.get_groups_count().await == 3 }, Duration::from_secs(5))
.await;
assert!(reg_under_test.contains_group("g1").await);
assert!(reg_under_test.contains_group("g2").await);
assert!(reg_under_test.contains_group("g3").await);

// A new group is detected
sx.send(ConsumerGroups {
groups: HashMap::from([
make_group_with_members("g1", "m", 10),
make_group_with_members("g2", "m", 5),
make_group_with_members("g3", "m", 2),
make_group_with_members("g4", "m", 15),
]),
})
.await
.unwrap();

// Confirm new group is visible
block_on(|| async { reg_under_test.get_groups_count().await == 4 }, Duration::from_secs(5))
.await;
assert!(reg_under_test.contains_group("g4").await);

// Wait 2 seconds, then remove 2 groups
sleep(Duration::from_secs(2)).await;
sx.send(ConsumerGroups {
groups: HashMap::from([
make_group_with_members("g1", "m", 10),
make_group_with_members("g3", "m", 2),
]),
})
.await
.unwrap();

// Confirm all 4 groups are still visible, for a little while longer
block_on(|| async { reg_under_test.get_groups_count().await == 4 }, Duration::from_secs(5))
.await;
assert!(reg_under_test.contains_group("g1").await);
assert!(reg_under_test.contains_group("g2").await);
assert!(reg_under_test.contains_group("g3").await);
assert!(reg_under_test.contains_group("g4").await);

// Confirm that, 2 groups removed earlier are now finally gone
block_on(
|| async { reg_under_test.get_groups_count().await == 2 },
Duration::from_secs(20),
)
.await;
assert!(reg_under_test.contains_group("g1").await);
assert!(reg_under_test.contains_group("g3").await);
}

#[tokio::test(flavor = "multi_thread")]
async fn should_track_hash() {
let (sx, rx) = mpsc::channel::<ConsumerGroups>(10);
let forget_after = Duration::from_secs(3);
let metrics_reg_arc = Arc::new(Registry::new());

let reg = ConsumerGroupsRegister::new(rx, forget_after, metrics_reg_arc);

// Start with no groups and grab the "empty" hash
sx.send(ConsumerGroups {
groups: HashMap::new(),
})
.await
.unwrap();
let initial_hash = reg.get_hash().await;
let mut curr_hash = initial_hash;

// Start with 1 group
sx.send(ConsumerGroups {
groups: HashMap::from([make_group_with_members("g1", "m", 10)]),
})
.await
.unwrap();
block_on(|| async { reg.get_groups_count().await == 1 }, Duration::from_secs(5)).await;
let mut new_hash = reg.get_hash().await;
assert_ne!(curr_hash, new_hash);
curr_hash = new_hash;

// Unchanged
sx.send(ConsumerGroups {
groups: HashMap::from([make_group_with_members("g1", "m", 10)]),
})
.await
.unwrap();
block_on(|| async { reg.get_groups_count().await == 1 }, Duration::from_secs(5)).await;
new_hash = reg.get_hash().await;
assert_eq!(curr_hash, new_hash);

// Add a group
sx.send(ConsumerGroups {
groups: HashMap::from([
make_group_with_members("g1", "m", 10),
make_group_with_members("g2", "m", 20),
]),
})
.await
.unwrap();
block_on(|| async { reg.get_groups_count().await == 2 }, Duration::from_secs(5)).await;
new_hash = reg.get_hash().await;
assert_ne!(curr_hash, new_hash);
curr_hash = new_hash;

// Wait 2 seconds, then remove 1 groups
sleep(Duration::from_secs(2)).await;
sx.send(ConsumerGroups {
groups: HashMap::from([make_group_with_members("g2", "m", 20)]),
})
.await
.unwrap();

// Confirm 2 groups are still visible, for a little while longer, and hash hasn't yet changed
block_on(|| async { reg.get_groups_count().await == 2 }, Duration::from_secs(5)).await;
new_hash = reg.get_hash().await;
assert_eq!(curr_hash, new_hash);

// Confirm that, eventually, only 1 group is left and hash gets updated
block_on(|| async { reg.get_groups_count().await == 1 }, Duration::from_secs(20)).await;
new_hash = reg.get_hash().await;
assert_ne!(curr_hash, new_hash);
curr_hash = new_hash;

// Confirm that, eventually, all groups are removed and the hash goes back to the "empty" one
block_on(|| async { reg.get_groups_count().await == 0 }, Duration::from_secs(20)).await;
new_hash = reg.get_hash().await;
assert_ne!(curr_hash, new_hash);
assert_eq!(new_hash, initial_hash);
}
}

0 comments on commit 8c589e1

Please sign in to comment.