From e7e4dcec787e8e75bf323ecc5c44679c416b38d5 Mon Sep 17 00:00:00 2001 From: Nazim Date: Thu, 10 Aug 2023 13:20:13 +0300 Subject: [PATCH] remade WaitInit method in readerReconnector --- .../topicreaderinternal/stream_reconnector.go | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index 1a9663592..6705fcbde 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -50,6 +50,7 @@ type readerReconnector struct { streamErr error closedErr error + initErr error initDone bool initDoneCh empty.Chan } @@ -260,12 +261,24 @@ func (r *readerReconnector) reconnect(ctx context.Context, oldReader batchedStre r.reconnectFromBadStream <- newReconnectRequest(oldReader, reason) trace.TopicOnReaderReconnectRequest(r.tracer, err, true) }(err) + } else if err != nil { + r.m.WithLock(func() { + if !r.initDone { + r.initDone = true + close(r.initDoneCh) + r.initErr = err + } + }) } r.m.WithLock(func() { r.streamErr = err if err == nil { r.streamVal = newStream + if !r.initDone { + r.initDone = true + close(r.initDoneCh) + } } }) return err @@ -315,15 +328,6 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err } if res.err == nil { - r.m.WithLock( - func() { - if !r.initDone { - close(r.initDoneCh) - } - r.initDone = true - }, - ) - return res.stream, nil } return nil, res.err @@ -334,7 +338,7 @@ func (r *readerReconnector) WaitInit(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case <-r.initDoneCh: - return nil + return r.initErr } }