Skip to content

Commit

Permalink
Fix logic that makes LagRegister forget groups
Browse files Browse the repository at this point in the history
  • Loading branch information
detro committed Feb 15, 2024
1 parent 386a24a commit ff32275
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
10 changes: 6 additions & 4 deletions src/consumer_groups/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::internals::Awaitable;
use crate::kafka_types::GroupWithMembers;
use crate::prometheus_metrics::LABEL_GROUP;

const REMOVE_EXPIRED_INTERVAL: Duration = Duration::from_millis(500);
const REMOVE_EXPIRED_INTERVAL: Duration = Duration::from_secs(1);

const MET_TOT_NAME: &str = "consumer_groups_total";
const MET_TOT_HELP: &str = "Consumer groups currently in the cluster";
Expand Down Expand Up @@ -193,9 +193,11 @@ impl ConsumerGroupsRegister {
},
}

info!(
"Updated (Known) Consumer Groups: {}",
known_groups_arc_clone.read().await.map.len()
let r_guard = known_groups_arc_clone.read().await;
trace!(
"Updated (Known) Consumer Groups: {} (hash: {})",
r_guard.map.len(),
r_guard.hash,
);
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/lag_register/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async fn process_consumer_groups(
}

// ... then, remove groups that are in `lag_register_groups` but are not known (anymore)
lag_register_groups.write().await.retain(|g, _| !known_groups.contains(g));
lag_register_groups.write().await.retain(|g, _| known_groups.contains(g));
}

async fn process_offset_commit(
Expand Down

0 comments on commit ff32275

Please sign in to comment.