Merge remote-tracking branch 'grafana/main' into karsten/custom-trackers
jeschkies committed Feb 20, 2024
2 parents b6b47a5 + 6204886 commit 457b002
Showing 53 changed files with 1,655 additions and 431 deletions.
4 changes: 2 additions & 2 deletions .gitignore
@@ -27,8 +27,8 @@ cmd/querytee/querytee
 dlv
 rootfs/
 dist
-coverage.txt
-test_results.txt
+*coverage.txt
+*test_results.txt
 .DS_Store
 .aws-sam
 .idea
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -7,6 +7,7 @@
 ##### Enhancements
 
 * [11840](https://github.com/grafana/loki/pull/11840) **jeschkies**: Allow custom trackers for ingested and discarded bytes metric.
+* [11814](https://github.com/grafana/loki/pull/11814) **kavirajk**: feat: Support split align and caching for instant metric query results
 * [11851](https://github.com/grafana/loki/pull/11851) **elcomtik**: Helm: Allow the definition of resources for GrafanaAgent pods.
 * [11819](https://github.com/grafana/loki/pull/11819) **jburnham**: Ruler: Add the ability to disable the `X-Scope-OrgId` tenant identification header in remote write requests.
 * [11633](https://github.com/grafana/loki/pull/11633) **cyriltovena**: Add profiling integrations to tracing instrumentation.
@@ -71,7 +72,7 @@
 * [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction.
 * [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested.
 * [11776](https://github.com/grafana/loki/pull/11776) **ashwanthgoli** Background Cache: Fixes a bug that is causing the background queue size to be incremented twice for each enqueued item.
-* [11921](https://github.com/grafana/loki/pull/11921) **paul1r**: Parsing: String array elements were not being parsed correctly in JSON processing
+* [11921](https://github.com/grafana/loki/pull/11921) **paul1r**: Parsing: String array elements were not being parsed correctly in JSON processing
 
 ##### Changes
 
11 changes: 11 additions & 0 deletions cmd/loki/loki-local-with-memcached.yaml
@@ -22,6 +22,17 @@ query_range:
   cache_results: true
   cache_volume_results: true
   cache_series_results: true
+  cache_instant_metric_results: true
+  instant_metric_query_split_align: true
+  instant_metric_results_cache:
+    cache:
+      default_validity: 12h
+      memcached_client:
+        consistent_hash: true
+        addresses: "dns+localhost:11211"
+        max_idle_conns: 16
+        timeout: 500ms
+        update_interval: 1m
   series_results_cache:
     cache:
       default_validity: 12h
30 changes: 30 additions & 0 deletions docs/sources/configure/_index.md
@@ -886,6 +886,28 @@ volume_results_cache:
   # CLI flag: -frontend.volume-results-cache.compression
   [compression: <string> | default = ""]
 
+# Cache instant metric query results.
+# CLI flag: -querier.cache-instant-metric-results
+[cache_instant_metric_results: <boolean> | default = false]
+
+# If a cache config is not specified and cache_instant_metric_results is true,
+# the config for the results cache is used.
+instant_metric_results_cache:
+  # The cache block configures the cache backend.
+  # The CLI flags prefix for this block configuration is:
+  # frontend.instant-metric-results-cache
+  [cache: <cache_config>]
+
+  # Use compression in cache. The default is an empty value '', which disables
+  # compression. Supported values are: 'snappy' and ''.
+  # CLI flag: -frontend.instant-metric-results-cache.compression
+  [compression: <string> | default = ""]
+
+# Whether to align the splits of instant metric query with splitByInterval and
+# query's exec time. Useful when instant_metric_cache is enabled
+# CLI flag: -querier.instant-metric-query-split-align
+[instant_metric_query_split_align: <boolean> | default = false]
+
 # Cache series query results.
 # CLI flag: -querier.cache-series-results
 [cache_series_results: <boolean> | default = false]
@@ -2935,6 +2957,13 @@ The `limits_config` block configures global and per-tenant limits in Loki.
 # CLI flag: -experimental.querier.recent-metadata-query-window
 [recent_metadata_query_window: <duration> | default = 0s]
 
+# Split instant metric queries by a time interval and execute in parallel. The
+# value 0 disables splitting instant metric queries by time. This also
+# determines how cache keys are chosen when instant metric query result caching
+# is enabled.
+# CLI flag: -querier.split-instant-metric-queries-by-interval
+[split_instant_metric_queries_by_interval: <duration> | default = 1h]
+
 # Interval to use for time-based splitting when a request is within the
 # `query_ingesters_within` window; defaults to `split-queries-by-interval` by
 # setting to 0.
@@ -4407,6 +4436,7 @@ The cache block configures the cache backend. The supported CLI flags `<prefix>`
 - `bloom.metas-cache`
 - `frontend`
 - `frontend.index-stats-results-cache`
+- `frontend.instant-metric-results-cache`
 - `frontend.label-results-cache`
 - `frontend.series-results-cache`
 - `frontend.volume-results-cache`
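Read together with the `cmd/loki/loki-local-with-memcached.yaml` change above, the new settings span two blocks: `query_range` wires up the instant metric results cache, while `limits_config` sets the split interval that also determines the cache keys. A minimal sketch of a combined configuration, assuming Loki's in-process embedded cache instead of memcached (the 30m value is an illustrative override of the 1h default, not a value from this commit):

query_range:
  cache_instant_metric_results: true
  instant_metric_query_split_align: true
  instant_metric_results_cache:
    cache:
      default_validity: 12h
      embedded_cache:
        enabled: true

limits_config:
  split_instant_metric_queries_by_interval: 30m

Per the option text above, the split interval doubles as the cache-key granularity, and split alignment keeps the subquery boundaries stable across executions so cached results can actually be reused.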
2 changes: 1 addition & 1 deletion go.mod
@@ -118,7 +118,7 @@ require (
 	github.com/DmitriyVTitov/size v1.5.0
 	github.com/IBM/go-sdk-core/v5 v5.13.1
 	github.com/IBM/ibm-cos-sdk-go v1.10.0
-	github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc
+	github.com/axiomhq/hyperloglog v0.0.0-20240124082744-24bca3a5b39b
 	github.com/d4l3k/messagediff v1.2.1
 	github.com/efficientgo/core v1.0.0-rc.2
 	github.com/fsnotify/fsnotify v1.6.0
4 changes: 2 additions & 2 deletions go.sum
@@ -368,8 +368,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 h1:xsOtPAvHqhvQvBza5ohaUcfq1Lce
 github.com/aws/aws-sdk-go-v2/service/sts v1.16.1/go.mod h1:Aq2/Qggh2oemSfyHH+EO4UBbgWG6zFCXLHYI4ILTY7w=
 github.com/aws/smithy-go v1.11.1 h1:IQ+lPZVkSM3FRtyaDox41R8YS6iwPMYIreejOgPW49g=
 github.com/aws/smithy-go v1.11.1/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM=
-github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo=
-github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c=
+github.com/axiomhq/hyperloglog v0.0.0-20240124082744-24bca3a5b39b h1:F3yMzKumBUQ6Fn0sYI1YQ16vQRucpZOfBQ9HXWl5+XI=
+github.com/axiomhq/hyperloglog v0.0.0-20240124082744-24bca3a5b39b/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c=
 github.com/baidubce/bce-sdk-go v0.9.141 h1:EV5BH5lfymIGPSmYDo9xYdsVlvWAW6nFeiA6t929zBE=
 github.com/baidubce/bce-sdk-go v0.9.141/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
 github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
14 changes: 7 additions & 7 deletions pkg/bloomcompactor/batch.go
@@ -286,11 +286,10 @@ func (i *blockLoadingIter) loadNext() bool {
 	// check if there are more overlapping groups to load
 	if !i.overlapping.Next() {
 		i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
 		return false
 	}
-	if i.overlapping.Err() != nil {
-		i.err = i.overlapping.Err()
-	}
+
+	if i.overlapping.Err() != nil {
+		i.err = i.overlapping.Err()
+		return false
+	}
 
@@ -300,7 +299,7 @@ func (i *blockLoadingIter) loadNext() bool {
 	filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
 
 	iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
-	for filtered.Next() && filtered.Err() == nil {
+	for filtered.Next() {
 		bq := loader.At()
 		if _, ok := i.loaded[bq]; !ok {
 			i.loaded[bq] = struct{}{}
@@ -309,8 +308,9 @@ func (i *blockLoadingIter) loadNext() bool {
 			iters = append(iters, iter)
 	}
 
-	if loader.Err() != nil {
-		i.err = loader.Err()
+	if err := filtered.Err(); err != nil {
+		i.err = err
+		i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
 		return false
 	}
 
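The fix above converges on a common Go iterator convention: the loop condition calls only Next(), and Err() is consulted exactly once after the loop, with the error propagated and the partial result discarded. A minimal self-contained sketch of that convention; the iter interface and failingIter below are illustrative stand-ins, not Loki's actual v1 iterator API:

package main

import (
	"errors"
	"fmt"
)

// iter is a minimal stand-in for the v1 iterator interfaces in the diff above:
// Next advances and returns false on exhaustion or failure, and Err must be
// checked once Next has returned false.
type iter[T any] interface {
	Next() bool
	At() T
	Err() error
}

// failingIter yields ints until it hits a synthetic error partway through.
type failingIter struct {
	items  []int
	pos    int
	failAt int
	err    error
}

func (f *failingIter) Next() bool {
	if f.pos >= f.failAt {
		f.err = errors.New("synthetic read error")
		return false
	}
	if f.pos >= len(f.items) {
		return false
	}
	f.pos++
	return true
}

func (f *failingIter) At() int    { return f.items[f.pos-1] }
func (f *failingIter) Err() error { return f.err }

// collect drains the iterator, then checks Err exactly once afterwards: the
// loop condition no longer interleaves Err checks with Next, and an error
// empties the result instead of returning a silently truncated batch.
func collect(it iter[int]) ([]int, error) {
	var out []int
	for it.Next() {
		out = append(out, it.At())
	}
	if err := it.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	it := &failingIter{items: []int{1, 2, 3, 4}, failAt: 2}
	got, err := collect(it)
	fmt.Println(got, err) // [] synthetic read error
}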
15 changes: 13 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -214,6 +214,7 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error
 
 // runs a single round of compaction for all relevant tenants and tables
 func (c *Compactor) runOne(ctx context.Context) error {
+	level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism)
 	var workersErr error
 	var wg sync.WaitGroup
 	ch := make(chan tenantTable)
@@ -226,7 +227,11 @@ func (c *Compactor) runOne(ctx context.Context) error {
 	err := c.loadWork(ctx, ch)
 
 	wg.Wait()
-	return multierror.New(workersErr, err, ctx.Err()).Err()
+	err = multierror.New(workersErr, err, ctx.Err()).Err()
+	if err != nil {
+		level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err)
+	}
+	return err
 }
 
 func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
@@ -241,6 +246,7 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
 
 	fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
 	throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
+	level.Debug(c.logger).Log("msg", "loaded tables for compaction", "from", fromDay, "through", throughDay)
 	return newDayRangeIterator(fromDay, throughDay, c.schemaCfg)
 }
 
@@ -250,6 +256,8 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
 	for tables.Next() && tables.Err() == nil && ctx.Err() == nil {
 		table := tables.At()
 
+		level.Debug(c.logger).Log("msg", "loading work for table", "table", table)
+
 		tenants, err := c.tenants(ctx, table)
 		if err != nil {
 			return errors.Wrap(err, "getting tenants")
@@ -262,6 +270,7 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
 			if err != nil {
 				return errors.Wrap(err, "checking tenant ownership")
 			}
+			level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns)
 			if !owns {
 				c.metrics.tenantsSkipped.Inc()
 				continue
@@ -280,12 +289,14 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
 		}
 
 		if err := tenants.Err(); err != nil {
+			level.Error(c.logger).Log("msg", "error iterating tenants", "err", err)
 			return errors.Wrap(err, "iterating tenants")
 		}
 
 	}
 
 	if err := tables.Err(); err != nil {
+		level.Error(c.logger).Log("msg", "error iterating tables", "err", err)
 		return errors.Wrap(err, "iterating tables")
 	}
 
@@ -330,7 +341,7 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
 }
 
 func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error {
-	level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange)
+	level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange.String())
 	return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange)
 }
 
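runOne now logs the aggregated error as well as returning it. The fold itself, worker errors plus the loadWork error plus ctx.Err() collapsed into a single result, can be approximated with the standard library's errors.Join; the sketch below is a stdlib-only stand-in for the multierror package used above:

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"sync"
)

// runOnce mirrors the shape of Compactor.runOne above: fan jobs out to
// workers, wait, then fold every failure mode into one error that is both
// logged and returned to the caller.
func runOnce(ctx context.Context, jobs []int) error {
	var (
		wg         sync.WaitGroup
		mu         sync.Mutex
		workersErr error
	)
	for _, j := range jobs {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			if j%2 != 0 { // synthetic failure for odd job IDs
				mu.Lock()
				workersErr = errors.Join(workersErr, fmt.Errorf("job %d failed", j))
				mu.Unlock()
			}
		}(j)
	}
	wg.Wait()

	// errors.Join drops nil arguments, so a clean round still returns nil.
	err := errors.Join(workersErr, ctx.Err())
	if err != nil {
		log.Printf("compaction iteration failed: %v", err)
	}
	return err
}

func main() {
	_ = runOnce(context.Background(), []int{1, 2, 3})
}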