You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
We need to see which topics do not exist in Kafka clusters if we get an error from WriteMessages. We can't enable the "Auto Topic Creation" config because of our business needs. We can't extract the error from "WriteErrors" because error happens before batch operations.
Describe the solution you would like
Like MessageTooLargeError
func (w*Writer) partitions(ctx context.Context, topicstring) (int, error) {
client:=w.client(w.readTimeout())
// Here we use the transport directly as an optimization to avoid the// construction of temporary request and response objects made by the// (*Client).Metadata API.//// It is expected that the transport will optimize this request by// caching recent results (the kafka.Transport types does).r, err:=client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
TopicNames: []string{topic},
AllowAutoTopicCreation: w.AllowAutoTopicCreation,
})
iferr!=nil {
return0, err
}
for_, t:=ranger.(*metadataAPI.Response).Topics {
ift.Name==topic {
// This should always hit, unless kafka has a bug.ift.ErrorCode!=0 {
return0, Error(t.ErrorCode)
}
returnlen(t.Partitions), nil
}
}
return0, UnknownTopicOrPartition
}
The text was updated successfully, but these errors were encountered:
We need to see which topics do not exist in Kafka clusters if we get an error from WriteMessages. We can't enable the "Auto Topic Creation" config because of our business needs. We can't extract the error from "WriteErrors" because error happens before batch operations.
Describe the solution you would like
Like MessageTooLargeError
We need to implement a custom type error.
The text was updated successfully, but these errors were encountered: