Skip to content

Commit

Permalink
perf(rpcinfo): rm lock for rpcStats (cloudwego#1557)
Browse files Browse the repository at this point in the history
* use atomic for updating and retrieving events
* use embedded events. No pool needed for rpcStats
  • Loading branch information
xiaost authored Sep 27, 2024
1 parent ff5c245 commit 4943f34
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 51 deletions.
118 changes: 70 additions & 48 deletions pkg/rpcinfo/rpcstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ import (
)

var (
_ RPCStats = (*rpcStats)(nil)
_ MutableRPCStats = &rpcStats{}
_ internal.Reusable = (*rpcStats)(nil)
_ internal.Reusable = (*event)(nil)
rpcStatsPool sync.Pool
eventPool sync.Pool
once sync.Once
maxEventNum int
_ RPCStats = (*rpcStats)(nil)
_ MutableRPCStats = (*rpcStats)(nil)
_ internal.Reusable = (*rpcStats)(nil)
_ internal.Reusable = (*event)(nil)

rpcStatsPool = sync.Pool{New: func() interface{} { return newRPCStats() }}
eventPool = sync.Pool{New: func() interface{} { return &event{} }}

once sync.Once
maxEventNum int
)

type event struct {
Expand Down Expand Up @@ -69,10 +71,6 @@ func (e *event) IsNil() bool {
return e == nil
}

func newEvent() interface{} {
return &event{}
}

func (e *event) zero() {
e.event = nil
e.status = 0
Expand All @@ -95,10 +93,10 @@ type atomicPanicErr struct {
}

type rpcStats struct {
sync.RWMutex
level stats.Level

eventMap []Event
eventStatus []uint32 // see comments of eventXXX consts below
eventMap []event

sendSize uint64
lastSendSize uint64 // for Streaming APIs, record the size of the last sent message
Expand All @@ -107,32 +105,64 @@ type rpcStats struct {

err atomic.Value
panicErr atomic.Value
}

func init() {
rpcStatsPool.New = newRPCStats
eventPool.New = newEvent
}
// true if rpcStats is from CopyForRetry
copied bool
}

const ( // for (*rpcStats).eventStatus
eventUnset uint32 = 0b0000 // unset
eventUpdating uint32 = 0b0001 // updating, it's considered to be unset
eventRecorded uint32 = 0b0010 // updated, GetEvent will return the event

// eventStale is only set by CopyForRetry,
// it indicates that the data was recorded and copied from the previous rpcStats.
//
// FIXME:
// it may be overwritten later and would cause an issue below:
// - before retry, A, B are recorded, and user may use (B - A) for measuring latency.
// - after retry, only A is recorded, then (B - A) will be a negative number.
// - this is NOT a new issue introduced by eventStatus but caused by CopyForRetry.
// it may also cause a data race if Record and GetEvent run at the same time.
eventStale uint32 = 0b0100 | eventRecorded
)

func newRPCStats() interface{} {
func newRPCStats() *rpcStats {
return &rpcStats{
eventMap: make([]Event, maxEventNum),
eventStatus: make([]uint32, maxEventNum),
eventMap: make([]event, maxEventNum),
}
}

// NewRPCStats creates a new RPCStats.
//
// On the first call it runs stats.FinishInitialization() and caches
// stats.MaxEventNum() in maxEventNum. Every call returns a *rpcStats
// obtained from rpcStatsPool.
func NewRPCStats() RPCStats {
// once ensures the setup below runs a single time, before the first pool Get.
once.Do(func() {
stats.FinishInitialization()
maxEventNum = stats.MaxEventNum()
})
return rpcStatsPool.Get().(*rpcStats)
}

// Record implements the RPCStats interface.
// It records each event only once and ignores subsequent events with the same Index().
func (r *rpcStats) Record(ctx context.Context, e stats.Event, status stats.Status, info string) {
if e.Level() > r.level {
return
}
eve := NewEvent(e, status, info)
idx := e.Index()
r.Lock()
r.eventMap[idx] = eve
r.Unlock()
p := &r.eventStatus[idx]
if atomic.CompareAndSwapUint32(p, eventUnset, eventUpdating) ||
(r.copied && atomic.CompareAndSwapUint32(p, eventStale, eventUpdating)) {
r.eventMap[idx] = event{event: e, status: status, info: info, time: time.Now()}
atomic.StoreUint32(p, eventRecorded) // done, make it visible to GetEvent
} else {
// eventRecorded? panic?
}
}

// NewEvent creates a new Event based on the given event, status and info.
//
// It's only used by ReportStreamEvent
func NewEvent(statsEvent stats.Event, status stats.Status, info string) Event {
eve := eventPool.Get().(*event)
eve.event = statsEvent
Expand Down Expand Up @@ -177,13 +207,11 @@ func (r *rpcStats) Panicked() (bool, interface{}) {
// GetEvent implements the RPCStats interface.
func (r *rpcStats) GetEvent(e stats.Event) Event {
idx := e.Index()
r.RLock()
evt := r.eventMap[idx]
r.RUnlock()
if evt == nil || evt.IsNil() {
return nil
if atomic.LoadUint32(&r.eventStatus[idx])&eventRecorded != 0 {
// no need to check (*event).IsNil() ... it's useless
return &r.eventMap[idx]
}
return evt
return nil
}

// Level implements the RPCStats interface.
Expand All @@ -195,17 +223,23 @@ func (r *rpcStats) Level() stats.Level {
// to pass through info of the first request to retrying requests.
func (r *rpcStats) CopyForRetry() RPCStats {
// Copied rpc stats is for request retrying and cannot be reused, so no need to get from pool.
nr := newRPCStats().(*rpcStats)
r.Lock()
nr := newRPCStats()
nr.level = r.level // it will be written before retry by client.Options
nr.copied = true // lets Record overwrite events whose status is eventStale

// RPCStart is the only internal event we need to keep
startIdx := int(stats.RPCStart.Index())
// user-defined events start index
userIdx := stats.PredefinedEventNum()
for i := 0; i < len(nr.eventMap); i++ {
// Skip predefined events other than RPCStart to avoid incorrect tracing.
if i == startIdx || i >= userIdx {
nr.eventMap[i] = r.eventMap[i]
if atomic.LoadUint32(&r.eventStatus[i]) == eventRecorded {
nr.eventMap[i] = r.eventMap[i]
nr.eventStatus[i] = eventStale
}
}
}
r.Unlock()
return nr
}

Expand Down Expand Up @@ -262,10 +296,7 @@ func (r *rpcStats) Reset() {
atomic.StoreUint64(&r.recvSize, 0)
atomic.StoreUint64(&r.sendSize, 0)
for i := range r.eventMap {
if r.eventMap[i] != nil {
r.eventMap[i].(*event).Recycle()
r.eventMap[i] = nil
}
r.eventStatus[i] = eventUnset // no need atomic.StoreUint32?
}
}

Expand All @@ -279,12 +310,3 @@ func (r *rpcStats) Recycle() {
r.Reset()
rpcStatsPool.Put(r)
}

// NewRPCStats creates a new RPCStats.
func NewRPCStats() RPCStats {
once.Do(func() {
stats.FinishInitialization()
maxEventNum = stats.MaxEventNum()
})
return rpcStatsPool.Get().(*rpcStats)
}
22 changes: 22 additions & 0 deletions pkg/rpcinfo/rpcstats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,28 @@ func TestRPCStats(t *testing.T) {
})
}

func TestRPCStats_Record(t *testing.T) {
ctx := context.Background()
s := rpcinfo.NewRPCStats()
rpcinfo.AsMutableRPCStats(s).SetLevel(stats.LevelDetailed)
test.Assert(t, s.GetEvent(stats.RPCStart) == nil)
s.Record(ctx, stats.RPCStart, stats.StatusInfo, "hello")
test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "hello")

// will not change, the event already recorded
s.Record(ctx, stats.RPCStart, stats.StatusInfo, "world")
test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "hello")

s = s.CopyForRetry()
test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "hello")

// it's from CopyForRetry, it can be changed once.
s.Record(ctx, stats.RPCStart, stats.StatusInfo, "world")
test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "world")
s.Record(ctx, stats.RPCStart, stats.StatusInfo, "hello")
test.Assert(t, s.GetEvent(stats.RPCStart).Info() == "world")
}

func BenchmarkCopyForRetry(b *testing.B) {
b.Run("BenchmarkNewRPCStats", func(b *testing.B) {
for i := 0; i < b.N; i++ {
Expand Down
10 changes: 7 additions & 3 deletions pkg/rpcinfo/stats_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ import (

// Record records the event to RPCStats.
func Record(ctx context.Context, ri RPCInfo, event stats.Event, err error) {
if ctx == nil || ri.Stats() == nil {
if ctx == nil {
return
}
st := ri.Stats()
if st == nil {
return
}
if err != nil {
ri.Stats().Record(ctx, event, stats.StatusError, err.Error())
st.Record(ctx, event, stats.StatusError, err.Error())
} else {
ri.Stats().Record(ctx, event, stats.StatusInfo, "")
st.Record(ctx, event, stats.StatusInfo, "")
}
}

Expand Down

0 comments on commit 4943f34

Please sign in to comment.