Skip to content

Commit

Permalink
remade WaitInit method in readerReconnector
Browse files Browse the repository at this point in the history
  • Loading branch information
gingersamurai committed Aug 10, 2023
1 parent 916ebf7 commit e7e4dce
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions internal/topic/topicreaderinternal/stream_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type readerReconnector struct {
streamErr error
closedErr error

initErr error
initDone bool
initDoneCh empty.Chan
}
Expand Down Expand Up @@ -260,12 +261,24 @@ func (r *readerReconnector) reconnect(ctx context.Context, oldReader batchedStre
r.reconnectFromBadStream <- newReconnectRequest(oldReader, reason)
trace.TopicOnReaderReconnectRequest(r.tracer, err, true)
}(err)
} else if err != nil {
r.m.WithLock(func() {
if !r.initDone {
r.initDone = true
close(r.initDoneCh)
r.initErr = err
}
})
}

r.m.WithLock(func() {
r.streamErr = err
if err == nil {
r.streamVal = newStream
if !r.initDone {
r.initDone = true
close(r.initDoneCh)
}
}
})
return err
Expand Down Expand Up @@ -315,15 +328,6 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
}

if res.err == nil {
r.m.WithLock(
func() {
if !r.initDone {
close(r.initDoneCh)
}
r.initDone = true
},
)

return res.stream, nil
}
return nil, res.err
Expand All @@ -334,7 +338,7 @@ func (r *readerReconnector) WaitInit(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-r.initDoneCh:
return nil
return r.initErr
}
}

Expand Down

0 comments on commit e7e4dce

Please sign in to comment.