Tokenizer tests and TokenizeLine updates #11133

Merged: 16 commits, Nov 8, 2023
58 changes: 54 additions & 4 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -33,6 +33,8 @@ type BloomTokenizer struct {
}

const CacheSize = 150000
const DefaultNGramLength = 4
const DefaultNGramSkip = 0

// NewBloomTokenizer returns a new instance of the Bloom Tokenizer.
// Warning: the tokens returned use the same byte slice to reduce allocations. This has two consequences:
@@ -44,7 +46,7 @@ func NewBloomTokenizer(reg prometheus.Registerer) (*BloomTokenizer, error) {
metrics: newMetrics(reg),
}
t.cache = make(map[string]interface{}, CacheSize)
t.lineTokenizer = NewNGramTokenizer(4, 5, 0) // default to 4-grams, no skip
t.lineTokenizer = NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, DefaultNGramSkip) // default to 4-grams, no skip
t.chunkIDTokenizer = ChunkIDTokenizer(t.lineTokenizer)

level.Info(util_log.Logger).Log("bloom tokenizer created")
@@ -101,7 +103,7 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo

seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok.Key)

if len(bt.cache) > 150000 { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
if len(bt.cache) >= CacheSize { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
clearCache(bt.cache)
}
}
@@ -116,6 +118,54 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBlo
} // for each chunk
}

func (bt *BloomTokenizer) TokenizeLine(line string) []Token {
return bt.lineTokenizer.Tokens(line)
// TokenizeLine returns all n-gram tokens for the given line, based on the tokenizer's current configuration.
// If the tokenizer has a skip value, the line is tokenized skip+1 times, with each pass starting one
// character further into the line. Each offset's tokens are kept in a separate slice, and all of the
// slices are returned together as a slice of slices.
func (bt *BloomTokenizer) TokenizeLine(line string) [][]Token {
allTokens := make([][]Token, 0, 10)
if len(line) >= bt.lineTokenizer.GetMin() && len(line) >= bt.lineTokenizer.GetSkip() {
for i := 0; i <= bt.lineTokenizer.GetSkip(); i++ {
tmpTokens := make([]Token, 0, 100)
tokens := bt.lineTokenizer.Tokens(line[i:])
for _, token := range tokens {
tmpToken := Token{}
tmpToken.Key = make([]byte, len(token.Key))
copy(tmpToken.Key, token.Key)
tmpTokens = append(tmpTokens, tmpToken)
}
if len(tokens) > 0 {
allTokens = append(allTokens, tmpTokens)
}
}
}
return allTokens
}

// TokenizeLineWithChunkPrefix returns all n-gram tokens for the given line, based on the tokenizer's
// current configuration, and additionally emits, for each n-gram, a token whose key is prefixed with
// the chunk ID.
// If the tokenizer has a skip value, the line is tokenized skip+1 times, with each pass starting one
// character further into the line. Each offset's tokens are kept in a separate slice, and all of the
// slices are returned together as a slice of slices.
func (bt *BloomTokenizer) TokenizeLineWithChunkPrefix(line string, chk logproto.ChunkRef) [][]Token {
@owen-d (Member) commented on Nov 8, 2023:
There's no need for two functions here -- you can just use something like the following and apply it to any tokenizer (chunk_prefix_tokenizer or regular)

func SearchesForTokenizerAndLine(t Tokenizer, line string) (res [][]Token) {
  for i := 0; i < t.Skip()+1; i++ {
    res = append(res, t.Tokens(line[i:])) // this needs to account for runes vs bytes, but you get the idea 
  }
  return
}
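For reference, a rune-aware variant of the suggested helper could look like the sketch below. This is illustrative only: it assumes a `Tokenizer` interface exposing `GetSkip()` and `Tokens(string)`, mirroring the methods the concrete tokenizers call in this diff, and it does not copy token keys, so the bloom tokenizer's warning about shared byte slices still applies.

```go
// Hypothetical generalization of the reviewer's suggestion, offsetting by runes
// rather than bytes so multi-byte characters are never split mid-encoding.
func SearchesForTokenizerAndLine(t Tokenizer, line string) [][]Token {
	res := make([][]Token, 0, t.GetSkip()+1)
	runes := []rune(line)
	for i := 0; i <= t.GetSkip() && i < len(runes); i++ {
		// Re-slice from the i-th rune, not the i-th byte.
		offset := string(runes[i:])
		res = append(res, t.Tokens(offset))
	}
	return res
}
```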

allTokens := make([][]Token, 0, 10)

if len(line) >= bt.chunkIDTokenizer.GetMin() && len(line) >= bt.chunkIDTokenizer.GetSkip() {
A reviewer (Member) commented:
Two things:

  • This actually needs to ensure the length is >= min + skip.
  • len(str) doesn't return the number of runes, but the number of bytes. We need to account for runes since that's how we index. See this for more detail.
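As a standalone illustration of the second point (not part of the PR): `len` counts bytes, `utf8.RuneCountInString` counts characters, and byte-offset slicing can land inside a multi-byte rune.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

func main() {
	line := "héllo" // 'é' occupies two bytes in UTF-8
	fmt.Println(len(line))                    // 6: byte length
	fmt.Println(utf8.RuneCountInString(line)) // 5: rune (character) count

	// Offsetting by bytes can split a multi-byte rune in half...
	fmt.Printf("%q\n", line[2:]) // "\xa9llo": starts on a continuation byte
	// ...while offsetting by runes keeps characters intact.
	fmt.Printf("%q\n", string([]rune(line)[2:])) // "llo"
}
```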

for i := 0; i <= bt.chunkIDTokenizer.GetSkip(); i++ {
bt.chunkIDTokenizer.Reinit(chk)

tmpTokens := make([]Token, 0, 100)
tokens := bt.chunkIDTokenizer.Tokens(line[i:])
for _, token := range tokens {
tmpToken := Token{}
tmpToken.Key = make([]byte, len(token.Key))
copy(tmpToken.Key, token.Key)
tmpTokens = append(tmpTokens, tmpToken)
}
if len(tokens) > 0 {
allTokens = append(allTokens, tmpTokens)
}
}
}
return allTokens
}
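Taken together, the new API returns one token slice per starting offset, i.e. skip+1 slices. A minimal usage sketch, mirroring the skip-2 configuration the tests below exercise (illustrative only, would live alongside the v1 package code):

```go
// sketchTokenizeLine demonstrates the shape of TokenizeLine's output.
func sketchTokenizeLine() {
	bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
	// Match the 4-gram, skip-2 configuration used by the tests below.
	bt.SetLineTokenizer(NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, 2))

	// With skip=2, TokenizeLine returns three slices, one per starting offset:
	//   offset 0: "abcd", "defg", "ghij"
	//   offset 1: "bcde", "efgh", "hijk"
	//   offset 2: "cdef", "fghi", "ijkl"
	for offset, tokens := range bt.TokenizeLine("abcdefghijkl") {
		for _, token := range tokens {
			fmt.Printf("offset %d: %s\n", offset, token.Key)
		}
	}
}
```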
299 changes: 299 additions & 0 deletions pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -2,11 +2,310 @@ package v1

import (
"fmt"
"time"

"github.com/grafana/loki/pkg/logproto"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/pkg/storage/chunk"

"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/storage/bloom/v1/filter"

"github.com/prometheus/client_golang/prometheus"
)

func TestSetLineTokenizer(t *testing.T) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)

// Validate defaults
require.Equal(t, bt.lineTokenizer.GetMin(), DefaultNGramLength)
require.Equal(t, bt.lineTokenizer.GetMax(), DefaultNGramLength+1)
require.Equal(t, bt.lineTokenizer.GetSkip(), DefaultNGramSkip)

require.Equal(t, bt.chunkIDTokenizer.GetMin(), DefaultNGramLength)
require.Equal(t, bt.chunkIDTokenizer.GetMax(), DefaultNGramLength+1)
require.Equal(t, bt.chunkIDTokenizer.GetSkip(), DefaultNGramSkip)

// Set new tokenizer, and validate against that
bt.SetLineTokenizer(NewNGramTokenizer(6, 7, 2))
require.Equal(t, bt.lineTokenizer.GetMin(), 6)
require.Equal(t, bt.lineTokenizer.GetMax(), 7)
require.Equal(t, bt.lineTokenizer.GetSkip(), 2)

require.Equal(t, bt.chunkIDTokenizer.GetMin(), 6)
require.Equal(t, bt.chunkIDTokenizer.GetMax(), 7)
require.Equal(t, bt.chunkIDTokenizer.GetSkip(), 2)
}

func TestDefaultTokenizeLine(t *testing.T) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)

for _, tc := range []struct {
desc string
input string
exp [][]Token
}{
{
desc: "empty",
input: "",
exp: [][]Token{},
},
{
desc: "single char",
input: "a",
exp: [][]Token{},
},
{
desc: "four chars",
input: "abcd",
exp: [][]Token{
{{Key: []byte("abcd")}}},
},
{
desc: "uuid partial",
input: "2b1a5e46-36a2-4",
exp: [][]Token{{
{Key: []byte("2b1a")},
{Key: []byte("b1a5")},
{Key: []byte("1a5e")},
{Key: []byte("a5e4")},
{Key: []byte("5e46")},
{Key: []byte("e46-")},
{Key: []byte("46-3")},
{Key: []byte("6-36")},
{Key: []byte("-36a")},
{Key: []byte("36a2")},
{Key: []byte("6a2-")},
{Key: []byte("a2-4")}},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, bt.TokenizeLine(tc.input))
})
}
}

func TestTokenizeLineWithSkips(t *testing.T) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
bt.SetLineTokenizer(NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, 2))

for _, tc := range []struct {
desc string
input string
exp [][]Token
}{
{
desc: "empty",
input: "",
exp: [][]Token{},
},
{
desc: "single char",
input: "a",
exp: [][]Token{},
},
{
desc: "four chars",
input: "abcd",
exp: [][]Token{{
{Key: []byte("abcd")}}},
},
{
desc: "longer string",
input: "abcdefghijkl",
exp: [][]Token{
{{Key: []byte("abcd")},
{Key: []byte("defg")},
{Key: []byte("ghij")}},
{{Key: []byte("bcde")},
{Key: []byte("efgh")},
{Key: []byte("hijk")}},
{{Key: []byte("cdef")},
{Key: []byte("fghi")},
{Key: []byte("ijkl")}},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, bt.TokenizeLine(tc.input))
})
}
}

func TestDefaultTokenizeLineWithChunkPrefix(t *testing.T) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
chk := logproto.ChunkRef{From: 0, Through: 999999, Checksum: 1}

for _, tc := range []struct {
desc string
input string
exp [][]Token
}{
{
desc: "empty",
input: "",
exp: [][]Token{},
},
{
desc: "single char",
input: "a",
exp: [][]Token{},
},
{
desc: "four chars",
input: "abcd",
exp: [][]Token{{
{Key: append(makeBuf(0, 999999, 1), []byte("abcd")...)},
{Key: []byte("abcd")}}},
},
{
desc: "uuid partial",
input: "2b1a5e46-36a2-4",
exp: [][]Token{{
{Key: append(makeBuf(0, 999999, 1), []byte("2b1a")...)},
{Key: []byte("2b1a")},
{Key: append(makeBuf(0, 999999, 1), []byte("b1a5")...)},
{Key: []byte("b1a5")},
{Key: append(makeBuf(0, 999999, 1), []byte("1a5e")...)},
{Key: []byte("1a5e")},
{Key: append(makeBuf(0, 999999, 1), []byte("a5e4")...)},
{Key: []byte("a5e4")},
{Key: append(makeBuf(0, 999999, 1), []byte("5e46")...)},
{Key: []byte("5e46")},
{Key: append(makeBuf(0, 999999, 1), []byte("e46-")...)},
{Key: []byte("e46-")},
{Key: append(makeBuf(0, 999999, 1), []byte("46-3")...)},
{Key: []byte("46-3")},
{Key: append(makeBuf(0, 999999, 1), []byte("6-36")...)},
{Key: []byte("6-36")},
{Key: append(makeBuf(0, 999999, 1), []byte("-36a")...)},
{Key: []byte("-36a")},
{Key: append(makeBuf(0, 999999, 1), []byte("36a2")...)},
{Key: []byte("36a2")},
{Key: append(makeBuf(0, 999999, 1), []byte("6a2-")...)},
{Key: []byte("6a2-")},
{Key: append(makeBuf(0, 999999, 1), []byte("a2-4")...)},
{Key: []byte("a2-4")}},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, bt.TokenizeLineWithChunkPrefix(tc.input, chk))
})
}
}

func TestTokenizeLineWithSkipsWithChunkPrefix(t *testing.T) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
chk := logproto.ChunkRef{From: 0, Through: 999999, Checksum: 1}
bt.SetLineTokenizer(NewNGramTokenizer(DefaultNGramLength, DefaultNGramLength+1, 2))

for _, tc := range []struct {
desc string
input string
exp [][]Token
}{
{
desc: "empty",
input: "",
exp: [][]Token{},
},
{
desc: "single char",
input: "a",
exp: [][]Token{},
},
{
desc: "four chars",
input: "abcd",
exp: [][]Token{{
{Key: append(makeBuf(0, 999999, 1), []byte("abcd")...)},
{Key: []byte("abcd")}}},
},
{
desc: "longer string",
input: "abcdefghijkl",
exp: [][]Token{
{{Key: append(makeBuf(0, 999999, 1), []byte("abcd")...)},
{Key: []byte("abcd")},
{Key: append(makeBuf(0, 999999, 1), []byte("defg")...)},
{Key: []byte("defg")},
{Key: append(makeBuf(0, 999999, 1), []byte("ghij")...)},
{Key: []byte("ghij")}},
{{Key: append(makeBuf(0, 999999, 1), []byte("bcde")...)},
{Key: []byte("bcde")},
{Key: append(makeBuf(0, 999999, 1), []byte("efgh")...)},
{Key: []byte("efgh")},
{Key: append(makeBuf(0, 999999, 1), []byte("hijk")...)},
{Key: []byte("hijk")}},
{{Key: append(makeBuf(0, 999999, 1), []byte("cdef")...)},
{Key: []byte("cdef")},
{Key: append(makeBuf(0, 999999, 1), []byte("fghi")...)},
{Key: []byte("fghi")},
{Key: append(makeBuf(0, 999999, 1), []byte("ijkl")...)},
{Key: []byte("ijkl")}},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, bt.TokenizeLineWithChunkPrefix(tc.input, chk))
})
}
}

func TestPopulateSeriesWithBloom(t *testing.T) {
var testLine = "this is a log line"
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)

sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
var lbsList []labels.Labels
lbsList = append(lbsList, labels.FromStrings("foo", "bar"))

var fpList []model.Fingerprint
for i := range lbsList {
fpList = append(fpList, model.Fingerprint(lbsList[i].Hash()))
}

var memChunks = make([]*chunkenc.MemChunk, 0)
memChunk0 := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
_ = memChunk0.Append(&push.Entry{
Timestamp: time.Unix(0, 1),
Line: testLine,
})
memChunks = append(memChunks, memChunk0)

var chunks = make([]chunk.Chunk, 0)
for i := range memChunks {
chunks = append(chunks, chunk.NewChunk("user", fpList[i], lbsList[i], chunkenc.NewFacade(memChunks[i], 256000, 1500000), model.TimeFromUnixNano(0), model.TimeFromUnixNano(1)))
}

bloom := Bloom{
ScalableBloomFilter: *sbf,
}
series := Series{
Fingerprint: model.Fingerprint(lbsList[0].Hash()),
}
swb := SeriesWithBloom{
Bloom: &bloom,
Series: &series,
}

bt.PopulateSeriesWithBloom(&swb, chunks)
tokens := bt.TokenizeLine(testLine)
for _, token := range tokens[0] {
require.True(t, swb.Bloom.Test(token.Key))
}
}

func BenchmarkMapClear(b *testing.B) {
bt, _ := NewBloomTokenizer(prometheus.DefaultRegisterer)
for i := 0; i < b.N; i++ {