Skip to content

Commit

Permalink
fix(blooms): fix reference leak and resulting race condition in Bloom…
Browse files Browse the repository at this point in the history
…PageDecoder (grafana#12050)
  • Loading branch information
owen-d authored and rhnasc committed Apr 12, 2024
1 parent 2bdcb71 commit b688201
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 32 deletions.
41 changes: 21 additions & 20 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(BlockPool.Get(int(b.Capacity() / 8)))

// TODO(owen-d): have encoder implement writer directly so we don't need
// to indirect via a buffer
_, err := b.WriteTo(buf)
if err != nil {
return errors.Wrap(err, "encoding bloom filter")
Expand Down Expand Up @@ -56,7 +58,16 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
return nil
}

func LazyDecodeBloomPage(dec *encoding.Decbuf, pool chunkenc.ReaderPool, decompressedSize int) (*BloomPageDecoder, error) {
func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data := BlockPool.Get(page.Len)[:page.Len]
defer BlockPool.Put(data)

_, err := io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
dec := encoding.DecWith(data)

if err := dec.CheckCrc(castagnoliTable); err != nil {
return nil, errors.Wrap(err, "checksumming bloom page")
}
Expand All @@ -67,7 +78,7 @@ func LazyDecodeBloomPage(dec *encoding.Decbuf, pool chunkenc.ReaderPool, decompr
}
defer pool.PutReader(decompressor)

b := BlockPool.Get(decompressedSize)[:decompressedSize]
b := make([]byte, page.DecompressedLen)

if _, err = io.ReadFull(decompressor, b); err != nil {
return nil, errors.Wrap(err, "decompressing bloom page")
Expand Down Expand Up @@ -98,6 +109,13 @@ func NewBloomPageDecoder(data []byte) *BloomPageDecoder {
}

// Decoder is a seekable, reset-able iterator
// TODO(owen-d): use buffer pools. The reason we don't currently
// do this is because the `data` slice currently escapes the decoder
// via the returned bloom, so we can't know when it's safe to return it to the pool.
// This happens via `data ([]byte) -> dec (*encoding.Decbuf) -> bloom (Bloom)` where
// the final Bloom has a reference to the data slice.
// We could optimize this by encoding the mode (read, write) into our structs
// and doing copy-on-write shenannigans, but I'm avoiding this for now.
type BloomPageDecoder struct {
data []byte
dec *encoding.Decbuf
Expand All @@ -107,15 +125,6 @@ type BloomPageDecoder struct {
err error
}

// Drop returns the underlying byte slice to the pool
// for efficiency. It's intended to be used as a
// perf optimization prior to garbage collection.
func (d *BloomPageDecoder) Drop() {
if cap(d.data) > 0 {
BlockPool.Put(d.data)
}
}

func (d *BloomPageDecoder) Reset() {
d.err = nil
d.cur = nil
Expand Down Expand Up @@ -234,13 +243,5 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageD
return nil, errors.Wrap(err, "seeking to bloom page")
}

data := BlockPool.Get(page.Len)[:page.Len]
_, err := io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}

dec := encoding.DecWith(data)

return LazyDecodeBloomPage(&dec, b.schema.DecompressorPool(), page.DecompressedLen)
return LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page)
}
6 changes: 0 additions & 6 deletions pkg/storage/bloom/v1/bloom_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
// load the desired page
if it.curPageIndex != offset.Page || it.curPage == nil {

// drop the current page if it exists
if it.curPage != nil {
it.curPage.Drop()
}

r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
Expand Down Expand Up @@ -103,7 +98,6 @@ func (it *LazyBloomIter) next() bool {
}
// we've exhausted the current page, progress to next
it.curPageIndex++
it.curPage.Drop()
it.curPage = nil
continue
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/storage/bloom/v1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,14 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
return nil, errors.Wrap(err, "getting decompressor")
}

decompressed := BlockPool.Get(header.DecompressedLen)[:header.DecompressedLen]
decompressed := make([]byte, header.DecompressedLen)
if _, err = io.ReadFull(decompressor, decompressed); err != nil {
return nil, errors.Wrap(err, "decompressing series page")
}

// replace decoder's input with the now-decompressed data
dec.B = decompressed

res := &SeriesPageDecoder{
data: decompressed,
header: header.SeriesHeader,

i: -1,
}

res.Reset()
Expand Down

0 comments on commit b688201

Please sign in to comment.