Tokenizer tests and TokenizeLine updates #11133

Merged
16 commits merged on Nov 8, 2023
35 changes: 31 additions & 4 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -33,6 +33,8 @@ type BloomTokenizer struct {
}

const CacheSize = 150000
const DefaultNGramLength = 4
const DefaultNGramSkip = 0

// NewBloomTokenizer returns a new instance of the Bloom Tokenizer.
// Warning: the tokens returned use the same byte slice to reduce allocations. This has two consequences:
@@ -44,7 +44,7 @@ func NewBloomTokenizer(reg prometheus.Registerer) (*BloomTokenizer, error) {
metrics: newMetrics(reg),
}
t.cache = make(map[string]interface{}, CacheSize)
t.lineTokenizer = NewNGramTokenizer(4, 5, 0) // default to 4-grams, no skip
t.lineTokenizer = NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, DefaultNGramSkip) // default to 4-grams, no skip
t.chunkIDTokenizer = ChunkIDTokenizer(t.lineTokenizer)

level.Info(util_log.Logger).Log("bloom tokenizer created")
@@ -68,6 +70,7 @@ func clearCache(cache map[string]interface{}) {
}
}

// PopulateSeriesWithBloom is intended to be called on the write path, and is used to populate the bloom filter for a given series.
func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) {
clearCache(bt.cache)
for idx := range chunks {
@@ -101,7 +104,7 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo

seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok.Key)

if len(bt.cache) > 150000 { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
if len(bt.cache) >= CacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}
@@ -116,6 +119,30 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo
} // for each chunk
}

func (bt *BloomTokenizer) TokenizeLine(line string) []Token {
return bt.lineTokenizer.Tokens(line)
// SearchesForTokenizerAndLine takes a search string (for example, on the read/query path) and returns
// all possible tokens for the given tokenizer.
// The result is a two-dimensional slice: the outer slice corresponds to the starting offset into the
// line, and the inner slice holds the tokens generated from that offset. Offsets that produce no
// tokens are omitted, so the result may hold fewer than skip+1 entries.
Review comment (Member): This is only true if all of the skip offsets return at least one token. Otherwise, the length of the result will be less than skip+1.

Reply (Collaborator, Author): Yep, I've updated the doc accordingly; good catch.

// The per-offset grouping only matters when the Tokenizer uses a non-zero skip value.
func SearchesForTokenizerAndLine(t Tokenizer, line string) (res [][]Token) {
res = make([][]Token, 0, 10)
for i := range line { // iterate by runes
Review comment (Member): This unnecessarily iterates all runes in the line, including offsets beyond skip+1.

Reply (Collaborator, Author): Ack, added a break clause.

if i >= t.GetSkip()+1 {
break // offsets beyond the skip window add nothing new, so stop scanning the line
}
tmpTokens := make([]Token, 0, 100)
tokens := t.Tokens(line[i:])
// The tokenizer reuses its internal buffers, so each token key must be copied
// before it is retained.
for _, token := range tokens {
tmpToken := Token{}
tmpToken.Key = make([]byte, len(token.Key))
copy(tmpToken.Key, token.Key)
tmpTokens = append(tmpTokens, tmpToken)
}
if len(tokens) > 0 {
res = append(res, tmpTokens)
}
}

return res
}
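A rough usage sketch, not part of the PR, of how the per-offset token sets can be consumed on the read path. It assumes the `four` tokenizer (4-grams, no skip) defined in the package tests further down; the `ExampleSearchesForTokenizerAndLine` name is only for illustration.

```go
package v1

import "fmt"

// ExampleSearchesForTokenizerAndLine is a hypothetical sketch showing how the
// per-offset token sets could be consumed.
func ExampleSearchesForTokenizerAndLine() {
	searches := SearchesForTokenizerAndLine(four, "abcde")
	for i, tokenSet := range searches {
		// Each entry corresponds to one starting offset (0..skip). Offsets that
		// produce no tokens are omitted, so len(searches) may be less than skip+1.
		for _, tok := range tokenSet {
			fmt.Printf("set %d: %s\n", i, tok.Key)
		}
	}
	// With a 4-gram, skip=0 tokenizer this prints:
	// set 0: abcd
	// set 0: bcde
}
```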
148 changes: 148 additions & 0 deletions pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -2,11 +2,159 @@ package v1

import (
"fmt"
"time"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/pkg/storage/chunk"

"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/bloom/v1/filter"

"github.com/prometheus/client_golang/prometheus"
)

func TestSetLineTokenizer(t *testing.T) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)

// Validate defaults
require.Equal(t, bt.lineTokenizer.GetMin(), DefaultNGramLength)
require.Equal(t, bt.lineTokenizer.GetMax(), DefaultNGramLength+1)
require.Equal(t, bt.lineTokenizer.GetSkip(), DefaultNGramSkip)

require.Equal(t, bt.chunkIDTokenizer.GetMin(), DefaultNGramLength)
require.Equal(t, bt.chunkIDTokenizer.GetMax(), DefaultNGramLength+1)
require.Equal(t, bt.chunkIDTokenizer.GetSkip(), DefaultNGramSkip)

// Set new tokenizer, and validate against that
bt.SetLineTokenizer(NewNGramTokenizer(6, 7, 2))
require.Equal(t, bt.lineTokenizer.GetMin(), 6)
require.Equal(t, bt.lineTokenizer.GetMax(), 7)
require.Equal(t, bt.lineTokenizer.GetSkip(), 2)

require.Equal(t, bt.chunkIDTokenizer.GetMin(), 6)
require.Equal(t, bt.chunkIDTokenizer.GetMax(), 7)
require.Equal(t, bt.chunkIDTokenizer.GetSkip(), 2)
}

func TestSearchesForTokenizerAndLine(t *testing.T) {
for _, tc := range []struct {
desc string
input string
t Tokenizer
exp [][]Token
}{
{
desc: "empty",
input: "",
t: four,
exp: [][]Token{},
},
{
desc: "single char",
input: "a",
t: four,
exp: [][]Token{},
},
{
desc: "four chars",
input: "abcd",
t: four,
exp: [][]Token{
{{Key: []byte("abcd")}}},
},
{
desc: "uuid partial",
input: "2b1a5e46-36a2-4",
t: four,
exp: [][]Token{{
{Key: []byte("2b1a")},
{Key: []byte("b1a5")},
{Key: []byte("1a5e")},
{Key: []byte("a5e4")},
{Key: []byte("5e46")},
{Key: []byte("e46-")},
{Key: []byte("46-3")},
{Key: []byte("6-36")},
{Key: []byte("-36a")},
{Key: []byte("36a2")},
{Key: []byte("6a2-")},
{Key: []byte("a2-4")}},
},
},
{
desc: "short special chars",
t: four,
input: "日本語",
exp: [][]Token{},
},
{
desc: "longer special chars",
t: four,
input: "日本語日本語",
exp: [][]Token{{
{Key: []byte("日本語日")},
{Key: []byte("本語日本")},
{Key: []byte("語日本語")}}},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, SearchesForTokenizerAndLine(tc.t, tc.input))
})
}

}
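One note on the multi-byte cases above: the tokenizer counts runes rather than bytes, which is presumably why "日本語" (9 bytes but only 3 runes) yields no 4-grams while "日本語日本語" (6 runes) yields three. A minimal standard-library sketch of that distinction:

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

func main() {
	short := "日本語"
	fmt.Println(len(short))                    // 9 bytes
	fmt.Println(utf8.RuneCountInString(short)) // 3 runes: below the 4-gram minimum, so no tokens

	long := "日本語日本語"
	fmt.Println(utf8.RuneCountInString(long)) // 6 runes: 6-4+1 = 3 four-grams
}
```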

func TestPopulateSeriesWithBloom(t *testing.T) {
var testLine = "this is a log line"
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)

sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
var lbsList []labels.Labels
lbsList = append(lbsList, labels.FromStrings("foo", "bar"))

var fpList []model.Fingerprint
for i := range lbsList {
fpList = append(fpList, model.Fingerprint(lbsList[i].Hash()))
}

var memChunks = make([]*chunkenc.MemChunk, 0)
memChunk0 := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_ = memChunk0.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: testLine,
})
memChunks = append(memChunks, memChunk0)

var chunks = make([]chunk.Chunk, 0)
for i := range memChunks {
chunks = append(chunks, chunk.NewChunk("user", fpList[i], lbsList[i], chunkenc.NewFacade(memChunks[i], 256000, 1500000), model.TimeFromUnixNano(0), model.TimeFromUnixNano(1)))
}

bloom := Bloom{
ScalableBloomFilter: *sbf,
}
series := Series{
Fingerprint: model.Fingerprint(lbsList[0].Hash()),
}
swb := SeriesWithBloom{
Bloom: &bloom,
Series: &series,
}

bt.PopulateSeriesWithBloom(&swb, chunks)
tokens := SearchesForTokenizerAndLine(four, testLine)
for _, token := range tokens[0] {
require.True(t, swb.Bloom.Test(token.Key))
}
}

func BenchmarkMapClear(b *testing.B) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
for i := 0; i < b.N; i++ {
8 changes: 8 additions & 0 deletions pkg/storage/bloom/v1/tokenizer.go
@@ -150,8 +150,16 @@ func ChunkIDTokenizer(t Tokenizer) *WrappedTokenizer {
}
}

func zeroBuffer(buf []byte) {
for i := range buf {
buf[i] = 0
}
}

func (w *WrappedTokenizer) Reinit(chk logproto.ChunkRef) {
w.prefix = w.prefix[:0]
zeroBuffer(w.i64buf)
zeroBuffer(w.i32buf)

binary.PutVarint(w.i64buf, int64(chk.From))
w.prefix = append(w.prefix, w.i64buf...)
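The added zeroBuffer calls presumably guard against stale prefix bytes: binary.PutVarint writes a variable number of bytes, while Reinit appends the whole fixed-size buffer, so a chunk ref that encodes to fewer bytes than the previous one would otherwise carry leftover bytes into its prefix. A standalone sketch of that effect, using only the standard library; the buffer here stands in for the PR's i64buf/i32buf:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	buf := make([]byte, binary.MaxVarintLen64)

	// A large value (e.g. a chunk "From" timestamp) fills several bytes.
	binary.PutVarint(buf, 1<<40)
	fmt.Println(buf)

	// A smaller value for the next chunk rewrites only the first byte;
	// without zeroing, the later bytes still hold the previous value.
	binary.PutVarint(buf, 1)
	fmt.Println(buf)

	// Zeroing between uses, as Reinit now does, keeps the appended prefix clean.
	for i := range buf {
		buf[i] = 0
	}
	binary.PutVarint(buf, 1)
	fmt.Println(buf)
}
```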
35 changes: 14 additions & 21 deletions tools/tsdb/bloom-tester/readlib.go
@@ -126,6 +126,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
//searchString := os.Getenv("SEARCH_STRING")
//147854,148226,145541,145603,147159,147836,145551,145599,147393,147841,145265,145620,146181,147225,147167,146131,146189,146739,147510,145572,146710,148031,29,146205,147175,146984,147345
//mytenants := []string{"29"}
bloomTokenizer, _ := bt.NewBloomTokenizer(prometheus.DefaultRegisterer)
for _, tenant := range tenants {
level.Info(util_log.Logger).Log("Analyzing tenant", tenant, "table", tableName)
err := shipper.ForEach(
@@ -199,22 +200,15 @@
tenant,
ls.String(),
objectClient)
bloomTokenizer.SetLineTokenizer(experiment.tokenizer)
for gotIdx := range got { // for every chunk
for _, queryExperiment := range queryExperiments { // for each search string
if len(queryExperiment.searchString) >= experiment.tokenizer.GetMin()+experiment.tokenizer.GetSkip() {

foundInChunk := false
foundInSbf := false

chunkTokenizer := bt.ChunkIDTokenizer(experiment.tokenizer)

chunkTokenizer.Reinit(got[gotIdx].ChunkRef)
var tokenizer bt.Tokenizer = chunkTokenizer
if !experiment.encodeChunkID {
tokenizer = experiment.tokenizer
}

foundInSbf = searchSbf(sbf, tokenizer, queryExperiment.searchString)
foundInSbf = searchSbf(sbf, experiment.tokenizer, queryExperiment.searchString)

lc := got[gotIdx].Data.(*chunkenc.Facade).LokiChunk()

@@ -313,22 +307,21 @@ func readSBFFromObjectStorage(location, prefix, period, tenant, series string, o
}

func searchSbf(sbf *filter.ScalableBloomFilter, tokenizer bt.Tokenizer, searchString string) bool {
for i := 0; i <= tokenizer.GetSkip(); i++ {
tokens := bt.SearchesForTokenizerAndLine(tokenizer, searchString)
for _, tokenSet := range tokens {
numMatches := 0
if (len(searchString) - i) >= tokenizer.GetMin() {
tokens := tokenizer.Tokens(searchString[i:])

for _, token := range tokens {
if sbf.Test(token.Key) {
numMatches++
}
for _, token := range tokenSet {
if sbf.Test(token.Key) {
numMatches++
}
if numMatches > 0 {
if numMatches == len(tokens) {
return true
}
}
if numMatches > 0 {
if numMatches == len(tokenSet) {
return true
}
}

}

return false
}
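To make the matching rule concrete: a search string is reported as present only if, for at least one offset, every token in that offset's set tests positive in the filter. A hedged sketch of how this might be exercised in the same package as readlib.go; it relies on the existing bt and filter imports, the package's `four` tokenizer (also used in the test below), and an added fmt import, and `exampleSearchSbf` is a hypothetical name:

```go
func exampleSearchSbf() {
	sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
	// Index every token of the stored line, mirroring what the write path does.
	for _, tokenSet := range bt.SearchesForTokenizerAndLine(four, "this is a log line") {
		for _, tok := range tokenSet {
			sbf.Add(tok.Key)
		}
	}
	fmt.Println(searchSbf(sbf, four, "log line")) // true: every 4-gram of "log line" was indexed
	fmt.Println(searchSbf(sbf, four, "absent!!")) // false, barring bloom false positives
}
```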
17 changes: 8 additions & 9 deletions tools/tsdb/bloom-tester/readlib_test.go
@@ -1,19 +1,16 @@
package main

import (
bt "github.com/grafana/loki/pkg/storage/bloom/v1"
"testing"

"github.com/stretchr/testify/require"
)

func TestSearchSbf(t *testing.T) {
tokenizer := four

searchString := "trace"

experiment := NewExperiment(
"token=4skip0_error=1%_indexchunks=true",
tokenizer,
four,
true,
onePctError,
)
@@ -69,11 +66,13 @@ func TestSearchSbf(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
sbf := experiment.bloom()
tokens := tokenizer.Tokens(tc.inputLine)
for _, token := range tokens {
sbf.Add(token.Key)
tokens := bt.SearchesForTokenizerAndLine(four, tc.inputLine)
for _, tokenSet := range tokens {
for _, token := range tokenSet {
sbf.Add(token.Key)
}
}
require.Equal(t, tc.exp, searchSbf(sbf, tokenizer, searchString))
require.Equal(t, tc.exp, searchSbf(sbf, four, tc.inputSearch))
})
}
}