Skip to content

Commit

Permalink
Merge pull request #169 from go-faster/feat/line-filter-offload
Browse files Browse the repository at this point in the history
feat(logqlengine): extract line filter pre-condition
  • Loading branch information
tdakkota authored Sep 27, 2023
2 parents bda753c + c0e3a6c commit 6cc232d
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 6 deletions.
4 changes: 4 additions & 0 deletions integration/lokie2e/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func runTest(
{`{http_method=~".+"} |= "\"method\": \"HEAD\"" |= "\"status\":500"`, 2},
{`{http_method=~".+"} |~ "\"method\":\\s*\"DELETE\""`, 20},
{`{http_method=~".+"} |~ "\"method\":\\s*\"HEAD\"" |= "\"status\":500"`, 2},
// Try to not use offloading.
{`{http_method=~".+"} | line_format "{{ __line__ }}" |= "\"method\": \"DELETE\""`, 20},
{`{http_method=~".+"} | line_format "{{ __line__ }}" |= "\"method\": \"HEAD\"" |= "\"status\":500"`, 2},
{`{http_method=~".+"} |= "\"method\": \"HEAD\"" | line_format "{{ __line__ }}" |= "\"status\":500"`, 2},
// Negative line matcher.
{`{http_method=~".+"} != "\"method\": \"HEAD\""`, len(set.Records) - 22},
{`{http_method=~".+"} !~ "\"method\":\\s*\"HEAD\""`, len(set.Records) - 22},
Expand Down
2 changes: 1 addition & 1 deletion internal/logql/logqlengine/eval_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (e *Engine) selectLogs(ctx context.Context, sel logql.Selector, stages []lo
params.Start = addDuration(params.Start, e.lookbackDuration)
}

cond, err := extractQueryConditions(e.querierCaps, sel)
cond, err := extractQueryConditions(e.querierCaps, sel, stages)
if err != nil {
return nil, errors.Wrap(err, "extract preconditions")
}
Expand Down
32 changes: 31 additions & 1 deletion internal/logql/logqlengine/precondition.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type queryConditions struct {
params SelectLogsParams
}

func extractQueryConditions(caps QuerierСapabilities, sel logql.Selector) (cond queryConditions, _ error) {
func extractQueryConditions(caps QuerierСapabilities, sel logql.Selector, stages []logql.PipelineStage) (cond queryConditions, _ error) {
var prefilters []Processor

for _, lm := range sel.Matchers {
Expand All @@ -36,5 +36,35 @@ func extractQueryConditions(caps QuerierСapabilities, sel logql.Selector) (cond
cond.prefilter = &Pipeline{Stages: prefilters}
}

stageLoop:
for _, stage := range stages {
switch stage := stage.(type) {
case *logql.LineFilter:
if stage.IP {
// Do not offload IP line filter.
continue
}
if !caps.Line.Supports(stage.Op) {
continue
}
cond.params.Line = append(cond.params.Line, *stage)
case *logql.JSONExpressionParser,
*logql.LogfmtExpressionParser,
*logql.RegexpLabelParser,
*logql.PatternLabelParser,
*logql.LabelFilter,
*logql.LabelFormatExpr,
*logql.DropLabelsExpr,
*logql.KeepLabelsExpr,
*logql.DistinctFilter:
// Do nothing on line, just skip.
case *logql.LineFormat,
*logql.DecolorizeExpr,
*logql.UnpackLabelParser:
// The stage modifies the line, so line filters after it can't be offloaded.
break stageLoop
}
}

return cond, nil
}
75 changes: 73 additions & 2 deletions internal/logql/logqlengine/precondition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/go-faster/oteldb/internal/logql"
)

func Test_extractQueryConditions(t *testing.T) {
func TestExtractLabelQueryConditions(t *testing.T) {
tests := []struct {
sel logql.Selector
labelCaps []logql.BinOp
Expand Down Expand Up @@ -70,7 +70,7 @@ func Test_extractQueryConditions(t *testing.T) {
var caps QuerierСapabilities
caps.Label.Add(tt.labelCaps...)

conds, err := extractQueryConditions(caps, tt.sel)
conds, err := extractQueryConditions(caps, tt.sel, nil)
if tt.wantErr {
require.Error(t, err)
return
Expand All @@ -86,3 +86,74 @@ func Test_extractQueryConditions(t *testing.T) {
})
}
}

// TestExtractLineQueryConditions checks which line filters
// extractQueryConditions copies into SelectLogsParams.Line for offloading.
//
// Each case lists the pipeline stages, the line operations the querier
// claims to support and the storage params expected to be extracted.
func TestExtractLineQueryConditions(t *testing.T) {
	tests := []struct {
		stages   []logql.PipelineStage
		lineCaps []logql.BinOp
		conds    SelectLogsParams
		wantErr  bool
	}{
		{
			// Supported filters are offloaded until a line-modifying stage is met.
			stages: []logql.PipelineStage{
				&logql.DropLabelsExpr{},
				&logql.LineFilter{Op: logql.OpEq, Value: "first"},
				&logql.LineFilter{Op: logql.OpRe, Value: "regular.+", Re: regexp.MustCompile(`regular.+`)},
				&logql.DecolorizeExpr{},
				// These would not be offloaded: they follow a stage that modifies the line.
				&logql.LineFilter{Op: logql.OpEq, Value: "second"},
				&logql.LineFilter{Op: logql.OpRe, Value: "no.+", Re: regexp.MustCompile(`no.+`)},
			},
			lineCaps: []logql.BinOp{
				logql.OpEq,
				logql.OpRe,
			},
			conds: SelectLogsParams{
				Line: []logql.LineFilter{
					{Op: logql.OpEq, Value: "first"},
					{Op: logql.OpRe, Value: "regular.+", Re: regexp.MustCompile(`regular.+`)},
				},
			},
		},
		{
			// Regexp filters are not offloaded when the querier supports only OpEq.
			stages: []logql.PipelineStage{
				&logql.LineFilter{Op: logql.OpRe, Value: "a.+", Re: regexp.MustCompile(`a.+`)},
				&logql.DecolorizeExpr{},
				&logql.LineFilter{Op: logql.OpRe, Value: "b.+", Re: regexp.MustCompile(`b.+`)},
			},
			lineCaps: []logql.BinOp{
				logql.OpEq,
			},
			conds: SelectLogsParams{},
		},
		{
			// IP line filters are never offloaded, even if the operation is supported.
			stages: []logql.PipelineStage{
				&logql.LineFilter{Op: logql.OpEq, Value: "127.0.0.1", IP: true},
			},
			lineCaps: []logql.BinOp{
				logql.OpEq,
			},
			conds: SelectLogsParams{},
		},
	}
	for i, tt := range tests {
		tt := tt
		t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) {
			var caps QuerierСapabilities
			caps.Line.Add(tt.lineCaps...)

			// The selector is empty: only the pipeline stages are under test.
			conds, err := extractQueryConditions(caps, logql.Selector{}, tt.stages)
			if tt.wantErr {
				require.Error(t, err)
				return
			}
			require.NoError(t, err)

			// With an empty selector the prefilter is expected to be the no-op processor.
			require.Equal(t, NopProcessor, conds.prefilter)
			require.Equal(t, tt.conds, conds.params)
		})
	}
}
2 changes: 2 additions & 0 deletions internal/logql/logqlengine/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func (caps SupportedOps) Supports(op logql.BinOp) bool {
// QuerierСapabilities defines what operations storage can do.
//
// The engine passes it to extractQueryConditions, which uses it to decide
// which parts of a query may be handed to storage.
type QuerierСapabilities struct {
// Label is the set of supported label operations.
// NOTE(review): presumably checked against selector label matchers — confirm.
Label SupportedOps
// Line is the set of supported line filter operations.
// extractQueryConditions offloads a line filter only if its Op is listed here.
Line SupportedOps
}

// Querier does queries to storage.
Expand All @@ -44,4 +45,5 @@ type Querier interface {
// SelectLogsParams is a set of storage query params.
type SelectLogsParams struct {
// Labels is a list of label matchers.
// NOTE(review): presumably the selector matchers that storage supports — confirm.
Labels []logql.LabelMatcher
// Line is a list of line filters collected from pipeline stages by
// extractQueryConditions. Queriers turn each filter into a predicate
// on the record body.
Line []logql.LineFilter
}
36 changes: 34 additions & 2 deletions internal/ytstorage/yql_querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var _ logqlengine.Querier = (*YQLQuerier)(nil)
// Сapabilities reports the operations this querier can evaluate on the
// storage side: equality and regexp matching, plus their negations, for both
// labels and lines.
func (q *YQLQuerier) Сapabilities() (caps logqlengine.QuerierСapabilities) {
	// Labels and lines support the same operation set.
	supported := []logql.BinOp{
		logql.OpEq,
		logql.OpNotEq,
		logql.OpRe,
		logql.OpNotRe,
	}
	caps.Label.Add(supported...)
	caps.Line.Add(supported...)
	return caps
}

Expand Down Expand Up @@ -59,7 +60,15 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time
//
// See https://ytsaurus.tech/docs/en/yql/udf/list/pire#match
// See https://ytsaurus.tech/docs/en/yql/udf/list/re2#match
fmt.Fprintf(&query, "$matcher_%d = Re2::Match(%q);\n", matcherIdx, m.Re)
fmt.Fprintf(&query, "$label_matcher_%d = Re2::Match(%q);\n", matcherIdx, m.Re)
}
}
for matcherIdx, m := range params.Line {
if m.Op.IsRegex() {
// Note that Re2::Grep is used.
//
// Line filter is looking for substring, not for exact match.
fmt.Fprintf(&query, "$line_matcher_%d = Re2::Grep(%q);\n", matcherIdx, m.Re)
}
}

Expand Down Expand Up @@ -97,14 +106,37 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time
case logql.OpEq, logql.OpNotEq:
fmt.Fprintf(&query, "Yson::ConvertToString(Yson::YPath(%s, %q)) = %q", column, yp, m.Value)
case logql.OpRe, logql.OpNotRe:
fmt.Fprintf(&query, "$matcher_%d(Yson::ConvertToString(Yson::YPath(%s, %q)))", matcherIdx, column, yp)
fmt.Fprintf(&query, "$label_matcher_%d(Yson::ConvertToString(Yson::YPath(%s, %q)))", matcherIdx, column, yp)
default:
return nil, errors.Errorf("unexpected op %q", m.Op)
}
query.WriteString(" )\n")
}
query.WriteString("\t)\n")
}
for matcherIdx, m := range params.Line {
query.WriteString("\t")
switch m.Op {
case logql.OpEq, logql.OpRe:
query.WriteString("AND ")
case logql.OpNotEq, logql.OpNotRe:
query.WriteString("AND NOT ")
default:
return nil, errors.Errorf("unexpected op %q", m.Op)
}

// Note: predicate negated above.
switch m.Op {
case logql.OpEq, logql.OpNotEq:
fmt.Fprintf(&query, "String::Contains(body, %q)", m.Value)
case logql.OpRe, logql.OpNotRe:
fmt.Fprintf(&query, "$line_matcher_%d(body)", matcherIdx)
default:
return nil, errors.Errorf("unexpected op %q", m.Op)
}
query.WriteString("\n")
}

query.WriteString("ORDER BY `timestamp`")

return yqlclient.YQLQuery[logstorage.Record](ctx, q.client, query.String())
Expand Down
14 changes: 14 additions & 0 deletions internal/ytstorage/ytql_querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (q *YTQLQuerier) Сapabilities() (caps logqlengine.QuerierСapabilities) {
// FIXME(tdakkota): we don't add OpRe and OpNotRe because the YT QL query executor throws an exception
// when regexp functions are used.
caps.Label.Add(logql.OpEq, logql.OpNotEq)
caps.Line.Add(logql.OpEq, logql.OpNotEq)
return caps
}

Expand Down Expand Up @@ -131,6 +132,19 @@ func (q *YTQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Tim
}
query.WriteByte(')')
}
for _, m := range params.Line {
switch m.Op {
case logql.OpEq:
query.WriteString(" AND ")
case logql.OpNotEq:
query.WriteString(" AND NOT ")
default:
return nil, errors.Errorf("unexpected op %q", m.Op)
}

// Line filter checks if line contains given value.
fmt.Fprintf(&query, "is_substr(%q, body)", m.Value)
}

r, err := q.yc.SelectRows(ctx, query.String(), nil)
if err != nil {
Expand Down

0 comments on commit 6cc232d

Please sign in to comment.