
Bloom-compactor Sharding #11154

Merged: salvacorts merged 17 commits into main from salvacorts/bloomcompactor-sharding on Nov 10, 2023

Conversation

@salvacorts (Contributor) commented on Nov 7, 2023:

What this PR does / why we need it:
This PR adds tenant and fingerprint (FP) sharding to bloom compactors. Note that the bloom-compactor doesn't yet perform any compaction, but iterates through all tables, tenants, and series checking if the compactor owns the tenant and the series (by the series FP). Actual compaction will be implemented with #11115.

A new structure Job is added which will carry around all the context for a compaction job such as the tenant ID, the table name, and the series FP. The sharding strategy has two methods:

  • OwnsTenant(tenant string): Checks if the compactor shard owns the tenant.
  • OwnsJob(job Job): Checks (again) if the compactor owns the job's tenant. Then, it checks if the compactor owns the job's fingerprint by looking inside the tenant subring.

We add a new per-tenant limit: bloom_compactor_shard_size. If it's 0, the tenant can use all compactors (i.e. OwnsTenant will always return true), otherwise, only bloom_compactor_shard_size out of the total number of compactors will own the tenant. A given job's FP will be owned by exactly one compactor within the tenant shard.
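
As an illustration of the two ownership checks described above, here is a simplified sketch (not the exact merged code; the `Job` accessors, the ring `Operation`, and the token derivation are assumptions):

```go
package bloomcompactor

import "github.com/grafana/dskit/ring"

// Hypothetical sketch of the sharding strategy; details may differ from the PR.
type ShuffleShardingStrategy struct {
	r              *ring.Ring
	ringLifecycler *ring.BasicLifecycler
	limits         Limits // exposes BloomCompactorShardSize(tenantID string) int
}

// OwnsTenant returns true when this compactor is part of the tenant's shuffle
// shard. A shard size of 0 disables shuffle sharding, i.e. every compactor
// owns every tenant.
func (s *ShuffleShardingStrategy) OwnsTenant(tenantID string) bool {
	shardSize := s.limits.BloomCompactorShardSize(tenantID)
	if shardSize <= 0 {
		return true
	}
	subRing := s.r.ShuffleShard(tenantID, shardSize)
	return subRing.HasInstance(s.ringLifecycler.GetInstanceID())
}

// OwnsJob re-checks tenant ownership and then resolves the job's fingerprint
// to exactly one instance within the tenant's sub-ring.
func (s *ShuffleShardingStrategy) OwnsJob(job Job) (bool, error) {
	tenant := job.Tenant() // assumed accessor on Job
	if !s.OwnsTenant(tenant) {
		return false, nil
	}
	subRing := s.r.ShuffleShard(tenant, s.limits.BloomCompactorShardSize(tenant))
	// Assumed: the job's fingerprint (truncated to uint32) is used as the ring token.
	rs, err := subRing.Get(uint32(job.Fingerprint()), ring.Write, nil, nil, nil)
	if err != nil {
		return false, err
	}
	return rs.Includes(s.ringLifecycler.GetInstanceAddr()), nil
}
```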

Special notes for your reviewer:

  • Added a bunch of metrics in metrics.go
  • Added a test for the sharding strategy

@salvacorts changed the title from "Salvacorts/bloomcompactor sharding" to "Bloom-compactor Sharding" on Nov 7, 2023
@salvacorts force-pushed the salvacorts/bloomcompactor-sharding branch 3 times, most recently from faff95c to 1346da2, on November 7, 2023 11:06
@salvacorts force-pushed the salvacorts/bloomcompactor-sharding branch from 1346da2 to 49d8a39 on November 7, 2023 11:14
@github-actions bot added the type/docs label (Issues related to technical documentation; the Docs Squad uses this label across many repositories) on Nov 7, 2023
@salvacorts marked this pull request as ready for review on November 7, 2023 11:27
@salvacorts requested a review from a team as a code owner on November 7, 2023 11:27
@owen-d (Member) left a comment:

Very good start, left some thoughts.

(Several resolved review threads on pkg/bloomcompactor/bloomcompactor.go and pkg/bloomcompactor/config.go are collapsed.)
// Job holds a compaction job, which consists of a group of blocks that should be compacted together.
// Not goroutine safe.
// TODO: A job should probably contain series or chunks
type Job struct {
owen-d (Member):
Nit: I expect we'll want a job to contain multiple series rather than one per fingerprint (fingerprint is roughly analogous to stream). Creating one per fingerprint will likely result in logging millions of streams when the happy path isn't satisfied. That's an inconvenience, but there may be places where it's a performance hit as well.

Instead, we could either

  • Create one job for all fingerprints in a TSDB owned by the compactor replica
  • Choose some batch-size and create a number of jobs that have that many series|chunks in each (more work but more predictable)

salvacorts (author):

That was my original understanding/idea as well. But looking at #11115, it looked like we would be listing and processing one series at a time, so I thought it was actually easier to filter by the series fingerprint right away.

  • Create one job for all fingerprints in a TSDB owned by the compactor replica
  • Choose some batch-size and create a number of jobs that have that many series|chunks in each (more work but more predictable)

The "problem" I see with this is that we'd need to iterate throughout the whole index to build those batches anyway. And then, iterate again skipping those unowned. So I don't quite see the benefit of creating those batches other than not checking the ring as often (which shouldn't be a problem sinceShuffleShard ops are cached at the ring level)

will likely result in logging millions of streams

For each series, the first thing we do is check whether we own the series' FP and skip it if we don't. By logging here I guess you mean the log line "skipping job because it is not owned by this shard". I agree this will be too verbose. Since we have a metric to track that (loki_bloomcompactor_jobs_skipped), I think we can just remove this log line. Maybe we can even do the same for the tenant?

Contributor:

@owen do you mean that in option 1, all series in a single TSDB table would be owned by one job? In that case we could achieve it by adding the min and max series FP of that table, so we'd have one job per table, per tenant, for all series?

salvacorts (author):

Agreed with Owen to continue with 1 series - 1 job for now. We can easily reevaluate this in future iterations.

bufHosts, bufZones []string
}

func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy {
owen-d (Member):
I don't think we need to reimplement (copy/paste) this code. If we only need to extend functionality for OwnsJob, we can try struct-embedding. Is there something I'm missing?

Contributor:

Right, we can use the sharding implementation from the gateway, move it to github.com/grafana/loki/pkg/util/ring and extend it.

salvacorts (author):

Done, I created pkg/util/ring/sharding.go with strategies to shard by tenant and by fingerprint. Refactored the sharding strategies from bloom-gw and bloom-compactor to reuse the new ones from the ring utils lib.
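
As a rough illustration of what such shared helpers could look like (the names and signatures below are illustrative, not necessarily what sharding.go ended up with):

```go
package ring

import dsring "github.com/grafana/dskit/ring"

// TenantShuffleSharding answers tenant ownership for any component that
// shuffle-shards tenants across its ring instances.
type TenantShuffleSharding struct {
	r          dsring.ReadRing
	instanceID string
	shardSize  func(tenantID string) int // per-tenant limit lookup
}

func (s TenantShuffleSharding) OwnsTenant(tenantID string) bool {
	size := s.shardSize(tenantID)
	if size <= 0 {
		return true // 0 means the tenant may use all instances
	}
	return s.r.ShuffleShard(tenantID, size).HasInstance(s.instanceID)
}

// FingerprintShuffleSharding resolves a fingerprint to a single owner within
// an already-computed (per-tenant) sub-ring.
type FingerprintShuffleSharding struct {
	subRing      dsring.ReadRing
	instanceAddr string
	op           dsring.Operation
}

func (s FingerprintShuffleSharding) OwnsFingerprint(fp uint64) (bool, error) {
	rs, err := s.subRing.Get(uint32(fp), s.op, nil, nil, nil)
	if err != nil {
		return false, err
	}
	return rs.Includes(s.instanceAddr), nil
}
```

Both the bloom-gateway and bloom-compactor strategies could then embed or wrap these two helpers instead of duplicating the ring lookups.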

poyzannur added a commit that referenced this pull request Nov 8, 2023
**What this PR does / why we need it**:
This is the first step to create a bloom from a series. The logic to get tenants and tables is a temporary hack and will be replaced by the ring logic in #11154

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:
It does not handle appending multiple blooms into blocks. 
It does not handle creating and uploading of meta files. 

(Resolved review threads on pkg/bloomcompactor/bloomcompactor.go and pkg/bloomcompactor/metrics.go are collapsed.)
@@ -297,6 +298,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")
f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.")
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms.")
Contributor:

It's a bit confusing that shard-size for the Bloom Gateway is set to 1 by default, whereas for the compactor it's set to 0.
Logically it should work the same way, but users might be a bit confused...

Suggested change
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms.")
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")

salvacorts (author):

Agree. My original idea was to allow all tenants to use all compactors by default. Changed the default to 1 (matching the bloom-gateway) and added your suggested wording as well.

(Resolved review threads on pkg/bloomcompactor/bloomcompactor.go, pkg/bloomcompactor/config.go, and pkg/bloomcompactor/bloomcompactor_test.go are collapsed.)
ringLifeCycler: ringLifecycler,
limits: limits,
}
s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet()
Contributor:
Usage of these buffers is not thread-safe.

salvacorts (author), Nov 8, 2023:
Indeed. Thank you! I removed the buffers and passed nil. If we end up making many allocations here, we can use a buffer pool. I also created the buffers inside NewFingerprintShuffleSharding, which is isolated per thread.
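
For reference, a small sketch of the two safe alternatives, assuming the dskit ring API (the function names here are illustrative):

```go
package bloomcompactor

import "github.com/grafana/dskit/ring"

// Safe but allocates on every call: Get creates its own internal slices when
// the buffer arguments are nil.
func ownsFP(subRing ring.ReadRing, instanceAddr string, fp uint64) (bool, error) {
	rs, err := subRing.Get(uint32(fp), ring.Write, nil, nil, nil)
	if err != nil {
		return false, err
	}
	return rs.Includes(instanceAddr), nil
}

// Per-goroutine buffers avoid the data race while reusing allocations when
// ownership is checked in a tight loop.
func ownsFPsBuffered(subRing ring.ReadRing, instanceAddr string, fps []uint64) ([]bool, error) {
	bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
	owned := make([]bool, len(fps))
	for i, fp := range fps {
		rs, err := subRing.Get(uint32(fp), ring.Write, bufDescs, bufHosts, bufZones)
		if err != nil {
			return nil, err
		}
		owned[i] = rs.Includes(instanceAddr)
	}
	return owned, nil
}
```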

(Resolved review thread on pkg/bloomcompactor/metrics.go is collapsed.)
Comment on lines 395 to 399
if err := c.runCompact(ctx, c.bloomShipperClient, sc, job); err != nil {
	c.metrics.compactionRunFailedJobs.Inc()
	errs.Add(errors.Wrap(err, "runBloomCompact"))
	return
}
Contributor:
Probably a personal preference, but I would separate job collection and job execution into separate functions.

salvacorts (author):
Agree! Let's do it in a follow-up PR once we add more logic to the actual compaction of chunks, maybe moving that logic to chunkcompactor.go.

Comment on lines 579 to 582
func filterAndSortTablesByRange(tables []string, maxAge time.Duration) []string {
	tableRanges := make(map[string]model.Interval, len(tables))
	tablesToSort := make([]string, 0, len(tables))
	maxAgeTime := model.Now().Add(-maxAge)
Contributor:
How can we ensure we exclude un-compacted index tables from potentially the last 15 mins? We agreed that it will be tricky to compact the most recent un-compacted tables.

salvacorts (author):
Good call. I think we'll need to add a new limit (MinTableAge?) and look at the End time of the table's interval.

Contributor:
Is there any other way of determining un-compacted indexes, I wonder?
I've seen some tar files in the index bucket in GCP, something like timestamp-ingester-12.tsdb.gz; I assume these are un-compacted indexes. I'll check the code tomorrow to confirm.
I know the underlying object client doesn't filter them out and returns them all, but maybe something else in the middle does.

salvacorts (author):
Is there any other way of determining un-compacted indexes, I wonder?

I'm not sure. Let me know the outcome of your investigation. We can modify this in a follow-up PR.

Added MinTableAge defaulting to 1h for now.
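
For illustration, the age check could look roughly like this (a sketch; the limit name and the exact comparison used in the PR may differ):

```go
package bloomcompactor

import (
	"time"

	"github.com/prometheus/common/model"
)

// tableIsOldEnough reports whether a table's interval ended at least
// minTableAge ago. Newer tables are skipped because their indexes may not
// have been compacted yet.
func tableIsOldEnough(tableInterval model.Interval, minTableAge time.Duration) bool {
	cutoff := model.Now().Add(-minTableAge)
	return tableInterval.End.Before(cutoff)
}
```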

return
}
if !ownsJob {
c.metrics.compactionRunSkippedJobs.Inc()
Contributor:
I can see this metric name, compactionRunSkippedJobs, getting confusing in the future, because whole compactions will also be skipped when there is no change in indexes and the chunks are already compacted.
Why do we want to count jobs not owned by the shard? We must assume they will be processed by another shard.

I can see it as a backup to check whether we are missing a subgroup of jobs altogether; maybe we can use a metric like jobNotOwned, though it still doesn't help us find which jobs.

salvacorts (author):
because there will be whole compactions skipped because there is no change in indexes

What about renaming the metric to unowned_jobs instead?

why do we want to count jobs not owned by the shard? we must assume it will be processed by another shard.

I think this will be useful for debugging purposes. We can look at the total number of started compaction jobs and the number of finished jobs; ideally the number started should be the same as the number finished. If they differ, we can use the unowned metric to see whether there are jobs not owned by any compactor (if the sum of unowned jobs is more than jobs started).

Having said that, we'd need to add the started metric...
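
A hedged sketch of the accounting this would enable (the counter names below are illustrative, not the final names in metrics.go):

```go
package bloomcompactor

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// jobMetrics counts jobs started and completed, plus jobs this instance does
// not own; comparing the three helps spot jobs no compactor picked up.
type jobMetrics struct {
	started   prometheus.Counter
	completed prometheus.Counter
	unowned   prometheus.Counter
}

func newJobMetrics(r prometheus.Registerer) *jobMetrics {
	return &jobMetrics{
		started: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Namespace: "loki", Subsystem: "bloomcompactor",
			Name: "jobs_started_total",
			Help: "Total number of compaction jobs started.",
		}),
		completed: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Namespace: "loki", Subsystem: "bloomcompactor",
			Name: "jobs_completed_total",
			Help: "Total number of compaction jobs completed.",
		}),
		unowned: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Namespace: "loki", Subsystem: "bloomcompactor",
			Name: "jobs_unowned_total",
			Help: "Total number of jobs skipped because this compactor does not own them.",
		}),
	}
}
```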

Contributor:
Yes, I agree that it's good for debugging purposes. unowned_jobs sounds the most appropriate.

salvacorts (author):
Done :)

@vlad-diachenko (Contributor) left a comment:
LGTM ;)

@poyzannur (Contributor) left a comment:
lgtm, thanks for working this out.

@salvacorts merged commit 4248825 into main on Nov 10, 2023
6 checks passed
@salvacorts deleted the salvacorts/bloomcompactor-sharding branch on November 10, 2023 15:25
errs := multierror.New()
_ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error {
	tableName := tables[i]
	level.Info(c.logger).Log("msg", "compacting table", "table-name", tableName)
Contributor:
On line 260 you use the key table.

Comment on lines +81 to +97
func (j *Job) computeFromThrough() {
	minFrom := model.Latest
	maxThrough := model.Earliest

	for _, chunk := range j.chunks {
		from, through := chunk.Bounds()
		if minFrom > from {
			minFrom = from
		}
		if maxThrough < through {
			maxThrough = through
		}
	}

	j.from = &minFrom
	j.through = &maxThrough
}
Contributor:
This will yield an incorrect result when len(j.chunks) is 0, because minFrom and maxThrough are never updated.

Contributor:
great catch, I can also address that in my next PR.
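
One possible fix, sketched here for illustration (not necessarily the change that landed): bail out early so from/through are left unset when there are no chunks.

```go
func (j *Job) computeFromThrough() {
	if len(j.chunks) == 0 {
		return // leave from/through nil rather than Latest/Earliest
	}

	minFrom := model.Latest
	maxThrough := model.Earliest

	for _, chunk := range j.chunks {
		from, through := chunk.Bounds()
		if minFrom > from {
			minFrom = from
		}
		if maxThrough < through {
			maxThrough = through
		}
	}

	j.from = &minFrom
	j.through = &maxThrough
}
```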

type metrics struct {
	compactionRunsStarted   prometheus.Counter
	compactionRunsCompleted prometheus.Counter
	compactionRunsErred     prometheus.Counter
Contributor:
Suggested change
compactionRunsErred prometheus.Counter
compactionRunsFailed prometheus.Counter

salvacorts added a commit that referenced this pull request Nov 10, 2023
@salvacorts mentioned this pull request Nov 10, 2023
salvacorts added a commit that referenced this pull request Nov 10, 2023
Addressing comments from #11154 after it was merged
rhnasc pushed a commit to inloco/loki that referenced this pull request Apr 12, 2024 (duplicate of the commit message above).
rhnasc pushed a commit to inloco/loki that referenced this pull request Apr 12, 2024 (duplicate of this PR's description).
rhnasc pushed a commit to inloco/loki that referenced this pull request Apr 12, 2024
Addressing comments from grafana#11154 after it was merged
Labels: size/XXL, type/docs
5 participants