From 4a8a6449fc67dd76bf51b0da957ab13bd7c6396a Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 20 Dec 2024 10:28:41 +0100 Subject: [PATCH] Switch to RoundRobbinBalancer --- pkg/usage/service.go | 10 ++++++++++ pkg/usage/stats.go | 5 +++++ 2 files changed, 15 insertions(+) diff --git a/pkg/usage/service.go b/pkg/usage/service.go index b3e1404f4b8e1..ab5a5b025bd02 100644 --- a/pkg/usage/service.go +++ b/pkg/usage/service.go @@ -37,8 +37,18 @@ func NewService(kafkaCfg kafka.Config, consumerGroup string, logger log.Logger, client, err := client.NewReaderClient(kafkaCfg, kprom, logger, kgo.ConsumerGroup(consumerGroup), kgo.ConsumeTopics(topic), + kgo.Balancers(kgo.RoundRobinBalancer()), kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().Add(-windowSize).UnixMilli())), kgo.DisableAutoCommit(), + kgo.OnPartitionsAssigned(func(_ context.Context, _ *kgo.Client, partitions map[string][]int32) { + level.Info(logger).Log("msg", "assigned partitions", "partitions", partitions) + }), + kgo.OnPartitionsRevoked(func(_ context.Context, _ *kgo.Client, partitions map[string][]int32) { + level.Info(logger).Log("msg", "revoked partitions", "partitions", partitions) + }), + kgo.OnPartitionsLost(func(_ context.Context, _ *kgo.Client, partitions map[string][]int32) { + level.Info(logger).Log("msg", "lost partitions", "partitions", partitions) + }), ) if err != nil { return nil, err diff --git a/pkg/usage/stats.go b/pkg/usage/stats.go index 9e40b6dbde7e8..715caecd71073 100644 --- a/pkg/usage/stats.go +++ b/pkg/usage/stats.go @@ -55,6 +55,11 @@ func (u *usageStats) addEntry(partition int32, tenantID string, streamHash uint6 u.stats[partition] = pStats } + // If the partition has already processed this offset, skip it + if pStats.offset != 0 && pStats.offset >= offset { + return + } + pStats.offset = offset pStats.timestamp = timestamp