Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support usage trackers for received and discarded bytes. #11840

Merged
merged 37 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
154a6d1
Support custom trackers for received bytes.
jeschkies Jan 31, 2024
79fff6b
Implement custom trackers config
jeschkies Jan 31, 2024
fd1ff01
Note todos
jeschkies Feb 1, 2024
a787cf4
Fix test
jeschkies Feb 1, 2024
ac8ae3a
Move code block
jeschkies Feb 1, 2024
dcbd58e
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 1, 2024
d37343b
Start tracking discarded bytes
jeschkies Feb 1, 2024
d5be74f
Clarify todo
jeschkies Feb 2, 2024
77efea5
Change return type
jeschkies Feb 2, 2024
51cbaa4
Comment discarded bytes
jeschkies Feb 2, 2024
b0b4c7c
generate docs
jeschkies Feb 2, 2024
fe34b32
Update docs
jeschkies Feb 2, 2024
be4d2c8
Add changelog entry
jeschkies Feb 2, 2024
e15746c
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 2, 2024
659a9ef
Test push metrics
jeschkies Feb 2, 2024
90099ef
Return empty trackers instead of nil
jeschkies Feb 2, 2024
4f0697a
Test oltp push
jeschkies Feb 2, 2024
b6b47a5
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 19, 2024
457b002
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 20, 2024
4c3292d
Use label names instead of regex matchers
jeschkies Feb 20, 2024
ca417c2
correct doc
jeschkies Feb 20, 2024
ee947b3
Test unmarshalling
jeschkies Feb 20, 2024
e650cf0
Introduce custom tracker interface
jeschkies Feb 21, 2024
4ce9aaf
fix tests
jeschkies Feb 21, 2024
c57c493
Pass empty limits
jeschkies Feb 21, 2024
97a5af9
test tracker
jeschkies Feb 21, 2024
d077823
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 21, 2024
22e1d8e
Report on discarded stream bytes
jeschkies Feb 22, 2024
a9a17fa
Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies Feb 22, 2024
c564f40
Document custom tracker structs and methods
jeschkies Feb 22, 2024
fede4cf
Rename tracker interface and methods
jeschkies Feb 22, 2024
9498628
Remove labels filtering and configuration.
jeschkies Feb 23, 2024
ab4f152
Rename tracker
jeschkies Feb 23, 2024
9df9532
Correct changelog
jeschkies Feb 23, 2024
c6a7cb4
Inject usage tracker
jeschkies Feb 23, 2024
d07ac85
Do not persist label string
jeschkies Feb 23, 2024
c01afd7
Correct push test
jeschkies Feb 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

##### Enhancements

* [11840](https://github.com/grafana/loki/pull/11840) **jeschkies**: Allow custom trackers for ingested and discarded bytes metric.
* [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results
* [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods.
* [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests.
Expand Down
2 changes: 1 addition & 1 deletion clients/pkg/promtail/targets/lokipush/pushtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest)
req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down
5 changes: 5 additions & 0 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3234,6 +3234,11 @@ otlp_config:
# Configuration for log attributes to store them as Structured Metadata or
# drop them altogether
[log_attributes: <list of attributes_configs>]

# Defines a set of custom trackers for ingested bytes.
# Takes a map of tracker names as keys and a list of label names as values. A
# tracker matches a stream if all labels are present.
custom_trackers:
```

### frontend_worker
Expand Down
39 changes: 31 additions & 8 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/grafana/loki/pkg/distributor/writefailures"
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/runtime"
Expand Down Expand Up @@ -126,6 +127,8 @@ type Distributor struct {
ingesterAppendTimeouts *prometheus.CounterVec
replicationFactor prometheus.Gauge
streamShardCount prometheus.Counter

customStreamsTracker push.CustomStreamsTracker
}

New creates a distributor.
Expand Down Expand Up @@ -337,7 +340,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// Truncate first so subsequent steps have consistent line lengths
d.truncateLines(validationContext, &stream)

stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
var lbs labels.Labels
lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
Expand All @@ -354,7 +358,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
pushSize := 0
prevTs := stream.Entries[0].Timestamp
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil {
if err := d.validator.ValidateEntry(validationContext, lbs, entry); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
continue
Expand Down Expand Up @@ -412,6 +416,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineSize))

if d.customStreamsTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, &stream)
if err != nil {
continue
}

discardedStreamBytes := 0
for _, e := range stream.Entries {
discardedStreamBytes += len(e.Line)
}

for _, matchedLbs := range validationContext.customTrackerConfig.MatchTrackers(lbs) {
d.customStreamsTracker.DiscardedBytesAdd(tenantID, matchedLbs, float64(discardedStreamBytes))
}
}
}

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
d.writeFailuresManager.Log(tenantID, err)
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
Expand Down Expand Up @@ -684,30 +706,31 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
}

// labelData is the value cached in the distributor's labelCache, keyed by the
// raw label string of an incoming stream. Caching all three forms avoids
// re-parsing, re-serializing, and re-hashing the label set on every push of
// the same stream.
type labelData struct {
ls labels.Labels // parsed label set (parsing also sorts the labels)
labels string // ls.String(); canonical, sorted serialization used to overwrite stream.Labels
hash uint64 // ls.Hash() of the parsed label set
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we modify this struct to just have the ls? or do we access the labels often enough that ls.String() would be too expensive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really weird. We use it to override stream.Labels. I think this is because the parsing also sorts the labels.


func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, uint64, error) {
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (labels.Labels, string, uint64, error) {
if val, ok := d.labelCache.Get(key); ok {
labelVal := val.(labelData)
return labelVal.labels, labelVal.hash, nil
return labelVal.ls, labelVal.labels, labelVal.hash, nil
}

ls, err := syntax.ParseLabels(key)
if err != nil {
return "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
}

if err := d.validator.ValidateLabels(vContext, ls, *stream); err != nil {
return "", 0, err
return nil, "", 0, err
}

lsVal := ls.String()
lsHash := ls.Hash()

d.labelCache.Add(key, labelData{lsVal, lsHash})
return lsVal, lsHash, nil
d.labelCache.Add(key, labelData{ls, lsVal, lsHash})
return ls, lsVal, lsHash, nil
}

// shardCountFor returns the right number of shards to be used by the given stream.
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
for n := 0; n < b.N; n++ {
stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}`
_, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
_, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream)
if err != nil {
panic("parseStreamLabels fail,err:" + err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser)
req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.customStreamsTracker)
if err != nil {
if d.tenantConfigs.LogPushRequest(tenantID) {
level.Debug(logger).Log(
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ type Limits interface {
MaxStructuredMetadataSize(userID string) int
MaxStructuredMetadataCount(userID string) int
OTLPConfig(userID string) push.OTLPConfig
CustomTrackersConfig(userID string) *push.CustomTrackersConfig
}
39 changes: 37 additions & 2 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/loghttp/push"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/validation"
)
Expand All @@ -18,13 +19,14 @@ const (

type Validator struct {
Limits
customStreamsTracker push.CustomStreamsTracker
}

func NewValidator(l Limits) (*Validator, error) {
if l == nil {
return nil, errors.New("nil Limits")
}
return &Validator{l}, nil
return &Validator{l, nil}, nil
}

type validationContext struct {
Expand All @@ -46,6 +48,8 @@ type validationContext struct {
maxStructuredMetadataCount int

userID string

customTrackerConfig *push.CustomTrackersConfig
}

func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext {
Expand All @@ -63,11 +67,12 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
allowStructuredMetadata: v.AllowStructuredMetadata(userID),
maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID),
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
customTrackerConfig: v.CustomTrackersConfig(userID),
}
}

// ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly.
func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error {
func (v Validator) ValidateEntry(ctx validationContext, labels labels.Labels, entry logproto.Entry) error {
ts := entry.Timestamp.UnixNano()
validation.LineLengthHist.Observe(float64(len(entry.Line)))

Expand All @@ -77,13 +82,23 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
formatedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
if v.customStreamsTracker != nil {
for _, matchedLbs := range ctx.customTrackerConfig.MatchTrackers(labels) {
v.customStreamsTracker.DiscardedBytesAdd(ctx.userID, matchedLbs, float64(len(entry.Line)))
}
}
return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
}

if ts > ctx.creationGracePeriod {
formatedEntryTime := entry.Timestamp.Format(timeFormat)
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
if v.customStreamsTracker != nil {
for _, matchedLbs := range ctx.customTrackerConfig.MatchTrackers(labels) {
v.customStreamsTracker.DiscardedBytesAdd(ctx.userID, matchedLbs, float64(len(entry.Line)))
}
}
return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
}

Expand All @@ -94,13 +109,23 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
// for parity.
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line)))
if v.customStreamsTracker != nil {
for _, matchedLbs := range ctx.customTrackerConfig.MatchTrackers(labels) {
v.customStreamsTracker.DiscardedBytesAdd(ctx.userID, matchedLbs, float64(len(entry.Line)))
}
}
return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line))
}

if len(entry.StructuredMetadata) > 0 {
if !ctx.allowStructuredMetadata {
validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Add(float64(len(entry.Line)))
if v.customStreamsTracker != nil {
for _, matchedLbs := range ctx.customTrackerConfig.MatchTrackers(labels) {
v.customStreamsTracker.DiscardedBytesAdd(ctx.userID, matchedLbs, float64(len(entry.Line)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful to pass the reason why the bytes have been discarded

}
}
return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels)
}

Expand All @@ -113,12 +138,22 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log
if maxSize := ctx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Add(float64(len(entry.Line)))
if v.customStreamsTracker != nil {
for _, matchedLbs := range ctx.customTrackerConfig.MatchTrackers(labels) {
v.customStreamsTracker.DiscardedBytesAdd(ctx.userID, matchedLbs, float64(len(entry.Line)))
}
}
return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, ctx.maxStructuredMetadataSize)
}

if maxCount := ctx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount {
validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Add(float64(len(entry.Line)))
if v.customStreamsTracker != nil {
for _, matchedLbs := range ctx.customTrackerConfig.MatchTrackers(labels) {
v.customStreamsTracker.DiscardedBytesAdd(ctx.userID, matchedLbs, float64(len(entry.Line)))
}
}
return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, ctx.maxStructuredMetadataCount)
}
}
Expand Down
17 changes: 9 additions & 8 deletions pkg/distributor/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
)

var (
testStreamLabels = "FIXME"
testTime = time.Now()
testStreamLabels = labels.Labels{{Name: "my", Value: "label"}}
testStreamLabelsString = testStreamLabels.String()
testTime = time.Now()
)

type fakeLimits struct {
Expand Down Expand Up @@ -61,7 +62,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg,
testStreamLabels,
testStreamLabelsString,
testTime.Add(-time.Hour*5).Format(timeFormat),
testTime.Add(-1*time.Hour).Format(timeFormat), // same as RejectOldSamplesMaxAge
),
Expand All @@ -71,7 +72,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
"test",
nil,
logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)),
fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabelsString, testTime.Add(time.Hour*5).Format(timeFormat)),
},
{
"line too long",
Expand All @@ -82,7 +83,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901"},
fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabels, 11),
fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabelsString, 11),
},
{
"disallowed structured metadata",
Expand All @@ -93,7 +94,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}},
fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabels),
fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabelsString),
},
{
"structured metadata too big",
Expand All @@ -105,7 +106,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}},
fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabels, 6, 4),
fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabelsString, 6, 4),
},
{
"structured metadata too many",
Expand All @@ -117,7 +118,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
},
},
logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}, {Name: "too", Value: "many"}}},
fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabels, 2, 1),
fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabelsString, 2, 1),
},
}
for _, tt := range tests {
Expand Down
Loading
Loading