Skip to content

Commit

Permalink
Merge pull request #61 from kafkesc/detro/60-map_todos_in_github_issues
Browse files Browse the repository at this point in the history
All TODOs are now mapped to GitHub issues
  • Loading branch information
detro authored Sep 24, 2023
2 parents ac42894 + 15c4587 commit 1df216f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/cluster_status/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl ClusterStatusRegister {
}

/// Current Brokers constituting the Kafka cluster.
#[allow(unused)] // TODO Remove
#[allow(unused)]
pub async fn get_brokers(&self) -> Vec<Broker> {
match &*(self.latest_status.read().await) {
None => Vec::new(),
Expand Down
47 changes: 10 additions & 37 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,11 @@ use crate::lag_register::LagRegister;
use crate::partition_offsets::PartitionOffsetsRegister;
use crate::prometheus_metrics::bespoke::*;

// TODO HTTP Endpoints
// GET / - Landing page
// GET /metrics - List Prometheus Metrics
// GET /brokers - Cluster meta and list of Brokers
// GET /topics - List of Topics
// GET /topics/{t} - List of Partitions for Topic t
// GET /groups - List of Consumer Groups
// GET /groups/{g} - List of Members for Consumer group g
// GET /status/healthy - Service healthy
// GET /status/ready - Service (metrics) ready
//
// TODO Add a layer of compression for GZip (optional for Prometheus)
// TODO https://github.com/kafkesc/kommitted/issues/47
// TODO https://github.com/kafkesc/kommitted/issues/48
// TODO https://github.com/kafkesc/kommitted/issues/50
// TODO https://github.com/kafkesc/kommitted/issues/51
// TODO https://github.com/kafkesc/kommitted/issues/49

#[derive(Clone)]
struct HttpServiceState {
Expand Down Expand Up @@ -216,34 +209,14 @@ async fn prometheus_metrics(State(state): State<HttpServiceState>) -> impl IntoR
//
// --- CLUSTER METRICS ---
//
// TODO `kommitted_consumer_groups_total`
// LABELS: cluster_id?
//
// TODO `kommitted_consumer_group_members_total`
// LABELS: cluster_id?
//
// TODO `kommitted_cluster_status_brokers_total`
// LABELS: cluster_id?
//
// TODO `kommitted_cluster_status_topics_total`
// LABELS: cluster_id?
//
// TODO `kommitted_cluster_status_partitions_total`
// LABELS: cluster_id?
// TODO https://github.com/kafkesc/kommitted/issues/53
// TODO https://github.com/kafkesc/kommitted/issues/54
//
// --- KOMMITTED INTERNAL METRICS ---
//
// TODO `kommitted_consumer_groups_poll_time_seconds`
// HELP: Time taken to fetch information about all consumer groups in the cluster.
// LABELS: cluster_id?
//
// TODO `kommitted_cluster_status_poll_time_ms`
// HELP: Time taken to fetch information about the composition of the cluster (brokers, topics, partitions).
// LABELS: cluster_id?
//
// TODO `kommitted_partitions_watermark_offsets_poll_time_ms`
// HELP: Time taken to fetch earliest/latest (watermark) offsets of all the topic partitions of the cluster.
// LABELS: cluster_id?
// TODO https://github.com/kafkesc/kommitted/issues/55
// TODO https://github.com/kafkesc/kommitted/issues/56
// TODO https://github.com/kafkesc/kommitted/issues/57

// Turn the bespoke metrics created so far, into a String
let mut body = body.join("\n");
Expand Down
3 changes: 3 additions & 0 deletions src/internals/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ pub trait Emitter {

// TODO Each `Emitter` implementation should report a metric about
// the current saturation of its emitting channel.
// See https://github.com/kafkesc/kommitted/issues/55
// See https://github.com/kafkesc/kommitted/issues/56
// See https://github.com/kafkesc/kommitted/issues/57

// Send the object
sender.send(emitted).await
Expand Down
8 changes: 2 additions & 6 deletions src/lag_register/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ pub struct LagWithOwner {
#[derive(Debug, Clone, Default)]
pub struct GroupWithLag {
pub(crate) group: Group,
// TODO Wrap in a `RwLock` so we can modify a specific group lag,
// without holding a w-lock on the whole register
// TODO https://github.com/kafkesc/kommitted/issues/58
pub(crate) lag_by_topic_partition: HashMap<TopicPartition, LagWithOwner>,
}

Expand Down Expand Up @@ -338,10 +337,7 @@ async fn process_group_metadata(
#[async_trait]
impl Awaitable for LagRegister {
async fn is_ready(&self) -> bool {
// TODO this is pretty "weak" as readyness-check.
// Something better would be to check that the registry has reached as "stable" number
// of groups with a stable number of registered lags against it.
// But that requires tracking changes over multiple checks: it can wait.
// TODO https://github.com/kafkesc/kommitted/issues/59
self.lag_by_group.read().await.len() > 0
}
}

0 comments on commit 1df216f

Please sign in to comment.