Removed max_width = 120 from .rustfmt.toml
detro committed Aug 2, 2023
1 parent ff51805 commit b23d0c5
Showing 21 changed files with 226 additions and 72 deletions.
1 change: 0 additions & 1 deletion .rustfmt.toml
@@ -3,6 +3,5 @@ reorder_imports = true
 reorder_modules = true
 edition = "2021"
 match_block_trailing_comma = true
-max_width = 120
 use_field_init_shorthand = true
 use_try_shorthand = true
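
Note: with max_width removed, rustfmt falls back to its default line width of 100 columns, which is what drives the rewrapping in every Rust file below. For reference, a sketch of the effective configuration after this commit (the first two lines of the file sit above this hunk and are not shown; the 100 is rustfmt's documented default rather than anything set here):

# .rustfmt.toml after this commit -- file lines 1-2 are outside the hunk above
reorder_imports = true
reorder_modules = true
edition = "2021"
match_block_trailing_comma = true
use_field_init_shorthand = true
use_try_shorthand = true
# max_width is intentionally absent, so rustfmt's default of 100 applies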
4 changes: 3 additions & 1 deletion src/cli.rs
@@ -109,7 +109,9 @@ impl Cli {

 pub fn build_client_config(&self) -> ClientConfig {
 let mut config = ClientConfig::new();
-config.set("bootstrap.servers", self.bootstrap_brokers.clone()).set("client.id", self.client_id.clone());
+config
+.set("bootstrap.servers", self.bootstrap_brokers.clone())
+.set("client.id", self.client_id.clone());
 for cfg in &self.kafka_config {
 config.set(cfg.0.clone(), cfg.1.clone());
 }
13 changes: 7 additions & 6 deletions src/cluster_status/emitter.rs
@@ -89,7 +89,10 @@ impl Emitter for ClusterStatusEmitter {
 ///
 /// * `shutdown_token`: A [`CancellationToken`] that, when cancelled, will make the internal loop terminate.
 ///
-fn spawn(&self, shutdown_token: CancellationToken) -> (mpsc::Receiver<Self::Emitted>, JoinHandle<()>) {
+fn spawn(
+&self,
+shutdown_token: CancellationToken,
+) -> (mpsc::Receiver<Self::Emitted>, JoinHandle<()>) {
 let admin_client: AdminClient<DefaultClientContext> =
 self.admin_client_config.create().expect("Failed to allocate Admin Client");

@@ -99,11 +102,9 @@ impl Emitter for ClusterStatusEmitter {
 let mut interval = interval(FETCH_INTERVAL);

 loop {
-match admin_client
-.inner()
-.fetch_metadata(None, FETCH_TIMEOUT)
-.map(|m| Self::Emitted::from(admin_client.inner().fetch_cluster_id(FETCH_TIMEOUT), m))
-{
+match admin_client.inner().fetch_metadata(None, FETCH_TIMEOUT).map(|m| {
+Self::Emitted::from(admin_client.inner().fetch_cluster_id(FETCH_TIMEOUT), m)
+}) {
 Ok(status) => {
 tokio::select! {
 res = Self::emit_with_interval(&sx, status, &mut interval) => {
6 changes: 5 additions & 1 deletion src/cluster_status/register.rs
@@ -88,7 +88,11 @@ impl ClusterStatusRegister {
 pub async fn get_partitions_for_topic(&self, topic: &str) -> Option<Vec<u32>> {
 match &*(self.latest_status.read().await) {
 None => None,
-Some(cs) => cs.topics.iter().find(|t| t.name == topic).map(|t| t.partitions.iter().map(|p| p.id).collect()),
+Some(cs) => cs
+.topics
+.iter()
+.find(|t| t.name == topic)
+.map(|t| t.partitions.iter().map(|p| p.id).collect()),
 }
 }

3 changes: 2 additions & 1 deletion src/constants.rs
@@ -15,4 +15,5 @@ pub(crate) const DEFAULT_HTTP_HOST: &str = "localhost";
 pub(crate) const DEFAULT_HTTP_PORT: &str = "9090";

 /// The default host:port to bind to when launching internal HTTP server.
-pub(crate) const DEFAULT_HTTP_HOST_PORT: &str = formatcp!("{DEFAULT_HTTP_HOST}:{DEFAULT_HTTP_PORT}");
+pub(crate) const DEFAULT_HTTP_HOST_PORT: &str =
+formatcp!("{DEFAULT_HTTP_HOST}:{DEFAULT_HTTP_PORT}");
10 changes: 8 additions & 2 deletions src/consumer_groups/emitter.rs
@@ -124,7 +124,10 @@ impl Emitter for ConsumerGroupsEmitter {
 ///
 /// * `shutdown_token`: A [`CancellationToken`] that, when cancelled, will make the internal loop terminate.
 ///
-fn spawn(&self, shutdown_token: CancellationToken) -> (mpsc::Receiver<Self::Emitted>, JoinHandle<()>) {
+fn spawn(
+&self,
+shutdown_token: CancellationToken,
+) -> (mpsc::Receiver<Self::Emitted>, JoinHandle<()>) {
 let admin_client: AdminClient<DefaultClientContext> =
 self.admin_client_config.create().expect("Failed to allocate Admin Client");

@@ -134,7 +137,10 @@ impl Emitter for ConsumerGroupsEmitter {
 let mut interval = interval(FETCH_INTERVAL);

 loop {
-let res_groups = admin_client.inner().fetch_group_list(None, FETCH_TIMEOUT).map(Self::Emitted::from);
+let res_groups = admin_client
+.inner()
+.fetch_group_list(None, FETCH_TIMEOUT)
+.map(Self::Emitted::from);

 match res_groups {
 Ok(groups) => {
41 changes: 34 additions & 7 deletions src/http/mod.rs
@@ -63,7 +63,9 @@ pub async fn init(
 .with_graceful_shutdown(shutdown_token.cancelled());

 info!("Begin listening on '{}'...", socket_addr);
-server.await.expect("HTTP Graceful Shutdown handler returned an error - this should never happen");
+server
+.await
+.expect("HTTP Graceful Shutdown handler returned an error - this should never happen");
 }

 async fn root() -> &'static str {
@@ -88,26 +90,45 @@ async fn prometheus_metrics(State(state): State<HttpServiceState>) -> impl IntoR
 //
 // The capacity is necessarily a function of the number of metric types produced,
 // and the number of topic partitions.
-let tp_count: usize =
-state.lag_reg.lag_by_group.read().await.iter().map(|(_, gwl)| gwl.lag_by_topic_partition.len()).sum();
+let tp_count: usize = state
+.lag_reg
+.lag_by_group
+.read()
+.await
+.iter()
+.map(|(_, gwl)| gwl.lag_by_topic_partition.len())
+.sum();
 let metric_types_count: usize = 3;
 let headers_footers_count: usize = metric_types_count * 2;
 let metrics_count: usize = tp_count * metric_types_count;
 let mut body: Vec<String> = Vec::with_capacity(metrics_count + headers_footers_count);

 // ----------------------------------------------------------- METRIC: consumer_partition_offset
 consumer_partition_offset::append_headers(&mut body);
-iter_lag_reg(&state.lag_reg, &mut body, &cluster_id, consumer_partition_offset::append_metric).await;
+iter_lag_reg(&state.lag_reg, &mut body, &cluster_id, consumer_partition_offset::append_metric)
+.await;
 body.push(String::new());

 // ------------------------------------------------------- METRIC: consumer_partition_lag_offset
 consumer_partition_lag_offset::append_headers(&mut body);
-iter_lag_reg(&state.lag_reg, &mut body, &cluster_id, consumer_partition_lag_offset::append_metric).await;
+iter_lag_reg(
+&state.lag_reg,
+&mut body,
+&cluster_id,
+consumer_partition_lag_offset::append_metric,
+)
+.await;
 body.push(String::new());

 // ------------------------------------------------- METRIC: consumer_partition_lag_milliseconds
 consumer_partition_lag_milliseconds::append_headers(&mut body);
-iter_lag_reg(&state.lag_reg, &mut body, &cluster_id, consumer_partition_lag_milliseconds::append_metric).await;
+iter_lag_reg(
+&state.lag_reg,
+&mut body,
+&cluster_id,
+consumer_partition_lag_milliseconds::append_metric,
+)
+.await;
 body.push(String::new());

 // ------------------------------------------------- METRIC: partition_earliest_available_offset
@@ -135,7 +156,13 @@
 for tp in tps.iter() {
 match state.po_reg.get_latest_available_offset(tp).await {
 Ok(lao) => {
-partition_latest_available_offset::append_metric(&cluster_id, &tp.topic, tp.partition, lao, &mut body);
+partition_latest_available_offset::append_metric(
+&cluster_id,
+&tp.topic,
+tp.partition,
+lao,
+&mut body,
+);
 },
 Err(e) => {
 warn!("Unable to generate 'partition_latest_available_offset': {e}");
10 changes: 8 additions & 2 deletions src/internals/emitter.rs
@@ -12,7 +12,10 @@ use tokio_util::sync::CancellationToken;
 pub trait Emitter {
 type Emitted: Send;

-fn spawn(&self, shutdown_token: CancellationToken) -> (mpsc::Receiver<Self::Emitted>, JoinHandle<()>);
+fn spawn(
+&self,
+shutdown_token: CancellationToken,
+) -> (mpsc::Receiver<Self::Emitted>, JoinHandle<()>);

 /// Emit the `Self::Emitted`, but first wait for the next `interval` tick.
 ///
@@ -45,7 +48,10 @@ pub trait Emitter {
 ) -> Result<(), mpsc::error::SendError<Self::Emitted>> {
 // Warn in case channel is saturated
 if sender.capacity() == 0 {
-warn!("Channel to emit {} saturated: receiver too slow?", std::any::type_name::<Self::Emitted>());
+warn!(
+"Channel to emit {} saturated: receiver too slow?",
+std::any::type_name::<Self::Emitted>()
+);
 }

 // Send the object
20 changes: 16 additions & 4 deletions src/konsumer_offsets_data/emitter.rs
@@ -63,10 +63,19 @@ impl KonsumerOffsetsDataEmitter {
 )))?;

 // Prepare desired assignment, setting offset to earliest available for each partition
-let mut desired_assignment = TopicPartitionList::with_capacity(topic_meta.partitions().len());
+let mut desired_assignment =
+TopicPartitionList::with_capacity(topic_meta.partitions().len());
 for partition_meta in topic_meta.partitions().iter() {
-let (earliest, _) = consumer.fetch_watermarks(topic, partition_meta.id(), Duration::from_millis(500))?;
-desired_assignment.add_partition_offset(topic, partition_meta.id(), Offset::Offset(earliest))?;
+let (earliest, _) = consumer.fetch_watermarks(
+topic,
+partition_meta.id(),
+Duration::from_millis(500),
+)?;
+desired_assignment.add_partition_offset(
+topic,
+partition_meta.id(),
+Offset::Offset(earliest),
+)?;
 }

 // Finally, self-assign
@@ -130,7 +139,10 @@ impl Emitter for KonsumerOffsetsDataEmitter {
 ///
 /// * `shutdown_token`: A [`CancellationToken`] that, when cancelled, will make the internal loop terminate.
 ///
-fn spawn(&self, shutdown_token: CancellationToken) -> (mpsc::Receiver<Self::Emitted>, JoinHandle<()>) {
+fn spawn(
+&self,
+shutdown_token: CancellationToken,
+) -> (mpsc::Receiver<Self::Emitted>, JoinHandle<()>) {
 let consumer_context = KonsumerOffsetsDataContext;

 let consumer_client: KonsumerOffsetsDataConsumer =
55 changes: 40 additions & 15 deletions src/lag_register/register.rs
@@ -110,7 +110,10 @@ impl LagRegister {
 name,
 gwl.lag_by_topic_partition.len(),
 gwl.lag_by_topic_partition.iter().filter(|x| x.1.lag.is_some()).count(),
-gwl.lag_by_topic_partition.iter().filter(|x| x.1.owner.is_some()).count(),
+gwl.lag_by_topic_partition
+.iter()
+.filter(|x| x.1.owner.is_some())
+.count(),
 );
 }
 }
@@ -121,7 +124,10 @@ impl LagRegister {
 }
 }

-async fn process_consumer_groups(cg: ConsumerGroups, lag_register_groups: Arc<RwLock<HashMap<String, GroupWithLag>>>) {
+async fn process_consumer_groups(
+cg: ConsumerGroups,
+lag_register_groups: Arc<RwLock<HashMap<String, GroupWithLag>>>,
+) {
 for (group_name, group_with_members) in cg.groups.into_iter() {
 // Ignore own consumer of `__consumer_offsets` topic.
 if group_name == KONSUMER_OFFSETS_KCL_CONSUMER {
@@ -179,12 +185,13 @@ async fn process_consumer_groups(cg: ConsumerGroups, lag_register_groups: Arc<Rw
 // either update the owner Member of an existing one,
 // or create a new entry with no Lag set.
 for (tp, m) in members_by_topic_partition.into_iter() {
-gwl.lag_by_topic_partition.entry(tp).and_modify(|lwo| lwo.owner = Some(m.clone())).or_insert_with(
-|| LagWithOwner {
+gwl.lag_by_topic_partition
+.entry(tp)
+.and_modify(|lwo| lwo.owner = Some(m.clone()))
+.or_insert_with(|| LagWithOwner {
 owner: Some(m),
 ..Default::default()
-},
-);
+});
 }
 };
 }
@@ -220,7 +227,10 @@ async fn process_offset_commit(
 0
 },
 },
-time_lag: match po_reg.estimate_time_lag(&tp, oc.offset as u64, oc.commit_timestamp).await {
+time_lag: match po_reg
+.estimate_time_lag(&tp, oc.offset as u64, oc.commit_timestamp)
+.await
+{
 Ok(tl) => tl,
 Err(e) => {
 error!(
@@ -235,20 +245,28 @@ async fn process_offset_commit(
 // Create or update entry `TopicPartition -> LagWithOwner`:
 // either update the Lag of an existing one,
 // or create a new entry with no owner set.
-gwl.lag_by_topic_partition.entry(tp).and_modify(|lwo| lwo.lag = Some(l.clone())).or_insert_with(|| {
-LagWithOwner {
+gwl.lag_by_topic_partition
+.entry(tp)
+.and_modify(|lwo| lwo.lag = Some(l.clone()))
+.or_insert_with(|| LagWithOwner {
 lag: Some(l),
 owner: None,
-}
-});
+});
 },
 None => {
-warn!("Received {} about unknown Group '{}': ignoring", std::any::type_name::<OffsetCommit>(), oc.group);
+warn!(
+"Received {} about unknown Group '{}': ignoring",
+std::any::type_name::<OffsetCommit>(),
+oc.group
+);
 },
 }
 }

-async fn process_group_metadata(gm: GroupMetadata, lag_register_groups: Arc<RwLock<HashMap<String, GroupWithLag>>>) {
+async fn process_group_metadata(
+gm: GroupMetadata,
+lag_register_groups: Arc<RwLock<HashMap<String, GroupWithLag>>>,
+) {
 // Ignore own consumer of `__consumer_offsets` topic.
 if gm.group == KONSUMER_OFFSETS_KCL_CONSUMER {
 return;
@@ -287,7 +305,10 @@ async fn process_group_metadata(gm: GroupMetadata, lag_register_groups: Arc<RwLo
 .map(|tp| (tp, owner.clone()))
 .collect::<HashMap<TopicPartition, Member>>();

-assignment_tps.into_iter().chain(subscription_tps).collect::<HashMap<TopicPartition, Member>>()
+assignment_tps
+.into_iter()
+.chain(subscription_tps)
+.collect::<HashMap<TopicPartition, Member>>()
 })
 .collect::<HashMap<TopicPartition, Member>>();

@@ -305,7 +326,11 @@ async fn process_group_metadata(gm: GroupMetadata, lag_register_groups: Arc<RwLo
 }
 },
 None => {
-warn!("Received {} about unknown Group '{}': ignoring", std::any::type_name::<GroupMetadata>(), gm.group);
+warn!(
+"Received {} about unknown Group '{}': ignoring",
+std::any::type_name::<GroupMetadata>(),
+gm.group
+);
 },
 }
 }
9 changes: 6 additions & 3 deletions src/main.rs
@@ -28,7 +28,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
 let shutdown_token = build_shutdown_token();

 // Init `cluster_status` module, and await registry to be ready
-let (cs_reg, cs_join) = cluster_status::init(admin_client_config.clone(), cli.cluster_id, shutdown_token.clone());
+let (cs_reg, cs_join) =
+cluster_status::init(admin_client_config.clone(), cli.cluster_id, shutdown_token.clone());
 cs_reg.await_ready(shutdown_token.clone()).await?;
 let cs_reg_arc = Arc::new(cs_reg);

@@ -44,10 +45,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
 let po_reg_arc = Arc::new(po_reg);

 // Init `konsumer_offsets_data` module
-let (kod_rx, kod_join) = konsumer_offsets_data::init(admin_client_config.clone(), shutdown_token.clone());
+let (kod_rx, kod_join) =
+konsumer_offsets_data::init(admin_client_config.clone(), shutdown_token.clone());

 // Init `consumer_groups` module
-let (cg_rx, cg_join) = consumer_groups::init(admin_client_config.clone(), shutdown_token.clone());
+let (cg_rx, cg_join) =
+consumer_groups::init(admin_client_config.clone(), shutdown_token.clone());

 // Init `lag_register` module, and await registry to be ready
 let lag_reg = lag_register::init(cg_rx, kod_rx, po_reg_arc.clone());
(The remaining changed files in this commit are not rendered in this view.)
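
The rewrapping in the files not shown follows the same pattern. A sweep like this is normally regenerated and verified with the standard cargo tooling; this is an assumption about the workflow rather than something stated in the commit:

cargo fmt --all            # reformat the whole workspace at the new (default) width
cargo fmt --all -- --check # fail if any file is left unformatted, e.g. in CI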
