Skip to content

Commit

Permalink
Merge pull request #1131 from mhmtszr/fix-unnecessary-topic-metadata-…
Browse files Browse the repository at this point in the history
…request

Fix unnecessary topic metadata request
  • Loading branch information
rhansen2 authored Jul 7, 2023
2 parents 8492075 + 2937d82 commit e2a66a2
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type Transport struct {
// Default to 6s.
MetadataTTL time.Duration

// Topic names for the metadata cached by this transport. If this field is left blank,
// metadata information of all topics in the cluster will be retrieved.
MetadataTopics []string

// Unique identifier that the transport communicates to the brokers when it
// sends requests.
ClientID string
Expand Down Expand Up @@ -235,14 +239,15 @@ func (t *Transport) grabPool(addr net.Addr) *connPool {
p = &connPool{
refc: 2,

dial: t.dial(),
dialTimeout: t.dialTimeout(),
idleTimeout: t.idleTimeout(),
metadataTTL: t.metadataTTL(),
clientID: t.ClientID,
tls: t.TLS,
sasl: t.SASL,
resolver: t.Resolver,
dial: t.dial(),
dialTimeout: t.dialTimeout(),
idleTimeout: t.idleTimeout(),
metadataTTL: t.metadataTTL(),
metadataTopics: t.MetadataTopics,
clientID: t.ClientID,
tls: t.TLS,
sasl: t.SASL,
resolver: t.Resolver,

ready: make(event),
wake: make(chan event),
Expand Down Expand Up @@ -276,14 +281,15 @@ type connPool struct {
// Immutable fields of the connection pool. Connections access these field
// on their parent pool in a ready-only fashion, so no synchronization is
// required.
dial func(context.Context, string, string) (net.Conn, error)
dialTimeout time.Duration
idleTimeout time.Duration
metadataTTL time.Duration
clientID string
tls *tls.Config
sasl sasl.Mechanism
resolver BrokerResolver
dial func(context.Context, string, string) (net.Conn, error)
dialTimeout time.Duration
idleTimeout time.Duration
metadataTTL time.Duration
metadataTopics []string
clientID string
tls *tls.Config
sasl sasl.Mechanism
resolver BrokerResolver
// Signaling mechanisms to orchestrate communications between the pool and
// the rest of the program.
once sync.Once // ensure that `ready` is triggered only once
Expand Down Expand Up @@ -592,13 +598,16 @@ func (p *connPool) discover(ctx context.Context, wake <-chan event) {
var notify event
done := ctx.Done()

req := &meta.Request{
TopicNames: p.metadataTopics,
}

for {
c, err := p.grabClusterConn(ctx)
if err != nil {
p.update(ctx, nil, err)
} else {
res := make(async, 1)
req := &meta.Request{}
deadline, cancel := context.WithTimeout(ctx, p.metadataTTL)
c.reqs <- connRequest{
ctx: deadline,
Expand Down

0 comments on commit e2a66a2

Please sign in to comment.