You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I am encountering an bug with v3.1.0.
v3.0.3 is able to successfully consume in this scenario.
This is related to the rewrite of internal/consume/PartitionConsumer.go ( getOffsetBounds ) method.
Seems to me like the check comparing oldest offset to startOffset was removed in the move from 3.0.3 to 3.1.0
I applied the following quick fix locally which enables --tail option to succeed in this case. [ L155 ]
if flags.Tail > 0 && startOffset == sarama.OffsetNewest {
//When --tail is used compute startOffset so that it minimizes the number of messages consumed
minOffset := endOffset - int64(flags.Tail)
oldestOffset, err := (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest)
if err != nil {
return -1, -1, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, currentPartition, err)
}
if minOffset < oldestOffset {
startOffset = oldestOffset
} else if minOffset > 0 {
startOffset = minOffset
} else {
startOffset = sarama.OffsetOldest
}
}
I have a topic with 30 day DELETE retention policy which has recently had all messages deleted as per the retention policy leaving the topic described below:
~/$: kafkactl consume ingestion-topic --tail 1 -V
[kafkactl] 2023/07/03 10:59:59 Using config file: /Users/user/.config/kafkactl/config.yml
[kafkactl] 2023/07/03 10:59:59 Assuming kafkaVersion: 2.5.0
[kafkactl] 2023/07/03 10:59:59 Assuming kafkaVersion: 2.5.0
[kafkactl] 2023/07/03 10:59:59 using default admin request timeout: 3s
[kafkactl] 2023/07/03 10:59:59 TLS is enabled.
[sarama ] 2023/07/03 10:59:59 Initializing new client
[sarama ] 2023/07/03 10:59:59 client/metadata fetching metadata for all topics from broker broker1
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (unregistered)
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #5 at broker1
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #4 at broker1
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #1 at broker1
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #2 at broker1
[sarama ] 2023/07/03 10:59:59 client/brokers registered new broker #3 at broker1
[sarama ] 2023/07/03 10:59:59 Successfully initialized new client
[kafkactl] 2023/07/03 10:59:59 Start consuming topic: ingestion-topic
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #4)
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #2)
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #1)
[sarama ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #5)
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5058124 to 5058125 on partition 2
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5426174 to 5426175 on partition 1
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5100613 to 5100614 on partition 3
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 45408716 to 45408717 on partition 0
[kafkactl] 2023/07/03 11:00:00 Start consuming partition 2 from offset 5058124 to 5058125
[kafkactl] 2023/07/03 11:00:00 Start consuming partition 0 from offset 45408716 to 45408717
Failed to start consumer: Failed to start consumer for partition 3: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition
You can see in the above logs, that the offsets being calculated are 'newestOffset - 1' but that offset doesn't exist because it has been deleted now.
Relevant part of 3.0.3 logs running the same command.
[kafkactl] 2023/07/03 11:24:51 Skipping partition 1
[kafkactl] 2023/07/03 11:24:51 Skipping partition 3
[kafkactl] 2023/07/03 11:24:51 Start consuming partition 2 from offset 5058125 to 5058125
[kafkactl] 2023/07/03 11:24:51 Start consuming partition 0 from offset 45408717 to 45408717
You can see the offsets are calculated much differently in the working 3.0.3 version.
The text was updated successfully, but these errors were encountered:
I am sorry, but we have only limited resources at the moment and I cannot predict when we will have time to analyse the problem. If you can come up with a PR, that would speed things up :)
I am encountering an bug with v3.1.0.
v3.0.3 is able to successfully consume in this scenario.
This is related to the rewrite of internal/consume/PartitionConsumer.go ( getOffsetBounds ) method.
Seems to me like the check comparing oldest offset to startOffset was removed in the move from 3.0.3 to 3.1.0
I applied the following quick fix locally which enables --tail option to succeed in this case. [ L155 ]
I have a topic with 30 day DELETE retention policy which has recently had all messages deleted as per the retention policy leaving the topic described below:
VERSION 3.1.0 LOGS
VERSION 3.1.0 LOGS
You can see in the above logs, that the offsets being calculated are 'newestOffset - 1' but that offset doesn't exist because it has been deleted now.
Relevant part of 3.0.3 logs running the same command.
You can see the offsets are calculated much differently in the working 3.0.3 version.
The text was updated successfully, but these errors were encountered: