diff --git a/br/pkg/lightning/backend/kv/session.go b/br/pkg/lightning/backend/kv/session.go index fb49514bcd879..1fc45e498929c 100644 --- a/br/pkg/lightning/backend/kv/session.go +++ b/br/pkg/lightning/backend/kv/session.go @@ -37,6 +37,8 @@ import ( "go.uber.org/zap" ) +const maxAvailableBufSize int = 20 + // invalidIterator is a trimmed down Iterator type which is invalid. type invalidIterator struct { kv.Iterator @@ -91,6 +93,12 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) { buf.idx = 0 buf.cap = len(buf.buf) mb.Lock() + if len(mb.availableBufs) >= maxAvailableBufSize { + // too many byte buffers, evict one byte buffer and continue + evictedByteBuf := mb.availableBufs[0] + evictedByteBuf.destroy() + mb.availableBufs = mb.availableBufs[1:] + } mb.availableBufs = append(mb.availableBufs, buf) mb.Unlock() } @@ -98,8 +106,20 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) { func (mb *kvMemBuf) AllocateBuf(size int) { mb.Lock() size = utils.MaxInt(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2) - if len(mb.availableBufs) > 0 && mb.availableBufs[0].cap >= size { - mb.buf = mb.availableBufs[0] + var ( + existingBuf *bytesBuf + existingBufIdx int + ) + for i, buf := range mb.availableBufs { + if buf.cap >= size { + existingBuf = buf + existingBufIdx = i + break + } + } + if existingBuf != nil { + mb.buf = existingBuf + mb.availableBufs[existingBufIdx] = mb.availableBufs[0] mb.availableBufs = mb.availableBufs[1:] } else { mb.buf = newBytesBuf(size) diff --git a/br/pkg/lightning/backend/kv/session_test.go b/br/pkg/lightning/backend/kv/session_test.go index 2917a6afa747e..d1ac0292067a3 100644 --- a/br/pkg/lightning/backend/kv/session_test.go +++ b/br/pkg/lightning/backend/kv/session_test.go @@ -17,6 +17,7 @@ package kv import ( "testing" + "github.com/docker/go-units" . "github.com/pingcap/check" "github.com/pingcap/tidb/parser/mysql" ) @@ -34,3 +35,103 @@ func (s *kvSuite) TestSession(c *C) { _, err := session.Txn(true) c.Assert(err, IsNil) } + +func (s *kvSuite) TestKVMemBufInterweaveAllocAndRecycle(c *C) { + type testCase struct { + AllocSizes []int + FinalAvailableByteBufCaps []int + } + for _, tc := range []testCase{ + { + AllocSizes: []int{ + 1 * units.MiB, + 2 * units.MiB, + 3 * units.MiB, + 4 * units.MiB, + 5 * units.MiB, + }, + // [2] => [2,4] => [2,4,8] => [4,2,8] => [4,2,8,16] + FinalAvailableByteBufCaps: []int{ + 4 * units.MiB, + 2 * units.MiB, + 8 * units.MiB, + 16 * units.MiB, + }, + }, + { + AllocSizes: []int{ + 5 * units.MiB, + 4 * units.MiB, + 3 * units.MiB, + 2 * units.MiB, + 1 * units.MiB, + }, + // [16] => [16] => [16] => [16] => [16] + FinalAvailableByteBufCaps: []int{16 * units.MiB}, + }, + { + AllocSizes: []int{5, 4, 3, 2, 1}, + // [1] => [1] => [1] => [1] => [1] + FinalAvailableByteBufCaps: []int{1 * units.MiB}, + }, + { + AllocSizes: []int{ + 1 * units.MiB, + 2 * units.MiB, + 3 * units.MiB, + 2 * units.MiB, + 1 * units.MiB, + 5 * units.MiB, + }, + // [2] => [2,4] => [2,4,8] => [2,8,4] => [8,4,2] => [8,4,2,16] + FinalAvailableByteBufCaps: []int{ + 8 * units.MiB, + 4 * units.MiB, + 2 * units.MiB, + 16 * units.MiB, + }, + }, + } { + testKVMemBuf := &kvMemBuf{} + for _, allocSize := range tc.AllocSizes { + testKVMemBuf.AllocateBuf(allocSize) + testKVMemBuf.Recycle(testKVMemBuf.buf) + } + c.Assert(len(testKVMemBuf.availableBufs), Equals, len(tc.FinalAvailableByteBufCaps)) + for i, bb := range testKVMemBuf.availableBufs { + c.Assert(bb.cap, Equals, tc.FinalAvailableByteBufCaps[i]) + } + } +} + +func (s *kvSuite) TestKVMemBufBatchAllocAndRecycle(c *C) { + testKVMemBuf := &kvMemBuf{} + bBufs := []*bytesBuf{} + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(1 * units.MiB) + bBufs = append(bBufs, testKVMemBuf.buf) + } + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(2 * units.MiB) + bBufs = append(bBufs, testKVMemBuf.buf) + } + for _, bb := range bBufs { + testKVMemBuf.Recycle(bb) + } + c.Assert(len(testKVMemBuf.availableBufs), Equals, maxAvailableBufSize) + for _, bb := range testKVMemBuf.availableBufs { + c.Assert(bb.cap, Equals, 4*units.MiB) + } + bBufs = bBufs[:0] + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(1 * units.MiB) + bb := testKVMemBuf.buf + c.Assert(bb.cap, Equals, 4*units.MiB) + bBufs = append(bBufs, bb) + c.Assert(len(testKVMemBuf.availableBufs), Equals, maxAvailableBufSize-i-1) + } + for _, bb := range bBufs { + testKVMemBuf.Recycle(bb) + } + c.Assert(len(testKVMemBuf.availableBufs), Equals, maxAvailableBufSize) +}