diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go
index 7e194405be264..26ebd63006383 100644
--- a/pkg/storage/bloom/v1/bloom_tokenizer.go
+++ b/pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -33,6 +33,8 @@ type BloomTokenizer struct {
 }
 
 const CacheSize = 150000
+const DefaultNGramLength = 4
+const DefaultNGramSkip = 0
 
 // NewBloomTokenizer returns a new instance of the Bloom Tokenizer.
 // Warning: the tokens returned use the same byte slice to reduce allocations. This has two consequences:
@@ -44,7 +46,7 @@ func NewBloomTokenizer(reg prometheus.Registerer) (*BloomTokenizer, error) {
         metrics: newMetrics(reg),
     }
     t.cache = make(map[string]interface{}, CacheSize)
-    t.lineTokenizer = NewNGramTokenizer(4, 5, 0) // default to 4-grams, no skip
+    t.lineTokenizer = NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, DefaultNGramSkip) // default to 4-grams, no skip
     t.chunkIDTokenizer = ChunkIDTokenizer(t.lineTokenizer)
 
     level.Info(util_log.Logger).Log("bloom tokenizer created")
@@ -68,6 +70,7 @@ func clearCache(cache map[string]interface{}) {
     }
 }
 
+// PopulateSeriesWithBloom is intended to be called on the write path, and is used to populate the bloom filter for a given series.
 func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) {
     clearCache(bt.cache)
     for idx := range chunks {
@@ -101,7 +104,7 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo
 
                 seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok.Key)
 
-                if len(bt.cache) > 150000 { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
+                if len(bt.cache) >= CacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
                     clearCache(bt.cache)
                 }
             }
@@ -116,6 +119,32 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo
     } // for each chunk
 }
 
-func (bt *BloomTokenizer) TokenizeLine(line string) []Token {
-    return bt.lineTokenizer.Tokens(line)
+// SearchesForTokenizerAndLine takes a search string (e.g. from the read/query path) and returns
+// all the possible tokens for it, given a tokenizer.
+// The result is a two-dimensional slice: the first dimension is the offset into the line, and the
+// second dimension holds the tokens produced at that offset. If an offset into the line yields no
+// tokens, the first dimension will have fewer than 1 + skip entries, where skip is the tokenizer's
+// skip value. The offsets only matter when the tokenizer is configured with a non-zero skip.
+func SearchesForTokenizerAndLine(t Tokenizer, line string) (res [][]Token) {
+    res = make([][]Token, 0, 10)
+    for i := range line { // i is the byte offset of each rune
+        if i >= t.GetSkip()+1 {
+            break
+        }
+        tmpTokens := make([]Token, 0, 100)
+        tokens := t.Tokens(line[i:])
+        // The tokenizer reuses its internal buffers between calls,
+        // so we must copy each token's key to keep the data.
+        for _, token := range tokens {
+            tmpToken := Token{}
+            tmpToken.Key = make([]byte, len(token.Key))
+            copy(tmpToken.Key, token.Key)
+            tmpTokens = append(tmpTokens, tmpToken)
+        }
+        if len(tokens) > 0 {
+            res = append(res, tmpTokens)
+        }
+    }
+
+    return res
 }
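For orientation, here is a rough sketch of how a query-path caller might use the new helper. It is illustrative only and not part of the diff; it assumes the exported NewNGramTokenizer constructor, the Token.Key field, and the SearchesForTokenizerAndLine signature introduced above.

package main

import (
    "fmt"

    bt "github.com/grafana/loki/pkg/storage/bloom/v1"
)

func main() {
    // A 4-gram tokenizer (min 4, max 5) that skips 2 positions between n-grams.
    tokenizer := bt.NewNGramTokenizer(4, 5, 2)

    // SearchesForTokenizerAndLine returns at most one token set per starting offset
    // in [0, skip], so a caller can test bloom membership for every offset the
    // write path could have started tokenizing from.
    for i, tokens := range bt.SearchesForTokenizerAndLine(tokenizer, "2b1a5e46-36a2") {
        for _, token := range tokens {
            fmt.Printf("token set %d: %s\n", i, token.Key)
        }
    }
}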
diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go
index eaff6c783771b..034301f88c1aa 100644
--- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go
+++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -2,11 +2,159 @@ package v1
 
 import (
     "fmt"
+    "time"
+
+    "github.com/prometheus/prometheus/model/labels"
+
+    "github.com/grafana/loki/pkg/chunkenc"
+    "github.com/grafana/loki/pkg/push"
+    "github.com/grafana/loki/pkg/storage/chunk"
+
     "testing"
 
+    "github.com/prometheus/common/model"
+    "github.com/stretchr/testify/require"
+
+    "github.com/grafana/loki/pkg/storage/bloom/v1/filter"
+
     "github.com/prometheus/client_golang/prometheus"
 )
 
+func TestSetLineTokenizer(t *testing.T) {
+    bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
+
+    // Validate defaults
+    require.Equal(t, bt.lineTokenizer.GetMin(), DefaultNGramLength)
+    require.Equal(t, bt.lineTokenizer.GetMax(), DefaultNGramLength+1)
+    require.Equal(t, bt.lineTokenizer.GetSkip(), DefaultNGramSkip)
+
+    require.Equal(t, bt.chunkIDTokenizer.GetMin(), DefaultNGramLength)
+    require.Equal(t, bt.chunkIDTokenizer.GetMax(), DefaultNGramLength+1)
+    require.Equal(t, bt.chunkIDTokenizer.GetSkip(), DefaultNGramSkip)
+
+    // Set new tokenizer, and validate against that
+    bt.SetLineTokenizer(NewNGramTokenizer(6, 7, 2))
+    require.Equal(t, bt.lineTokenizer.GetMin(), 6)
+    require.Equal(t, bt.lineTokenizer.GetMax(), 7)
+    require.Equal(t, bt.lineTokenizer.GetSkip(), 2)
+
+    require.Equal(t, bt.chunkIDTokenizer.GetMin(), 6)
+    require.Equal(t, bt.chunkIDTokenizer.GetMax(), 7)
+    require.Equal(t, bt.chunkIDTokenizer.GetSkip(), 2)
+}
+
+func TestSearchesForTokenizerAndLine(t *testing.T) {
+    for _, tc := range []struct {
+        desc  string
+        input string
+        t     Tokenizer
+        exp   [][]Token
+    }{
+        {
+            desc:  "empty",
+            input: "",
+            t:     four,
+            exp:   [][]Token{},
+        },
+        {
+            desc:  "single char",
+            input: "a",
+            t:     four,
+            exp:   [][]Token{},
+        },
+        {
+            desc:  "four chars",
+            input: "abcd",
+            t:     four,
+            exp: [][]Token{
+                {{Key: []byte("abcd")}}},
+        },
+        {
+            desc:  "uuid partial",
+            input: "2b1a5e46-36a2-4",
+            t:     four,
+            exp: [][]Token{{
+                {Key: []byte("2b1a")},
+                {Key: []byte("b1a5")},
+                {Key: []byte("1a5e")},
+                {Key: []byte("a5e4")},
+                {Key: []byte("5e46")},
+                {Key: []byte("e46-")},
+                {Key: []byte("46-3")},
+                {Key: []byte("6-36")},
+                {Key: []byte("-36a")},
+                {Key: []byte("36a2")},
+                {Key: []byte("6a2-")},
+                {Key: []byte("a2-4")}},
+            },
+        },
+        {
+            desc:  "short special chars",
+            t:     four,
+            input: "日本語",
+            exp:   [][]Token{},
+        },
+        {
+            desc:  "longer special chars",
+            t:     four,
+            input: "日本語日本語",
+            exp: [][]Token{{
+                {Key: []byte("日本語日")},
+                {Key: []byte("本語日本")},
+                {Key: []byte("語日本語")}}},
+        },
+    } {
+        t.Run(tc.desc, func(t *testing.T) {
+            require.Equal(t, tc.exp, SearchesForTokenizerAndLine(tc.t, tc.input))
+        })
+    }
+
+}
+
+func TestPopulateSeriesWithBloom(t *testing.T) {
+    var testLine = "this is a log line"
+    bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
+
+    sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
+    var lbsList []labels.Labels
+    lbsList = append(lbsList, labels.FromStrings("foo", "bar"))
+
+    var fpList []model.Fingerprint
+    for i := range lbsList {
+        fpList = append(fpList, model.Fingerprint(lbsList[i].Hash()))
+    }
+
+    var memChunks = make([]*chunkenc.MemChunk, 0)
+    memChunk0 := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
+    _ = memChunk0.Append(&push.Entry{
+        Timestamp: time.Unix(0, 1),
+        Line:      testLine,
+    })
+    memChunks = append(memChunks, memChunk0)
+
+    var chunks = make([]chunk.Chunk, 0)
+    for i := range memChunks {
+        chunks = append(chunks, chunk.NewChunk("user", fpList[i], lbsList[i], chunkenc.NewFacade(memChunks[i], 256000, 1500000), model.TimeFromUnixNano(0), model.TimeFromUnixNano(1)))
+    }
+
+    bloom := Bloom{
+        ScalableBloomFilter: *sbf,
+    }
+    series := Series{
+        Fingerprint: model.Fingerprint(lbsList[0].Hash()),
+    }
+    swb := SeriesWithBloom{
+        Bloom:  &bloom,
+        Series: &series,
+    }
+
+    bt.PopulateSeriesWithBloom(&swb, chunks)
+    tokens := SearchesForTokenizerAndLine(four, testLine)
+    for _, token := range tokens[0] {
+        require.True(t, swb.Bloom.Test(token.Key))
+    }
+}
+
 func BenchmarkMapClear(b *testing.B) {
     bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
     for i := 0; i < b.N; i++ {
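TestPopulateSeriesWithBloom above wires a real MemChunk through the write path and then checks the query-path tokens against the resulting bloom. Stripped of the chunk plumbing, the same round trip could be sketched roughly as below. This is not part of the diff, the helper name couldMatch is hypothetical, and it is written as if it sat next to the test in package v1, reusing that file's chunk and prometheus imports.

// couldMatch populates a per-series bloom from the given chunks (write path) and
// then checks whether a query string could appear in that series (read path).
// swb and chunks are assumed to be prepared as in TestPopulateSeriesWithBloom.
func couldMatch(swb *SeriesWithBloom, chunks []chunk.Chunk, query string) bool {
    lineTokenizer := NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, DefaultNGramSkip)

    bloomTokenizer, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
    bloomTokenizer.SetLineTokenizer(lineTokenizer)
    bloomTokenizer.PopulateSeriesWithBloom(swb, chunks)

    // The query is a possible match only if, for some starting offset, every
    // n-gram generated from it tests positive against the series bloom.
    for _, tokenSet := range SearchesForTokenizerAndLine(lineTokenizer, query) {
        allFound := len(tokenSet) > 0
        for _, token := range tokenSet {
            if !swb.Bloom.Test(token.Key) {
                allFound = false
                break
            }
        }
        if allFound {
            return true
        }
    }
    return false
}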
line" + bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer) + + sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) + var lbsList []labels.Labels + lbsList = append(lbsList, labels.FromStrings("foo", "bar")) + + var fpList []model.Fingerprint + for i := range lbsList { + fpList = append(fpList, model.Fingerprint(lbsList[i].Hash())) + } + + var memChunks = make([]*chunkenc.MemChunk, 0) + memChunk0 := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) + _ = memChunk0.Append(&push.Entry{ + Timestamp: time.Unix(0, 1), + Line: testLine, + }) + memChunks = append(memChunks, memChunk0) + + var chunks = make([]chunk.Chunk, 0) + for i := range memChunks { + chunks = append(chunks, chunk.NewChunk("user", fpList[i], lbsList[i], chunkenc.NewFacade(memChunks[i], 256000, 1500000), model.TimeFromUnixNano(0), model.TimeFromUnixNano(1))) + } + + bloom := Bloom{ + ScalableBloomFilter: *sbf, + } + series := Series{ + Fingerprint: model.Fingerprint(lbsList[0].Hash()), + } + swb := SeriesWithBloom{ + Bloom: &bloom, + Series: &series, + } + + bt.PopulateSeriesWithBloom(&swb, chunks) + tokens := SearchesForTokenizerAndLine(four, testLine) + for _, token := range tokens[0] { + require.True(t, swb.Bloom.Test(token.Key)) + } +} + func BenchmarkMapClear(b *testing.B) { bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer) for i := 0; i < b.N; i++ { diff --git a/pkg/storage/bloom/v1/tokenizer.go b/pkg/storage/bloom/v1/tokenizer.go index 22da439f07f42..96e51f2cd0488 100644 --- a/pkg/storage/bloom/v1/tokenizer.go +++ b/pkg/storage/bloom/v1/tokenizer.go @@ -150,8 +150,16 @@ func ChunkIDTokenizer(t Tokenizer) *WrappedTokenizer { } } +func zeroBuffer(buf []byte) { + for i := range buf { + buf[i] = 0 + } +} + func (w *WrappedTokenizer) Reinit(chk logproto.ChunkRef) { w.prefix = w.prefix[:0] + zeroBuffer(w.i64buf) + zeroBuffer(w.i32buf) binary.PutVarint(w.i64buf, int64(chk.From)) w.prefix = append(w.prefix, w.i64buf...) 
diff --git a/tools/tsdb/bloom-tester/readlib.go b/tools/tsdb/bloom-tester/readlib.go
index ee0456a6e3ccd..eaca7a38c15bd 100644
--- a/tools/tsdb/bloom-tester/readlib.go
+++ b/tools/tsdb/bloom-tester/readlib.go
@@ -126,6 +126,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
     //searchString := os.Getenv("SEARCH_STRING")
     //147854,148226,145541,145603,147159,147836,145551,145599,147393,147841,145265,145620,146181,147225,147167,146131,146189,146739,147510,145572,146710,148031,29,146205,147175,146984,147345
     //mytenants := []string{"29"}
+    bloomTokenizer, _ := bt.NewBloomTokenizer(prometheus.DefaultRegisterer)
     for _, tenant := range tenants {
         level.Info(util_log.Logger).Log("Analyzing tenant", tenant, "table", tableName)
         err := shipper.ForEach(
@@ -199,6 +200,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
                             tenant,
                             ls.String(),
                             objectClient)
+                        bloomTokenizer.SetLineTokenizer(experiment.tokenizer)
                         for gotIdx := range got { // for every chunk
                             for _, queryExperiment := range queryExperiments { // for each search string
                                 if len(queryExperiment.searchString) >= experiment.tokenizer.GetMin()+experiment.tokenizer.GetSkip() {
@@ -206,15 +208,7 @@ func analyzeRead(metrics *Metrics, sampler Sampler, shipper indexshipper.IndexSh
                                     foundInChunk := false
                                     foundInSbf := false
 
-                                    chunkTokenizer := bt.ChunkIDTokenizer(experiment.tokenizer)
-
-                                    chunkTokenizer.Reinit(got[gotIdx].ChunkRef)
-                                    var tokenizer bt.Tokenizer = chunkTokenizer
-                                    if !experiment.encodeChunkID {
-                                        tokenizer = experiment.tokenizer
-                                    }
-
-                                    foundInSbf = searchSbf(sbf, tokenizer, queryExperiment.searchString)
+                                    foundInSbf = searchSbf(sbf, experiment.tokenizer, queryExperiment.searchString)
 
                                     lc := got[gotIdx].Data.(*chunkenc.Facade).LokiChunk()
 
@@ -313,22 +307,21 @@ func readSBFFromObjectStorage(location, prefix, period, tenant, series string, o
 }
 
 func searchSbf(sbf *filter.ScalableBloomFilter, tokenizer bt.Tokenizer, searchString string) bool {
-    for i := 0; i <= tokenizer.GetSkip(); i++ {
+    tokens := bt.SearchesForTokenizerAndLine(tokenizer, searchString)
+    for _, tokenSet := range tokens {
         numMatches := 0
-        if (len(searchString) - i) >= tokenizer.GetMin() {
-            tokens := tokenizer.Tokens(searchString[i:])
-
-            for _, token := range tokens {
-                if sbf.Test(token.Key) {
-                    numMatches++
-                }
+        for _, token := range tokenSet {
+            if sbf.Test(token.Key) {
+                numMatches++
             }
-            if numMatches > 0 {
-                if numMatches == len(tokens) {
-                    return true
-                }
+        }
+        if numMatches > 0 {
+            if numMatches == len(tokenSet) {
+                return true
             }
         }
+    }
 
     return false
 }
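The tester still feeds token keys straight into a ScalableBloomFilter and probes it the same way. A minimal sketch of that underlying filter API as used above (not part of the diff; the meaning of the constructor arguments, capacity hint, target false-positive rate, and growth ratio, is assumed here):

package main

import (
    "fmt"

    "github.com/grafana/loki/pkg/storage/bloom/v1/filter"
)

func main() {
    // Arguments assumed to be: capacity hint, target false-positive rate, growth ratio.
    sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
    sbf.Add([]byte("trac"))

    fmt.Println(sbf.Test([]byte("trac"))) // true
    fmt.Println(sbf.Test([]byte("race"))) // false, barring a false positive
}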
diff --git a/tools/tsdb/bloom-tester/readlib_test.go b/tools/tsdb/bloom-tester/readlib_test.go
index ad7b7f0b732a0..5216918010bc1 100644
--- a/tools/tsdb/bloom-tester/readlib_test.go
+++ b/tools/tsdb/bloom-tester/readlib_test.go
@@ -1,19 +1,16 @@
 package main
 
 import (
+    bt "github.com/grafana/loki/pkg/storage/bloom/v1"
     "testing"
 
     "github.com/stretchr/testify/require"
 )
 
 func TestSearchSbf(t *testing.T) {
-    tokenizer := four
-
-    searchString := "trace"
-
     experiment := NewExperiment(
         "token=4skip0_error=1%_indexchunks=true",
-        tokenizer,
+        four,
         true,
         onePctError,
     )
@@ -69,11 +66,13 @@ func TestSearchSbf(t *testing.T) {
     } {
         t.Run(tc.desc, func(t *testing.T) {
             sbf := experiment.bloom()
-            tokens := tokenizer.Tokens(tc.inputLine)
-            for _, token := range tokens {
-                sbf.Add(token.Key)
+            tokens := bt.SearchesForTokenizerAndLine(four, tc.inputLine)
+            for _, tokenSet := range tokens {
+                for _, token := range tokenSet {
+                    sbf.Add(token.Key)
+                }
             }
-            require.Equal(t, tc.exp, searchSbf(sbf, tokenizer, searchString))
+            require.Equal(t, tc.exp, searchSbf(sbf, four, tc.inputSearch))
         })
     }
 }