Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support usage trackers for received and discarded bytes. #11840

Merged
merged 37 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
154a6d1
Support custom trackers for received bytes.
jeschkies Jan 31, 2024
79fff6b
Implement custom trackers config
jeschkies Jan 31, 2024
fd1ff01
Note todos
jeschkies Feb 1, 2024
a787cf4
Fix test
jeschkies Feb 1, 2024
ac8ae3a
Move code block
jeschkies Feb 1, 2024
dcbd58e
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 1, 2024
d37343b
Start tracking discarded bytes
jeschkies Feb 1, 2024
d5be74f
Clarify todo
jeschkies Feb 2, 2024
77efea5
Change return type
jeschkies Feb 2, 2024
51cbaa4
Comment discarded bytes
jeschkies Feb 2, 2024
b0b4c7c
generate docs
jeschkies Feb 2, 2024
fe34b32
Update docs
jeschkies Feb 2, 2024
be4d2c8
Add changelog entry
jeschkies Feb 2, 2024
e15746c
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 2, 2024
659a9ef
Test push metrics
jeschkies Feb 2, 2024
90099ef
Return empty trackers instead of nil
jeschkies Feb 2, 2024
4f0697a
Test OTLP push
jeschkies Feb 2, 2024
b6b47a5
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 19, 2024
457b002
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 20, 2024
4c3292d
Use label names instead of regex matchers
jeschkies Feb 20, 2024
ca417c2
correct doc
jeschkies Feb 20, 2024
ee947b3
Test unmarshalling
jeschkies Feb 20, 2024
e650cf0
Introduce custom tracker interface
jeschkies Feb 21, 2024
4ce9aaf
fix tests
jeschkies Feb 21, 2024
c57c493
Pass empty limits
jeschkies Feb 21, 2024
97a5af9
test tracker
jeschkies Feb 21, 2024
d077823
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 21, 2024
22e1d8e
Report on discarded stream bytes
jeschkies Feb 22, 2024
a9a17fa
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 22, 2024
c564f40
Document custom tracker structs and methods
jeschkies Feb 22, 2024
fede4cf
Rename tracker interface and methods
jeschkies Feb 22, 2024
9498628
Remove labels filtering and configuration.
jeschkies Feb 23, 2024
ab4f152
Rename tracker
jeschkies Feb 23, 2024
9df9532
Correct changelog
jeschkies Feb 23, 2024
c6a7cb4
Inject usage tracker
jeschkies Feb 23, 2024
d07ac85
Do not persist label string
jeschkies Feb 23, 2024
c01afd7
Correct push test
jeschkies Feb 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [11840](https://github.com/grafana/loki/pull/11840) **jeschkies**: Allow custom usage trackers for ingested and discarded bytes metric.
* [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results
* [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods.
* [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests.
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest)
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *Compactor) ownsTenant(tenant string) ([]v1.FingerprintBounds, bool, err
return nil, false, nil
}

// TOOD(owen-d): use <ReadRing>.GetTokenRangesForInstance()
// TODO(owen-d): use <ReadRing>.GetTokenRangesForInstance()
// when it's supported for non zone-aware rings
// instead of doing all this manually

Expand Down
47 changes: 35 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/runtime"
Expand Down Expand Up @@ -126,6 +127,8 @@ type Distributor struct {
ingesterAppendTimeouts *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter

usageTracker push.UsageTracker
}

// New a distributor creates.
Expand All @@ -138,6 +141,7 @@ func New(
registerer prometheus.Registerer,
metricsNamespace string,
tee Tee,
usageTracker push.UsageTracker,
logger log.Logger,
) (*Distributor, error) {
factory := cfg.factory
Expand All @@ -153,7 +157,7 @@ func New(
return client.New(internalCfg, addr)
}

validator, err := NewValidator(overrides)
validator, err := NewValidator(overrides, usageTracker)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,6 +189,7 @@ func New(
healthyInstancesCount: atomic.NewUint32(0),
rateLimitStrat: rateLimitStrat,
tee: tee,
usageTracker: usageTracker,
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "distributor_ingester_appends_total",
Expand Down Expand Up @@ -337,7 +342,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// Truncate first so subsequent steps have consistent line lengths
d.truncateLines(validationContext, &stream)

stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
var lbs labels.Labels
lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
Expand All @@ -354,7 +360,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
pushSize := 0
prevTs := stream.Entries[0].Timestamp
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
if err := d.validator.ValidateEntry(validationContext, lbs, entry); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
continue
Expand Down Expand Up @@ -412,6 +418,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineSize))

if d.usageTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
continue
}

discardedStreamBytes := 0
for _, e := range stream.Entries {
discardedStreamBytes += len(e.Line)
}

if d.usageTracker != nil {
d.usageTracker.DiscardedBytesAdd(tenantID, validation.RateLimited, lbs, float64(discardedStreamBytes))
}
}
}

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
d.writeFailuresManager.Log(tenantID, err)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
Expand Down Expand Up @@ -684,30 +708,29 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
}

type labelData struct {
labels string
hash uint64
ls labels.Labels
hash uint64
}

func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, uint64, error) {
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (labels.Labels, string, uint64, error) {
if val, ok := d.labelCache.Get(key); ok {
labelVal := val.(labelData)
return labelVal.labels, labelVal.hash, nil
return labelVal.ls, labelVal.ls.String(), labelVal.hash, nil
}

ls, err := syntax.ParseLabels(key)
if err != nil {
return "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
}

if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
return "", 0, err
return nil, "", 0, err
}

lsVal := ls.String()
lsHash := ls.Hash()

d.labelCache.Add(key, labelData{lsVal, lsHash})
return lsVal, lsHash, nil
d.labelCache.Add(key, labelData{ls, lsHash})
return ls, ls.String(), lsHash, nil
}

// shardCountFor returns the right number of shards to be used by the given stream.
Expand Down
10 changes: 5 additions & 5 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func TestStreamShard(t *testing.T) {
overrides, err := validation.NewOverrides(*distributorLimits, nil)
require.NoError(t, err)

validator, err := NewValidator(overrides)
validator, err := NewValidator(overrides, nil)
require.NoError(t, err)

d := Distributor{
Expand Down Expand Up @@ -656,7 +656,7 @@ func TestStreamShardAcrossCalls(t *testing.T) {
overrides, err := validation.NewOverrides(*distributorLimits, nil)
require.NoError(t, err)

validator, err := NewValidator(overrides)
validator, err := NewValidator(overrides, nil)
require.NoError(t, err)

t.Run("it generates 4 shards across 2 calls when calculated shards = 2 * entries per call", func(t *testing.T) {
Expand Down Expand Up @@ -721,7 +721,7 @@ func BenchmarkShardStream(b *testing.B) {
overrides, err := validation.NewOverrides(*distributorLimits, nil)
require.NoError(b, err)

validator, err := NewValidator(overrides)
validator, err := NewValidator(overrides, nil)
require.NoError(b, err)

distributorBuilder := func(shards int) *Distributor {
Expand Down Expand Up @@ -788,7 +788,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
for n := 0; n < b.N; n++ {
stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}`
_, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
_, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
if err != nil {
panic("parseStreamLabels fail,err:" + err.Error())
}
Expand Down Expand Up @@ -1159,7 +1159,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation
overrides, err := validation.NewOverrides(*limits, nil)
require.NoError(t, err)

d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, log.NewNopLogger())
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
distributors[i] = d
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker)
if err != nil {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand Down
26 changes: 23 additions & 3 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
)
Expand All @@ -18,13 +19,14 @@ const (

type Validator struct {
Limits
usageTracker push.UsageTracker
}

func NewValidator(l Limits) (*Validator, error) {
func NewValidator(l Limits, t push.UsageTracker) (*Validator, error) {
if l == nil {
return nil, errors.New("nil Limits")
}
return &Validator{l}, nil
return &Validator{l, t}, nil
}

type validationContext struct {
Expand Down Expand Up @@ -67,7 +69,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
}

// ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly.
func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error {
func (v Validator) ValidateEntry(ctx validationContext, labels labels.Labels, entry logproto.Entry) error {
ts := entry.Timestamp.UnixNano()
validation.LineLengthHist.Observe(float64(len(entry.Line)))

Expand All @@ -77,13 +79,19 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
formatedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.GreaterThanMaxSampleAge, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
}

if ts > ctx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.TooFarInFuture, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
}

Expand All @@ -94,13 +102,19 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.LineTooLong, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}

if len(entry.StructuredMetadata) > 0 {
if !ctx.allowStructuredMetadata {
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.DisallowedStructuredMetadata, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels)
}

Expand All @@ -113,12 +127,18 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
if maxSize := ctx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.StructuredMetadataTooLarge, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, ctx.maxStructuredMetadataSize)
}

if maxCount := ctx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Add(float64(len(entry.Line)))
if v.usageTracker != nil {
v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.StructuredMetadataTooMany, labels, float64(len(entry.Line)))
}
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, ctx.maxStructuredMetadataCount)
}
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
)

var (
testStreamLabels = "FIXME"
testTime = time.Now()
testStreamLabels = labels.Labels{{Name: "my", Value: "label"}}
testStreamLabelsString = testStreamLabels.String()
testTime = time.Now()
)

type fakeLimits struct {
Expand Down Expand Up @@ -61,7 +62,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg,
testStreamLabels,
testStreamLabelsString,
testTime.Add(-time.Hour*5).Format(timeFormat),
testTime.Add(-1*time.Hour).Format(timeFormat), // same as RejectOldSamplesMaxAge
),
Expand All @@ -71,7 +72,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
"test",
nil,
logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)),
fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabelsString, testTime.Add(time.Hour*5).Format(timeFormat)),
},
{
"line too long",
Expand All @@ -82,7 +83,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901"},
fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabels, 11),
fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabelsString, 11),
},
{
"disallowed structured metadata",
Expand All @@ -93,7 +94,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}},
fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabels),
fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabelsString),
},
{
"structured metadata too big",
Expand All @@ -105,7 +106,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}},
fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabels, 6, 4),
fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabelsString, 6, 4),
},
{
"structured metadata too many",
Expand All @@ -117,7 +118,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}, {Name: "too", Value: "many"}}},
fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabels, 2, 1),
fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabelsString, 2, 1),
},
}
for _, tt := range tests {
Expand All @@ -126,7 +127,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
flagext.DefaultValues(l)
o, err := validation.NewOverrides(*l, tt.overrides)
assert.NoError(t, err)
v, err := NewValidator(o)
v, err := NewValidator(o, nil)
assert.NoError(t, err)

err = v.ValidateEntry(v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry)
Expand Down Expand Up @@ -224,7 +225,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
flagext.DefaultValues(l)
o, err := validation.NewOverrides(*l, tt.overrides)
assert.NoError(t, err)
v, err := NewValidator(o)
v, err := NewValidator(o, nil)
assert.NoError(t, err)

err = v.ValidateLabels(v.getValidationContextForTime(testTime, tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels})
Expand Down
Loading
Loading