From 2937d82e44be34ef117b0b93c63f832add3e91e8 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Thu, 18 May 2023 21:47:37 +0300 Subject: [PATCH] Fix unnecessary topic metadata requests --- transport.go | 43 ++++++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/transport.go b/transport.go index b66422966..685bdddb1 100644 --- a/transport.go +++ b/transport.go @@ -81,6 +81,10 @@ type Transport struct { // Default to 6s. MetadataTTL time.Duration + // Topic names for the metadata cached by this transport. If this field is left blank, + // metadata information of all topics in the cluster will be retrieved. + MetadataTopics []string + // Unique identifier that the transport communicates to the brokers when it // sends requests. ClientID string @@ -235,14 +239,15 @@ func (t *Transport) grabPool(addr net.Addr) *connPool { p = &connPool{ refc: 2, - dial: t.dial(), - dialTimeout: t.dialTimeout(), - idleTimeout: t.idleTimeout(), - metadataTTL: t.metadataTTL(), - clientID: t.ClientID, - tls: t.TLS, - sasl: t.SASL, - resolver: t.Resolver, + dial: t.dial(), + dialTimeout: t.dialTimeout(), + idleTimeout: t.idleTimeout(), + metadataTTL: t.metadataTTL(), + metadataTopics: t.MetadataTopics, + clientID: t.ClientID, + tls: t.TLS, + sasl: t.SASL, + resolver: t.Resolver, ready: make(event), wake: make(chan event), @@ -276,14 +281,15 @@ type connPool struct { // Immutable fields of the connection pool. Connections access these field // on their parent pool in a ready-only fashion, so no synchronization is // required. - dial func(context.Context, string, string) (net.Conn, error) - dialTimeout time.Duration - idleTimeout time.Duration - metadataTTL time.Duration - clientID string - tls *tls.Config - sasl sasl.Mechanism - resolver BrokerResolver + dial func(context.Context, string, string) (net.Conn, error) + dialTimeout time.Duration + idleTimeout time.Duration + metadataTTL time.Duration + metadataTopics []string + clientID string + tls *tls.Config + sasl sasl.Mechanism + resolver BrokerResolver // Signaling mechanisms to orchestrate communications between the pool and // the rest of the program. once sync.Once // ensure that `ready` is triggered only once @@ -592,13 +598,16 @@ func (p *connPool) discover(ctx context.Context, wake <-chan event) { var notify event done := ctx.Done() + req := &meta.Request{ + TopicNames: p.metadataTopics, + } + for { c, err := p.grabClusterConn(ctx) if err != nil { p.update(ctx, nil, err) } else { res := make(async, 1) - req := &meta.Request{} deadline, cancel := context.WithTimeout(ctx, p.metadataTTL) c.reqs <- connRequest{ ctx: deadline,