diff --git a/CHANGELOG.md b/CHANGELOG.md index fa8861228407f..fb09c05dded79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [11840](https://github.com/grafana/loki/pull/11840) **jeschkies**: Allow custom usage trackers for ingested and discarded bytes metric. * [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results * [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods. * [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests. diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget.go b/clients/pkg/promtail/targets/lokipush/pushtarget.go index c981de0de3dda..88c7859bd36e5 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget.go @@ -111,7 +111,7 @@ func (t *PushTarget) run() error { func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) userID, _ := tenant.TenantID(r.Context()) - req, err := push.ParseRequest(logger, userID, r, nil, nil, push.ParseLokiRequest) + req, err := push.ParseRequest(logger, userID, r, nil, push.EmptyLimits{}, push.ParseLokiRequest, nil) if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 40eec568247fb..da3c70d81b893 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -194,7 +194,7 @@ func (c *Compactor) ownsTenant(tenant string) ([]v1.FingerprintBounds, bool, err return nil, false, nil } - // TOOD(owen-d): use .GetTokenRangesForInstance() + // TODO(owen-d): use .GetTokenRangesForInstance() // when it's supported for non zone-aware rings // instead of doing all this manually diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f47148fa42b0d..53ff20ed9274e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -39,6 +39,7 @@ import ( "github.com/grafana/loki/pkg/distributor/writefailures" "github.com/grafana/loki/pkg/ingester" "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/runtime" @@ -126,6 +127,8 @@ type Distributor struct { ingesterAppendTimeouts *prometheus.CounterVec replicationFactor prometheus.Gauge streamShardCount prometheus.Counter + + usageTracker push.UsageTracker } // New a distributor creates. 
@@ -138,6 +141,7 @@ func New( registerer prometheus.Registerer, metricsNamespace string, tee Tee, + usageTracker push.UsageTracker, logger log.Logger, ) (*Distributor, error) { factory := cfg.factory @@ -153,7 +157,7 @@ func New( return client.New(internalCfg, addr) } - validator, err := NewValidator(overrides) + validator, err := NewValidator(overrides, usageTracker) if err != nil { return nil, err } @@ -185,6 +189,7 @@ func New( healthyInstancesCount: atomic.NewUint32(0), rateLimitStrat: rateLimitStrat, tee: tee, + usageTracker: usageTracker, ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_ingester_appends_total", @@ -337,7 +342,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // Truncate first so subsequent steps have consistent line lengths d.truncateLines(validationContext, &stream) - stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream) + var lbs labels.Labels + lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, &stream) if err != nil { d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) @@ -354,7 +360,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log pushSize := 0 prevTs := stream.Entries[0].Timestamp for _, entry := range stream.Entries { - if err := d.validator.ValidateEntry(validationContext, stream.Labels, entry); err != nil { + if err := d.validator.ValidateEntry(validationContext, lbs, entry); err != nil { d.writeFailuresManager.Log(tenantID, err) validationErrors.Add(err) continue @@ -412,6 +418,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log validation.DiscardedSamples.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineCount)) validation.DiscardedBytes.WithLabelValues(validation.RateLimited, tenantID).Add(float64(validatedLineSize)) + if d.usageTracker != nil { + for _, stream := range req.Streams { + lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, &stream) + if err != nil { + continue + } + + discardedStreamBytes := 0 + for _, e := range stream.Entries { + discardedStreamBytes += len(e.Line) + } + + if d.usageTracker != nil { + d.usageTracker.DiscardedBytesAdd(tenantID, validation.RateLimited, lbs, float64(discardedStreamBytes)) + } + } + } + err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize) d.writeFailuresManager.Log(tenantID, err) return nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error()) @@ -684,30 +708,29 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance } type labelData struct { - labels string - hash uint64 + ls labels.Labels + hash uint64 } -func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (string, uint64, error) { +func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream *logproto.Stream) (labels.Labels, string, uint64, error) { if val, ok := d.labelCache.Get(key); ok { labelVal := val.(labelData) - return labelVal.labels, labelVal.hash, nil + return labelVal.ls, labelVal.ls.String(), labelVal.hash, nil } ls, err := syntax.ParseLabels(key) if err != nil { - return "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err) + return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err) } if err 
:= d.validator.ValidateLabels(vContext, ls, *stream); err != nil { - return "", 0, err + return nil, "", 0, err } - lsVal := ls.String() lsHash := ls.Hash() - d.labelCache.Add(key, labelData{lsVal, lsHash}) - return lsVal, lsHash, nil + d.labelCache.Add(key, labelData{ls, lsHash}) + return ls, ls.String(), lsHash, nil } // shardCountFor returns the right number of shards to be used by the given stream. diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 75e3a6e786700..04747ffb72334 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -612,7 +612,7 @@ func TestStreamShard(t *testing.T) { overrides, err := validation.NewOverrides(*distributorLimits, nil) require.NoError(t, err) - validator, err := NewValidator(overrides) + validator, err := NewValidator(overrides, nil) require.NoError(t, err) d := Distributor{ @@ -656,7 +656,7 @@ func TestStreamShardAcrossCalls(t *testing.T) { overrides, err := validation.NewOverrides(*distributorLimits, nil) require.NoError(t, err) - validator, err := NewValidator(overrides) + validator, err := NewValidator(overrides, nil) require.NoError(t, err) t.Run("it generates 4 shards across 2 calls when calculated shards = 2 * entries per call", func(t *testing.T) { @@ -721,7 +721,7 @@ func BenchmarkShardStream(b *testing.B) { overrides, err := validation.NewOverrides(*distributorLimits, nil) require.NoError(b, err) - validator, err := NewValidator(overrides) + validator, err := NewValidator(overrides, nil) require.NoError(b, err) distributorBuilder := func(shards int) *Distributor { @@ -788,7 +788,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) { for n := 0; n < b.N; n++ { stream := request.Streams[0] stream.Labels = `{buzz="f", a="b"}` - _, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream) + _, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, &stream) if err != nil { panic("parseStreamLabels fail,err:" + err.Error()) } @@ -1159,7 +1159,7 @@ func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) - d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, log.NewNopLogger()) + d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry(), constants.Loki, nil, nil, log.NewNopLogger()) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) distributors[i] = d diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index ce242355e077b..d2582f027f9b9 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -34,7 +34,7 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe http.Error(w, err.Error(), http.StatusBadRequest) return } - req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser) + req, err := push.ParseRequest(logger, tenantID, r, d.tenantsRetention, d.validator.Limits, pushRequestParser, d.usageTracker) if err != nil { if d.tenantConfigs.LogPushRequest(tenantID) { level.Debug(logger).Log( diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 7fe76fae78231..f1f2e4acb0ea6 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -8,6 +8,7 @@ import ( 
"github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/validation" ) @@ -18,13 +19,14 @@ const ( type Validator struct { Limits + usageTracker push.UsageTracker } -func NewValidator(l Limits) (*Validator, error) { +func NewValidator(l Limits, t push.UsageTracker) (*Validator, error) { if l == nil { return nil, errors.New("nil Limits") } - return &Validator{l}, nil + return &Validator{l, t}, nil } type validationContext struct { @@ -67,7 +69,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val } // ValidateEntry returns an error if the entry is invalid and report metrics for invalid entries accordingly. -func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error { +func (v Validator) ValidateEntry(ctx validationContext, labels labels.Labels, entry logproto.Entry) error { ts := entry.Timestamp.UnixNano() validation.LineLengthHist.Observe(float64(len(entry.Line))) @@ -77,6 +79,9 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log formatedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat) validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line))) + if v.usageTracker != nil { + v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.GreaterThanMaxSampleAge, labels, float64(len(entry.Line))) + } return fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime) } @@ -84,6 +89,9 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log formatedEntryTime := entry.Timestamp.Format(timeFormat) validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line))) + if v.usageTracker != nil { + v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.TooFarInFuture, labels, float64(len(entry.Line))) + } return fmt.Errorf(validation.TooFarInFutureErrorMsg, labels, formatedEntryTime) } @@ -94,6 +102,9 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log // for parity. 
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, ctx.userID).Add(float64(len(entry.Line))) + if v.usageTracker != nil { + v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.LineTooLong, labels, float64(len(entry.Line))) + } return fmt.Errorf(validation.LineTooLongErrorMsg, maxSize, labels, len(entry.Line)) } @@ -101,6 +112,9 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log if !ctx.allowStructuredMetadata { validation.DiscardedSamples.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.DisallowedStructuredMetadata, ctx.userID).Add(float64(len(entry.Line))) + if v.usageTracker != nil { + v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.DisallowedStructuredMetadata, labels, float64(len(entry.Line))) + } return fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, labels) } @@ -113,12 +127,18 @@ func (v Validator) ValidateEntry(ctx validationContext, labels string, entry log if maxSize := ctx.maxStructuredMetadataSize; maxSize != 0 && structuredMetadataSizeBytes > maxSize { validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooLarge, ctx.userID).Add(float64(len(entry.Line))) + if v.usageTracker != nil { + v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.StructuredMetadataTooLarge, labels, float64(len(entry.Line))) + } return fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, labels, structuredMetadataSizeBytes, ctx.maxStructuredMetadataSize) } if maxCount := ctx.maxStructuredMetadataCount; maxCount != 0 && structuredMetadataCount > maxCount { validation.DiscardedSamples.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.StructuredMetadataTooMany, ctx.userID).Add(float64(len(entry.Line))) + if v.usageTracker != nil { + v.usageTracker.DiscardedBytesAdd(ctx.userID, validation.StructuredMetadataTooMany, labels, float64(len(entry.Line))) + } return fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, labels, structuredMetadataCount, ctx.maxStructuredMetadataCount) } } diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go index 038f1dc4c5b78..0c37065e30563 100644 --- a/pkg/distributor/validator_test.go +++ b/pkg/distributor/validator_test.go @@ -18,8 +18,9 @@ import ( ) var ( - testStreamLabels = "FIXME" - testTime = time.Now() + testStreamLabels = labels.Labels{{Name: "my", Value: "label"}} + testStreamLabelsString = testStreamLabels.String() + testTime = time.Now() ) type fakeLimits struct { @@ -61,7 +62,7 @@ func TestValidator_ValidateEntry(t *testing.T) { }, logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"}, fmt.Errorf(validation.GreaterThanMaxSampleAgeErrorMsg, - testStreamLabels, + testStreamLabelsString, testTime.Add(-time.Hour*5).Format(timeFormat), testTime.Add(-1*time.Hour).Format(timeFormat), // same as RejectOldSamplesMaxAge ), @@ -71,7 +72,7 @@ func TestValidator_ValidateEntry(t *testing.T) { "test", nil, logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"}, - fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)), + fmt.Errorf(validation.TooFarInFutureErrorMsg, testStreamLabelsString, testTime.Add(time.Hour*5).Format(timeFormat)), 
}, { "line too long", @@ -82,7 +83,7 @@ func TestValidator_ValidateEntry(t *testing.T) { }, }, logproto.Entry{Timestamp: testTime, Line: "12345678901"}, - fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabels, 11), + fmt.Errorf(validation.LineTooLongErrorMsg, 10, testStreamLabelsString, 11), }, { "disallowed structured metadata", @@ -93,7 +94,7 @@ func TestValidator_ValidateEntry(t *testing.T) { }, }, logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}}, - fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabels), + fmt.Errorf(validation.DisallowedStructuredMetadataErrorMsg, testStreamLabelsString), }, { "structured metadata too big", @@ -105,7 +106,7 @@ func TestValidator_ValidateEntry(t *testing.T) { }, }, logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}}}, - fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabels, 6, 4), + fmt.Errorf(validation.StructuredMetadataTooLargeErrorMsg, testStreamLabelsString, 6, 4), }, { "structured metadata too many", @@ -117,7 +118,7 @@ func TestValidator_ValidateEntry(t *testing.T) { }, }, logproto.Entry{Timestamp: testTime, Line: "12345678901", StructuredMetadata: push.LabelsAdapter{{Name: "foo", Value: "bar"}, {Name: "too", Value: "many"}}}, - fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabels, 2, 1), + fmt.Errorf(validation.StructuredMetadataTooManyErrorMsg, testStreamLabelsString, 2, 1), }, } for _, tt := range tests { @@ -126,7 +127,7 @@ func TestValidator_ValidateEntry(t *testing.T) { flagext.DefaultValues(l) o, err := validation.NewOverrides(*l, tt.overrides) assert.NoError(t, err) - v, err := NewValidator(o) + v, err := NewValidator(o, nil) assert.NoError(t, err) err = v.ValidateEntry(v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry) @@ -224,7 +225,7 @@ func TestValidator_ValidateLabels(t *testing.T) { flagext.DefaultValues(l) o, err := validation.NewOverrides(*l, tt.overrides) assert.NoError(t, err) - v, err := NewValidator(o) + v, err := NewValidator(o, nil) assert.NoError(t, err) err = v.ValidateLabels(v.getValidationContextForTime(testTime, tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels}) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index f29628d85eeb8..c7953ea1aba1e 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -30,6 +30,7 @@ import ( "github.com/grafana/loki/pkg/ingester/index" "github.com/grafana/loki/pkg/ingester/wal" "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/log" @@ -119,6 +120,8 @@ type instance struct { writeFailures *writefailures.Manager schemaconfig *config.SchemaConfig + + customStreamsTracker push.UsageTracker } func newInstance( @@ -262,6 +265,20 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor // record is only nil when replaying WAL. We don't want to drop data when replaying a WAL after // reducing the stream limits, for instance. 
var err error + + labels, err := syntax.ParseLabels(pushReqStream.Labels) + if err != nil { + if i.configs.LogStreamCreation(i.instanceID) { + level.Debug(util_log.Logger).Log( + "msg", "failed to create stream, failed to parse labels", + "org_id", i.instanceID, + "err", err, + "stream", pushReqStream.Labels, + ) + } + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + if record != nil { err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, i.streams.Len()) } @@ -282,21 +299,12 @@ func (i *instance) createStream(pushReqStream logproto.Stream, record *wal.Recor bytes += len(e.Line) } validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes)) + if i.customStreamsTracker != nil { + i.customStreamsTracker.DiscardedBytesAdd(i.instanceID, validation.StreamLimit, labels, float64(bytes)) + } return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg, i.instanceID) } - labels, err := syntax.ParseLabels(pushReqStream.Labels) - if err != nil { - if i.configs.LogStreamCreation(i.instanceID) { - level.Debug(util_log.Logger).Log( - "msg", "failed to create stream, failed to parse labels", - "org_id", i.instanceID, - "err", err, - "stream", pushReqStream.Labels, - ) - } - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index c25477a984e24..cb73f6db59eec 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -43,14 +43,14 @@ func newPushStats() *Stats { } } -func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) { +func ParseOTLPRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { stats := newPushStats() otlpLogs, err := extractLogs(r, stats) if err != nil { return nil, nil, err } - req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), stats) + req := otlpToLokiPushRequest(otlpLogs, userID, tenantsRetention, limits.OTLPConfig(userID), tracker, stats) return req, stats, nil } @@ -101,7 +101,7 @@ func extractLogs(r *http.Request, pushStats *Stats) (plog.Logs, error) { return req.Logs(), nil } -func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, stats *Stats) *logproto.PushRequest { +func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention TenantsRetention, otlpConfig OTLPConfig, tracker UsageTracker, stats *Stats) *logproto.PushRequest { if ld.LogRecordCount() == 0 { return &logproto.PushRequest{} } @@ -145,6 +145,7 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants labelsStr := streamLabels.String() lbs := modelLabelsSetToLabelsList(streamLabels) + if _, ok := pushRequestsByStream[labelsStr]; !ok { pushRequestsByStream[labelsStr] = logproto.Stream{ Labels: labelsStr, @@ -223,8 +224,15 @@ func otlpToLokiPushRequest(ld plog.Logs, userID string, tenantsRetention Tenants stream.Entries = append(stream.Entries, entry) pushRequestsByStream[labelsStr] = stream - stats.structuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(labelsSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize) + metadataSize := 
int64(labelsSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize) + stats.structuredMetadataBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += metadataSize stats.logLinesBytes[tenantsRetention.RetentionPeriodFor(userID, lbs)] += int64(len(entry.Line)) + + if tracker != nil { + tracker.ReceivedBytesAdd(userID, tenantsRetention.RetentionPeriodFor(userID, lbs), lbs, float64(len(entry.Line))) + tracker.ReceivedBytesAdd(userID, tenantsRetention.RetentionPeriodFor(userID, lbs), lbs, float64(metadataSize)) + } + stats.numLines++ if entry.Timestamp.After(stats.mostRecentEntryTimestamp) { stats.mostRecentEntryTimestamp = entry.Timestamp diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index badb6cd000e4b..593ac380e6696 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -25,6 +25,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { expectedPushRequest logproto.PushRequest expectedStats Stats otlpConfig OTLPConfig + tracker UsageTracker }{ { name: "no logs", @@ -121,6 +122,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { { name: "service.name not defined in resource attributes", otlpConfig: DefaultOTLPConfig, + tracker: NewMockTracker(), generateLogs: func() plog.Logs { ld := plog.NewLogs() ld.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("service.namespace", "foo") @@ -152,7 +154,32 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, streamLabelsSize: 47, mostRecentEntryTimestamp: now, + /* + logLinesBytesCustomTrackers: []customTrackerPair{ + { + Labels: []labels.Label{ + {Name: "service_namespace", Value: "foo"}, + {Name: "tracker", Value: "foo"}, + }, + Bytes: map[time.Duration]int64{ + time.Hour: 9, + }, + }, + }, + structuredMetadataBytesCustomTrackers: []customTrackerPair{ + { + Labels: []labels.Label{ + {Name: "service_namespace", Value: "foo"}, + {Name: "tracker", Value: "foo"}, + }, + Bytes: map[time.Duration]int64{ + time.Hour: 0, + }, + }, + }, + */ }, + //expectedTrackedUsaged: }, { name: "resource attributes and scope attributes stored as structured metadata", @@ -459,7 +486,7 @@ func TestOTLPToLokiPushRequest(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { stats := newPushStats() - pushReq := otlpToLokiPushRequest(tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, stats) + pushReq := otlpToLokiPushRequest(tc.generateLogs(), "foo", fakeRetention{}, tc.otlpConfig, tc.tracker, stats) require.Equal(t, tc.expectedPushRequest, *pushReq) require.Equal(t, tc.expectedStats, *stats) }) diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 15b7bba0a78c9..012a70386bd76 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -36,6 +36,7 @@ var ( Name: "distributor_bytes_received_total", Help: "The total number of uncompressed bytes received per tenant. 
Includes structured metadata bytes.", }, []string{"tenant", "retention_hours"}) + structuredMetadataBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_structured_metadata_bytes_received_total", @@ -62,7 +63,13 @@ type Limits interface { OTLPConfig(userID string) OTLPConfig } -type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits) (*logproto.PushRequest, *Stats, error) +type EmptyLimits struct{} + +func (EmptyLimits) OTLPConfig(string) OTLPConfig { + return DefaultOTLPConfig +} + +type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) type Stats struct { errs []error @@ -76,8 +83,8 @@ type Stats struct { bodySize int64 } -func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser) (*logproto.PushRequest, error) { - req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits) +func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker) (*logproto.PushRequest, error) { + req, pushStats, err := pushRequestParser(userID, r, tenantsRetention, limits, tracker) if err != nil { return nil, err } @@ -87,10 +94,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete structuredMetadataSize int64 ) for retentionPeriod, size := range pushStats.logLinesBytes { - var retentionHours string - if retentionPeriod > 0 { - retentionHours = fmt.Sprintf("%d", int64(math.Floor(retentionPeriod.Hours()))) - } + retentionHours := retentionPeriodToString(retentionPeriod) bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) bytesReceivedStats.Inc(size) @@ -98,10 +102,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete } for retentionPeriod, size := range pushStats.structuredMetadataBytes { - var retentionHours string - if retentionPeriod > 0 { - retentionHours = fmt.Sprintf("%d", int64(math.Floor(retentionPeriod.Hours()))) - } + retentionHours := retentionPeriodToString(retentionPeriod) structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) @@ -135,7 +136,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete return req, nil } -func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body @@ -206,12 +207,17 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe for _, s := range req.Streams { pushStats.streamLabelsSize += int64(len(s.Labels)) - var retentionPeriod time.Duration - if tenantsRetention != nil { - lbs, err := syntax.ParseLabels(s.Labels) + + var lbs labels.Labels + if tenantsRetention != nil || tracker != nil { + lbs, err = syntax.ParseLabels(s.Labels) if err != nil { return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) } + } + + var retentionPeriod time.Duration + if 
tenantsRetention != nil { retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs) } for _, e := range s.Entries { @@ -222,6 +228,12 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe } pushStats.logLinesBytes[retentionPeriod] += int64(len(e.Line)) pushStats.structuredMetadataBytes[retentionPeriod] += entryLabelsSize + + if tracker != nil { + tracker.ReceivedBytesAdd(userID, retentionPeriod, lbs, float64(len(e.Line))) + tracker.ReceivedBytesAdd(userID, retentionPeriod, lbs, float64(entryLabelsSize)) + } + if e.Timestamp.After(pushStats.mostRecentEntryTimestamp) { pushStats.mostRecentEntryTimestamp = e.Timestamp } @@ -230,3 +242,11 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe return &req, pushStats, nil } + +func retentionPeriodToString(retentionPeriod time.Duration) string { + var retentionHours string + if retentionPeriod > 0 { + retentionHours = fmt.Sprintf("%d", int64(math.Floor(retentionPeriod.Hours()))) + } + return retentionHours +} diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index fa1e2fb28d115..ec4fd8c8f818a 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -9,8 +9,10 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -54,6 +56,7 @@ func TestParseRequest(t *testing.T) { expectedStructuredMetadataBytes int expectedBytes int expectedLines int + expectedBytesUsageTracker map[string]float64 }{ { path: `/loki/api/v1/push`, @@ -68,21 +71,23 @@ func TestParseRequest(t *testing.T) { valid: false, }, { - path: `/loki/api/v1/push`, - body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, - contentType: `application/json`, - valid: true, - expectedBytes: len("fizzbuzz"), - expectedLines: 1, + path: `/loki/api/v1/push`, + body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, + contentType: `application/json`, + valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, }, { - path: `/loki/api/v1/push`, - body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, - contentType: `application/json`, - contentEncoding: ``, - valid: true, - expectedBytes: len("fizzbuzz"), - expectedLines: 1, + path: `/loki/api/v1/push`, + body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, + contentType: `application/json`, + contentEncoding: ``, + valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, }, { path: `/loki/api/v1/push`, @@ -92,22 +97,24 @@ func TestParseRequest(t *testing.T) { valid: false, }, { - path: `/loki/api/v1/push`, - body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), - contentType: `application/json`, - contentEncoding: `gzip`, - valid: true, - expectedBytes: len("fizzbuzz"), - expectedLines: 1, + path: `/loki/api/v1/push`, + body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), + contentType: `application/json`, 
+ contentEncoding: `gzip`, + valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, }, { - path: `/loki/api/v1/push`, - body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), - contentType: `application/json`, - contentEncoding: `deflate`, - valid: true, - expectedBytes: len("fizzbuzz"), - expectedLines: 1, + path: `/loki/api/v1/push`, + body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), + contentType: `application/json`, + contentEncoding: `deflate`, + valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, }, { path: `/loki/api/v1/push`, @@ -117,22 +124,24 @@ func TestParseRequest(t *testing.T) { valid: false, }, { - path: `/loki/api/v1/push`, - body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), - contentType: `application/json; charset=utf-8`, - contentEncoding: `gzip`, - valid: true, - expectedBytes: len("fizzbuzz"), - expectedLines: 1, + path: `/loki/api/v1/push`, + body: gzipString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), + contentType: `application/json; charset=utf-8`, + contentEncoding: `gzip`, + valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, }, { - path: `/loki/api/v1/push`, - body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), - contentType: `application/json; charset=utf-8`, - contentEncoding: `deflate`, - valid: true, - expectedBytes: len("fizzbuzz"), - expectedLines: 1, + path: `/loki/api/v1/push`, + body: deflateString(`{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`), + contentType: `application/json; charset=utf-8`, + contentEncoding: `deflate`, + valid: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, }, { path: `/loki/api/v1/push`, @@ -185,6 +194,7 @@ func TestParseRequest(t *testing.T) { expectedStructuredMetadataBytes: 2*len("a") + 2*len("b"), expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"), expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuzz") + 2*len("a") + 2*len("b"))}, }, } { t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) { @@ -200,7 +210,8 @@ func TestParseRequest(t *testing.T) { request.Header.Add("Content-Encoding", test.contentEncoding) } - data, err := ParseRequest(util_log.Logger, "fake", request, nil, nil, ParseLokiRequest) + tracker := NewMockTracker() + data, err := ParseRequest(util_log.Logger, "fake", request, nil, nil, ParseLokiRequest, tracker) structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived @@ -210,7 +221,7 @@ func TestParseRequest(t *testing.T) { previousLinesReceived += linesReceived if test.valid { - assert.Nil(t, err, "Should not give error for %d", index) + assert.NoErrorf(t, err, "Should not give error for 
%d", index) assert.NotNil(t, data, "Should give data for %d", index) require.Equal(t, test.expectedStructuredMetadataBytes, structuredMetadataBytesReceived) require.Equal(t, test.expectedBytes, bytesReceived) @@ -218,8 +229,9 @@ func TestParseRequest(t *testing.T) { require.Equal(t, float64(test.expectedStructuredMetadataBytes), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", ""))) require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake"))) + require.InDeltaMapValuesf(t, test.expectedBytesUsageTracker, tracker.receivedBytes, 0.0, "%s != %s", test.expectedBytesUsageTracker, tracker.receivedBytes) } else { - assert.NotNil(t, err, "Should give error for %d", index) + assert.Errorf(t, err, "Should give error for %d", index) assert.Nil(t, data, "Should not give data for %d", index) require.Equal(t, 0, structuredMetadataBytesReceived) require.Equal(t, 0, bytesReceived) @@ -231,3 +243,25 @@ func TestParseRequest(t *testing.T) { }) } } + +type MockCustomTracker struct { + receivedBytes map[string]float64 + discardedBytes map[string]float64 +} + +func NewMockTracker() *MockCustomTracker { + return &MockCustomTracker{ + receivedBytes: map[string]float64{}, + discardedBytes: map[string]float64{}, + } +} + +// DiscardedBytesAdd implements CustomTracker. +func (t *MockCustomTracker) DiscardedBytesAdd(_, _ string, labels labels.Labels, value float64) { + t.discardedBytes[labels.String()] += value +} + +// ReceivedBytesAdd implements CustomTracker. +func (t *MockCustomTracker) ReceivedBytesAdd(_ string, _ time.Duration, labels labels.Labels, value float64) { + t.receivedBytes[labels.String()] += value +} diff --git a/pkg/loghttp/push/usage_tracker.go b/pkg/loghttp/push/usage_tracker.go new file mode 100644 index 0000000000000..ab84da5c6acc3 --- /dev/null +++ b/pkg/loghttp/push/usage_tracker.go @@ -0,0 +1,16 @@ +package push + +import ( + "time" + + "github.com/prometheus/prometheus/model/labels" +) + +type UsageTracker interface { + + // ReceivedBytesAdd records ingested bytes by tenant, retention period and labels. + ReceivedBytesAdd(tenant string, retentionPeriod time.Duration, labels labels.Labels, value float64) + + // DiscardedBytesAdd records discarded bytes by tenant and labels. + DiscardedBytesAdd(tenant, reason string, labels labels.Labels, value float64) +} diff --git a/pkg/logql/rangemapper.go b/pkg/logql/rangemapper.go index 14cf76f1475a5..f898e19d2ea1e 100644 --- a/pkg/logql/rangemapper.go +++ b/pkg/logql/rangemapper.go @@ -61,7 +61,7 @@ type RangeMapper struct { splitAlignTs time.Time } -// NewRangeMapperWithSplitAlign is similar to `NewRangeMapper` except it accepts additonal `splitAlign` argument and used to +// NewRangeMapperWithSplitAlign is similar to `NewRangeMapper` except it accepts additional `splitAlign` argument and used to // align the subqueries generated according to that. Look at `rangeSplitAlign` method for more information. 
func NewRangeMapperWithSplitAlign(interval time.Duration, splitAlign time.Time, metrics *MapperMetrics, stats *MapperStats) (RangeMapper, error) { rm, err := NewRangeMapper(interval, metrics, stats) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index d8ee613f6108b..63477495f1291 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -38,6 +38,7 @@ import ( "github.com/grafana/loki/pkg/distributor" "github.com/grafana/loki/pkg/ingester" ingester_client "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/loghttp/push" "github.com/grafana/loki/pkg/loki/common" "github.com/grafana/loki/pkg/lokifrontend" "github.com/grafana/loki/pkg/lokifrontend/frontend/transport" @@ -332,6 +333,8 @@ type Loki struct { Codec Codec Metrics *server.Metrics + + UsageTracker push.UsageTracker } // New makes a new Loki. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5b73be62ca51e..0e479204cb630 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -323,6 +323,7 @@ func (t *Loki) initDistributor() (services.Service, error) { prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace, t.Tee, + t.UsageTracker, logger, ) if err != nil {
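
For context on how the new `push.UsageTracker` interface introduced in this change can be satisfied outside the test suite's `MockCustomTracker`, a minimal sketch follows. It attributes received and discarded bytes to a single stream label; the package name, metric names, and the choice of `namespace` as the attribution key are illustrative assumptions, not part of this diff. A value like this could then be passed as the new `usageTracker` argument to `distributor.New` (or left nil to disable tracking), and it flows from there into `push.ParseRequest` and the validator.

// Illustrative sketch only — not part of this diff. Metric names and the
// "namespace" attribution label are assumptions.
package usagetrackerexample

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/pkg/loghttp/push"
)

type namespaceTracker struct {
	received  *prometheus.CounterVec
	discarded *prometheus.CounterVec
}

// NewNamespaceTracker returns a push.UsageTracker backed by two counter
// vectors registered on reg.
func NewNamespaceTracker(reg prometheus.Registerer) push.UsageTracker {
	return &namespaceTracker{
		received: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "usage_tracker_received_bytes_total", // hypothetical name
			Help: "Uncompressed bytes received, by tenant and namespace.",
		}, []string{"tenant", "namespace"}),
		discarded: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "usage_tracker_discarded_bytes_total", // hypothetical name
			Help: "Bytes discarded, by tenant, reason and namespace.",
		}, []string{"tenant", "reason", "namespace"}),
	}
}

// ReceivedBytesAdd implements push.UsageTracker; the retention period is
// ignored in this sketch.
func (t *namespaceTracker) ReceivedBytesAdd(tenant string, _ time.Duration, lbs labels.Labels, value float64) {
	t.received.WithLabelValues(tenant, lbs.Get("namespace")).Add(value)
}

// DiscardedBytesAdd implements push.UsageTracker.
func (t *namespaceTracker) DiscardedBytesAdd(tenant, reason string, lbs labels.Labels, value float64) {
	t.discarded.WithLabelValues(tenant, reason, lbs.Get("namespace")).Add(value)
}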