Skip to content

Commit

Permalink
fix(blooms): Match series to newest block only
Browse files Browse the repository at this point in the history
based on meta's TSDB timestamp

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Dec 18, 2024
1 parent ad85cc2 commit a1aac43
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 28 deletions.
63 changes: 38 additions & 25 deletions pkg/bloomgateway/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bloomgateway

import (
"context"
"slices"
"sort"
"time"

Expand Down Expand Up @@ -61,36 +62,48 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter
}

func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries {
result := make([]blockWithSeries, 0, len(metas))

for _, meta := range metas {
for _, block := range meta.Blocks {
slices.SortFunc(series, func(a, b *logproto.GroupedChunkRefs) int { return int(a.Fingerprint - b.Fingerprint) })

// skip blocks that are not within time interval
if !interval.Overlaps(block.Interval()) {
continue
result := make([]blockWithSeries, 0, len(metas))
cache := make(map[bloomshipper.BlockRef]int)

// find the newest block for each series
for _, s := range series {
var b *bloomshipper.BlockRef
var ts time.Time

for i := range metas {
for j := range metas[i].Blocks {
block := metas[i].Blocks[j]
version := metas[i].Sources[j].TS
// skip blocks that are not within time interval
if !interval.Overlaps(block.Interval()) {
continue
}
// skip blocks that do not contain the series
if block.Cmp(s.Fingerprint) != v1.Overlap {
continue
}
// only use the block if it is newer than the previous
if version.After(ts) {
b = &block
ts = version
}
}
}

min := sort.Search(len(series), func(i int) bool {
return block.Cmp(series[i].Fingerprint) > v1.Before
})

max := sort.Search(len(series), func(i int) bool {
return block.Cmp(series[i].Fingerprint) == v1.After
})

// All fingerprints fall outside of the consumer's range
if min == len(series) || max == 0 || min == max {
continue
}
if b == nil {
continue
}

// At least one fingerprint is within bounds of the blocks
// so append to results
dst := make([]*logproto.GroupedChunkRefs, max-min)
_ = copy(dst, series[min:max])
idx, ok := cache[*b]
if ok {
result[idx].series = append(result[idx].series, s)
} else {
cache[*b] = len(result)
result = append(result, blockWithSeries{
block: block,
series: dst,
block: *b,
series: []*logproto.GroupedChunkRefs{s},
})
}
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/bloomgateway/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
)

func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef {
Expand All @@ -28,6 +29,9 @@ func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshi
Blocks: []bloomshipper.BlockRef{
makeBlockRef(minFp, maxFp, from, through),
},
Sources: []tsdb.SingleTenantTSDBIdentifier{
{TS: through.Time()},
},
}
}

Expand Down Expand Up @@ -100,14 +104,20 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) {

t.Run("multiple overlapping blocks within time range covering full keyspace", func(t *testing.T) {
metas := []bloomshipper.Meta{
makeMeta(0x00, 0xdf, 1000, 1999),
makeMeta(0xc0, 0xff, 1000, 1999),
makeMeta(0x00, 0xdf, 1000, 1499),
makeMeta(0xc0, 0xff, 1500, 1999),
}
res := blocksMatchingSeries(metas, interval, series)
for i := range res {
t.Logf("%s", res[i].block)
for j := range res[i].series {
t.Logf(" %016x", res[i].series[j].Fingerprint)
}
}
expected := []blockWithSeries{
{
block: metas[0].Blocks[0],
series: series[0:4],
series: series[0:2], // series 0x00c0 and 0x00d0 are covered in the newer block
},
{
block: metas[1].Blocks[0],
Expand Down

0 comments on commit a1aac43

Please sign in to comment.