v3.1.0 - tail not properly calculating offsets #154

Open

daburch opened this issue Jul 3, 2023 · 3 comments
Labels
bug Something isn't working

Comments


daburch commented Jul 3, 2023

I am encountering a bug with v3.1.0.
v3.0.3 is able to consume successfully in this scenario.

This is related to the rewrite of the getOffsetBounds method in internal/consume/PartitionConsumer.go.

It looks like the check comparing the oldest offset to startOffset was removed in the move from 3.0.3 to 3.1.0.

I applied the following quick fix locally (around L155), which enables the --tail option to succeed in this case:

if flags.Tail > 0 && startOffset == sarama.OffsetNewest {
    // when --tail is used, compute startOffset so that it minimizes the number of messages consumed
    minOffset := endOffset - int64(flags.Tail)

    oldestOffset, err := (*client).GetOffset(topic, currentPartition, sarama.OffsetOldest)
    if err != nil {
        return -1, -1, errors.Errorf("failed to get offset for topic %s Partition %d: %v", topic, currentPartition, err)
    }

    if minOffset < oldestOffset {
        startOffset = oldestOffset
    } else if minOffset > 0 {
        startOffset = minOffset
    } else {
        startOffset = sarama.OffsetOldest
    }
}
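
For what it's worth, the clamping logic of the fix can be illustrated with a small standalone sketch (the function name and simplified signature below are mine, not kafkactl's, and the sarama.OffsetOldest fallback for the minOffset <= 0 case is omitted):

package main

import "fmt"

// tailStartOffset sketches the clamping from the fix above: start far enough
// back to read at most `tail` messages, but never before the oldest offset
// that is still available on the broker.
func tailStartOffset(oldestOffset, endOffset, tail int64) int64 {
    minOffset := endOffset - tail
    if minOffset < oldestOffset {
        // retention already deleted the older messages; start at the oldest offset
        return oldestOffset
    }
    return minOffset
}

func main() {
    // partition 3 from the topic description below: everything deleted, oldest == newest
    fmt.Println(tailStartOffset(5100615, 5100615, 1)) // 5100615 instead of 5100614
    // partition 0: messages 45408711..45408717 still available
    fmt.Println(tailStartOffset(45408711, 45408718, 1)) // 45408717
}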

I have a topic with a 30-day delete retention policy which has recently had all of its messages deleted per that policy, leaving the topic described below:


CONFIG                   VALUE
compression.type          gzip
min.insync.replicas       2
cleanup.policy            delete
retention.ms              2592000000
message.timestamp.type    LogAppendTime

PARTITION     OLDEST_OFFSET     NEWEST_OFFSET     EMPTY     LEADER                REPLICAS       IN_SYNC_REPLICAS
0              45408711             45408718       false    broker1:12345          3,4,5              3,4,5
1              5426176              5426176        true     broker1:12345          1,2,4              1,2,4
2              5058115              5058126        false    broker1:12345          1,2,5              1,2,5
3              5100615              5100615        true     broker1:12345          2,3,5              2,3,5

VERSION 3.1.0 LOGS

~/$: kafkactl consume ingestion-topic --tail 1 -V

[kafkactl] 2023/07/03 10:59:59 Using config file: /Users/user/.config/kafkactl/config.yml
[kafkactl] 2023/07/03 10:59:59 Assuming kafkaVersion: 2.5.0
[kafkactl] 2023/07/03 10:59:59 Assuming kafkaVersion: 2.5.0
[kafkactl] 2023/07/03 10:59:59 using default admin request timeout: 3s
[kafkactl] 2023/07/03 10:59:59 TLS is enabled.
[sarama  ] 2023/07/03 10:59:59 Initializing new client

[sarama  ] 2023/07/03 10:59:59 client/metadata fetching metadata for all topics from broker broker1
[sarama  ] 2023/07/03 10:59:59 Connected to broker at broker1 (unregistered)
[sarama  ] 2023/07/03 10:59:59 client/brokers registered new broker #5 at broker1
[sarama  ] 2023/07/03 10:59:59 client/brokers registered new broker #4 at broker1
[sarama  ] 2023/07/03 10:59:59 client/brokers registered new broker #1 at broker1
[sarama  ] 2023/07/03 10:59:59 client/brokers registered new broker #2 at broker1
[sarama  ] 2023/07/03 10:59:59 client/brokers registered new broker #3 at broker1

[sarama  ] 2023/07/03 10:59:59 Successfully initialized new client
[kafkactl] 2023/07/03 10:59:59 Start consuming topic: ingestion-topic

[sarama  ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #4)
[sarama  ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #2)
[sarama  ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #1)
[sarama  ] 2023/07/03 10:59:59 Connected to broker at broker1 (registered as #5)

[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5058124 to 5058125 on partition 2
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5426174 to 5426175 on partition 1
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 5100613 to 5100614 on partition 3
[kafkactl] 2023/07/03 11:00:00 consumer will consume offset 45408716 to 45408717 on partition 0

[kafkactl] 2023/07/03 11:00:00 Start consuming partition 2 from offset 5058124 to 5058125
[kafkactl] 2023/07/03 11:00:00 Start consuming partition 0 from offset 45408716 to 45408717

Failed to start consumer: Failed to start consumer for partition 3: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition

You can see in the above logs that the offsets being calculated are 'newestOffset - 1', but those offsets no longer exist because the messages have been deleted. For partition 3, for example, the oldest and newest offsets are both 5100615, yet the consumer tries to read offsets 5100613 to 5100614, which the broker no longer has.

Relevant part of the 3.0.3 logs running the same command:

[kafkactl] 2023/07/03 11:24:51 Skipping partition 1
[kafkactl] 2023/07/03 11:24:51 Skipping partition 3
[kafkactl] 2023/07/03 11:24:51 Start consuming partition 2 from offset 5058125 to 5058125
[kafkactl] 2023/07/03 11:24:51 Start consuming partition 0 from offset 45408717 to 45408717

You can see the offsets are calculated quite differently in the working 3.0.3 version: the empty partitions (1 and 3) are skipped outright instead of being assigned offsets that no longer exist.
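
Judging from those logs, 3.0.3 apparently treats a partition whose oldest and newest offsets are equal as empty and skips it entirely. A rough reconstruction of that check (mine, not the actual 3.0.3 source):

package main

import "fmt"

// partitionIsEmpty reconstructs the behaviour the 3.0.3 logs suggest: when
// retention has removed every message, oldest == newest and there is nothing to tail.
func partitionIsEmpty(oldestOffset, newestOffset int64) bool {
    return oldestOffset >= newestOffset
}

func main() {
    fmt.Println(partitionIsEmpty(5426176, 5426176))   // partition 1: true -> skip
    fmt.Println(partitionIsEmpty(45408711, 45408718)) // partition 0: false -> consume
}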


daburch commented Jul 3, 2023

I am also seeing a related but different issue when attempting to consume using --offset.

:~/$ kafkactl consume ingestion-topic --offset 0=45408717
Failed to start consumer: unable to find offset parameter for partition 3: [0=45408717]

It seems like the error handling on L251 is triggering for partition 3, which isn't even being consumed from.
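
To make the expectation concrete, here is a hypothetical sketch (the function name and map-based lookup are mine, not kafkactl's actual implementation) of how the --offset values could be resolved per partition, with partitions that have no explicit offset falling back to a default instead of failing the whole consumer:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// parseOffsetFlags is a hypothetical sketch that parses --offset values like
// "0=45408717" into a per-partition map; it is not the actual kafkactl code.
func parseOffsetFlags(flags []string) (map[int32]int64, error) {
    offsets := make(map[int32]int64)
    for _, flag := range flags {
        parts := strings.SplitN(flag, "=", 2)
        if len(parts) != 2 {
            return nil, fmt.Errorf("offset parameter has wrong format: %s", flag)
        }
        partition, err := strconv.ParseInt(parts[0], 10, 32)
        if err != nil {
            return nil, fmt.Errorf("invalid partition: %s", parts[0])
        }
        offset, err := strconv.ParseInt(parts[1], 10, 64)
        if err != nil {
            return nil, fmt.Errorf("invalid offset: %s", parts[1])
        }
        offsets[int32(partition)] = offset
    }
    return offsets, nil
}

func main() {
    offsets, _ := parseOffsetFlags([]string{"0=45408717"})

    // the point of the report above: a partition not listed in --offset
    // (e.g. partition 3) should fall back to a default start offset instead
    // of failing the whole consumer
    for _, partition := range []int32{0, 3} {
        if start, ok := offsets[partition]; ok {
            fmt.Printf("partition %d: start at explicit offset %d\n", partition, start)
        } else {
            fmt.Printf("partition %d: no explicit offset, use default start\n", partition)
        }
    }
}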


pando85 commented Jul 7, 2023

Same problem here with the offset parameter when using Kubernetes. Local execution runs perfectly.

d-rk added the bug label Jul 14, 2023

d-rk commented Jul 14, 2023

Hey,

I am sorry, but we have only limited resources at the moment and I cannot predict when we will have time to analyse the problem. If you can come up with a PR, that would speed things up :)

Regards
