diff --git a/pkg/rpcinfo/rpcstats.go b/pkg/rpcinfo/rpcstats.go index 1f8ba7c2ca..338fc79843 100644 --- a/pkg/rpcinfo/rpcstats.go +++ b/pkg/rpcinfo/rpcstats.go @@ -27,14 +27,16 @@ import ( ) var ( - _ RPCStats = (*rpcStats)(nil) - _ MutableRPCStats = &rpcStats{} - _ internal.Reusable = (*rpcStats)(nil) - _ internal.Reusable = (*event)(nil) - rpcStatsPool sync.Pool - eventPool sync.Pool - once sync.Once - maxEventNum int + _ RPCStats = (*rpcStats)(nil) + _ MutableRPCStats = (*rpcStats)(nil) + _ internal.Reusable = (*rpcStats)(nil) + _ internal.Reusable = (*event)(nil) + + rpcStatsPool = sync.Pool{New: func() interface{} { return newRPCStats() }} + eventPool = sync.Pool{New: func() interface{} { return &event{} }} + + once sync.Once + maxEventNum int ) type event struct { @@ -69,10 +71,6 @@ func (e *event) IsNil() bool { return e == nil } -func newEvent() interface{} { - return &event{} -} - func (e *event) zero() { e.event = nil e.status = 0 @@ -95,10 +93,10 @@ type atomicPanicErr struct { } type rpcStats struct { - sync.RWMutex level stats.Level - eventMap []Event + eventStatus []uint32 // see comments of eventXXX consts below + eventMap []event sendSize uint64 lastSendSize uint64 // for Streaming APIs, record the size of the last sent message @@ -107,32 +105,64 @@ type rpcStats struct { err atomic.Value panicErr atomic.Value -} -func init() { - rpcStatsPool.New = newRPCStats - eventPool.New = newEvent -} + // true if rpcStats is from CopyForRetry + copied bool +} + +const ( // for (*rpcStats).eventStatus + eventUnset uint32 = 0b0000 // unset + eventUpdating uint32 = 0b0001 // updating, it's considered to be unset + eventRecorded uint32 = 0b0010 // updated, GetEvent will return the event + + // eventStale is only set by CopyForRetry, + // it represents data is recorded and copied from last rpcstat. + // + // FIXME: + // it may be overwritten later and would cause an issue below: + // - before retry, A, B are recorded, and user may use (B - A) for measuring latency. + // - after retry, only A is recorded, then (B - A) will be a negative number. + // - this is NOT a new issue introduced by eventStatus but caused by CopyForRetry. + // it may also cause data race issue if Record and GetEvent at the same time. + eventStale uint32 = 0b0100 | eventRecorded +) -func newRPCStats() interface{} { +func newRPCStats() *rpcStats { return &rpcStats{ - eventMap: make([]Event, maxEventNum), + eventStatus: make([]uint32, maxEventNum), + eventMap: make([]event, maxEventNum), } } +// NewRPCStats creates a new RPCStats. +func NewRPCStats() RPCStats { + once.Do(func() { + stats.FinishInitialization() + maxEventNum = stats.MaxEventNum() + }) + return rpcStatsPool.Get().(*rpcStats) +} + // Record implements the RPCStats interface. +// It only record once for each event, it will ignore follow events with the same Index() func (r *rpcStats) Record(ctx context.Context, e stats.Event, status stats.Status, info string) { if e.Level() > r.level { return } - eve := NewEvent(e, status, info) idx := e.Index() - r.Lock() - r.eventMap[idx] = eve - r.Unlock() + p := &r.eventStatus[idx] + if atomic.CompareAndSwapUint32(p, eventUnset, eventUpdating) || + (r.copied && atomic.CompareAndSwapUint32(p, eventStale, eventUpdating)) { + r.eventMap[idx] = event{event: e, status: status, info: info, time: time.Now()} + atomic.StoreUint32(p, eventRecorded) // done, make it visible to GetEvent + } else { + // eventRecorded? panic? + } } // NewEvent creates a new Event based on the given event, status and info. +// +// It's only used by ReportStreamEvent func NewEvent(statsEvent stats.Event, status stats.Status, info string) Event { eve := eventPool.Get().(*event) eve.event = statsEvent @@ -177,13 +207,11 @@ func (r *rpcStats) Panicked() (bool, interface{}) { // GetEvent implements the RPCStats interface. func (r *rpcStats) GetEvent(e stats.Event) Event { idx := e.Index() - r.RLock() - evt := r.eventMap[idx] - r.RUnlock() - if evt == nil || evt.IsNil() { - return nil + if atomic.LoadUint32(&r.eventStatus[idx])&eventRecorded != 0 { + // no need to check (*event).IsNil() ... it's useless + return &r.eventMap[idx] } - return evt + return nil } // Level implements the RPCStats interface. @@ -195,17 +223,23 @@ func (r *rpcStats) Level() stats.Level { // to pass through info of the first request to retrying requests. func (r *rpcStats) CopyForRetry() RPCStats { // Copied rpc stats is for request retrying and cannot be reused, so no need to get from pool. - nr := newRPCStats().(*rpcStats) - r.Lock() + nr := newRPCStats() + nr.level = r.level // it will be written before retry by client.Options + nr.copied = true // for GetEvent when status=eventStale + + // RPCStart is the only internal event we need to keep startIdx := int(stats.RPCStart.Index()) + // user-defined events start index userIdx := stats.PredefinedEventNum() for i := 0; i < len(nr.eventMap); i++ { // Ignore none RPCStart events to avoid incorrect tracing. if i == startIdx || i >= userIdx { - nr.eventMap[i] = r.eventMap[i] + if atomic.LoadUint32(&r.eventStatus[i]) == eventRecorded { + nr.eventMap[i] = r.eventMap[i] + nr.eventStatus[i] = eventStale + } } } - r.Unlock() return nr } @@ -262,10 +296,7 @@ func (r *rpcStats) Reset() { atomic.StoreUint64(&r.recvSize, 0) atomic.StoreUint64(&r.sendSize, 0) for i := range r.eventMap { - if r.eventMap[i] != nil { - r.eventMap[i].(*event).Recycle() - r.eventMap[i] = nil - } + r.eventStatus[i] = eventUnset // no need atomic.StoreUint32? } } @@ -279,12 +310,3 @@ func (r *rpcStats) Recycle() { r.Reset() rpcStatsPool.Put(r) } - -// NewRPCStats creates a new RPCStats. -func NewRPCStats() RPCStats { - once.Do(func() { - stats.FinishInitialization() - maxEventNum = stats.MaxEventNum() - }) - return rpcStatsPool.Get().(*rpcStats) -} diff --git a/pkg/rpcinfo/rpcstats_test.go b/pkg/rpcinfo/rpcstats_test.go index d0af2916c0..1b430fc9f9 100644 --- a/pkg/rpcinfo/rpcstats_test.go +++ b/pkg/rpcinfo/rpcstats_test.go @@ -88,6 +88,28 @@ func TestRPCStats(t *testing.T) { }) } +func TestRPCStats_Record(t *testing.T) { + ctx := context.Background() + s := rpcinfo.NewRPCStats() + rpcinfo.AsMutableRPCStats(s).SetLevel(stats.LevelDetailed) + test.Assert(t, s.GetEvent(stats.RPCStart) == nil) + s.Record(ctx, stats.RPCStart, stats.StatusInfo, "hello") + test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "hello") + + // will not change, the event already recorded + s.Record(ctx, stats.RPCStart, stats.StatusInfo, "world") + test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "hello") + + s = s.CopyForRetry() + test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "hello") + + // it's from CopyForRetry, it can be changed once. + s.Record(ctx, stats.RPCStart, stats.StatusInfo, "world") + test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "world") + s.Record(ctx, stats.RPCStart, stats.StatusInfo, "hello") + test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "world") +} + func BenchmarkCopyForRetry(b *testing.B) { b.Run("BenchmarkNewRPCStats", func(b *testing.B) { for i := 0; i < b.N; i++ { diff --git a/pkg/rpcinfo/stats_util.go b/pkg/rpcinfo/stats_util.go index d16f7e131a..29094a1645 100644 --- a/pkg/rpcinfo/stats_util.go +++ b/pkg/rpcinfo/stats_util.go @@ -27,13 +27,17 @@ import ( // Record records the event to RPCStats. func Record(ctx context.Context, ri RPCInfo, event stats.Event, err error) { - if ctx == nil || ri.Stats() == nil { + if ctx == nil { + return + } + st := ri.Stats() + if st == nil { return } if err != nil { - ri.Stats().Record(ctx, event, stats.StatusError, err.Error()) + st.Record(ctx, event, stats.StatusError, err.Error()) } else { - ri.Stats().Record(ctx, event, stats.StatusInfo, "") + st.Record(ctx, event, stats.StatusInfo, "") } }