Bloom-compactor Sharding #11154
Conversation
Force-pushed from faff95c to 1346da2
Force-pushed from 1346da2 to 49d8a39
Very good start, left some thoughts.
// Job holds a compaction job, which consists of a group of blocks that should be compacted together.
// Not goroutine safe.
// TODO: A job should probably contain series or chunks
type Job struct {
Nit: I expect we'll want a job to contain multiple series rather than one job per fingerprint (a fingerprint is roughly analogous to a stream). Creating one per fingerprint will likely result in logging millions of streams when the happy path isn't satisfied. That's an inconvenience, but there may be places where it's a performance hit as well.
Instead, we could either:
- Create one job for all fingerprints in a TSDB owned by the compactor replica
- Choose some batch size and create a number of jobs that have that many series|chunks in each (more work but more predictable)
That was my original understanding/idea as well. But looking at #11115, it looked like we would be listing and processing one series at a time, so I thought it was actually easier to filter by the series fingerprint right away.

> - Create one job for all fingerprints in a TSDB owned by the compactor replica
> - Choose some batch size and create a number of jobs that have that many series|chunks in each (more work but more predictable)

The "problem" I see with this is that we'd need to iterate through the whole index to build those batches anyway, and then iterate again skipping the unowned ones. So I don't quite see the benefit of creating those batches other than not checking the ring as often (which shouldn't be a problem since `ShuffleShard` ops are cached at the ring level).

> will likely result in logging millions of streams

For each series, the first thing we do is check whether we own the series' FP and skip it if we don't. By logging I guess you mean the log line "skipping job because it is not owned by this shard". I agree this will be too verbose. Since we have a metric to track that (`loki_bloomcompactor_jobs_skipped`), I think we can just remove this log line. Maybe we can even do the same for the tenant?
@owen do you mean that in option 1, all series in a single TSDB table would be owned by one job? In that case we could achieve it by adding the min and max series FPs of that table, so we'd have one job per table, per tenant, for all series?
Agreed with Owen to continue with 1 series - 1 job for now. We can easily reevaluate this in future iterations.
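For illustration, a rough sketch of what such a per-series job could carry under the "1 series - 1 job" decision. The field names, the constructor, and the `ChunkRef` placeholder are assumptions for this sketch, not the merged code:

```go
package bloomcompactor

import "github.com/prometheus/common/model"

// ChunkRef is a stand-in for however the job ends up referencing chunks (assumption).
type ChunkRef struct {
	From, Through model.Time
}

// Job carries the context needed to build blooms for a single series:
// which tenant and table it belongs to, and which series (by fingerprint).
type Job struct {
	tenantID  string
	tableName string
	seriesFP  model.Fingerprint
	chunks    []ChunkRef
}

// NewJob builds a job for one series of one tenant in one table.
func NewJob(tenantID, tableName string, fp model.Fingerprint, chunks []ChunkRef) Job {
	return Job{tenantID: tenantID, tableName: tableName, seriesFP: fp, chunks: chunks}
}
```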
	bufHosts, bufZones []string
}

func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy {
I don't think we need to reimplement (copy/paste) this code. If we only need to extend functionality for `OwnsJob`, we can try struct-embedding. Is there something I'm missing?
Right, we can use the sharding implementation from the gateway, move it to `github.com/grafana/loki/pkg/util/ring`, and extend it.
Done, I created `pkg/util/ring/sharding.go` with strategies to shard by tenant and by fingerprint. Refactored the sharding strategies from bloom-gw and bloom-compactor to reuse the new ones from the ring utils lib.
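As a rough sketch of the tenant-level half of that strategy (type and method names, and the shard-size accessor, are assumptions; the merged `pkg/util/ring/sharding.go` is the source of truth):

```go
package sharding

import "github.com/grafana/dskit/ring"

// TenantShuffleSharding decides tenant ownership from the tenant's shuffle shard.
type TenantShuffleSharding struct {
	r            ring.ReadRing
	instanceID   string
	shardSizeFor func(tenant string) int // e.g. the bloom-compactor.shard-size limit
}

// OwnsTenant returns true if this instance is part of the tenant's shuffle shard.
func (s *TenantShuffleSharding) OwnsTenant(tenant string) bool {
	size := s.shardSizeFor(tenant)
	if size <= 0 {
		// A shard size of 0 disables shuffle sharding: every instance owns every tenant.
		return true
	}
	return s.r.ShuffleShard(tenant, size).HasInstance(s.instanceID)
}
```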
**What this PR does / why we need it**: This is the first step to create a bloom from a series. The logic to get tenants and tables is a temporary hack and will be replaced by the ring logic in #11154

**Which issue(s) this PR fixes**: Fixes #<issue number>

**Special notes for your reviewer**: It does not handle appending multiple blooms into blocks. It does not handle creating and uploading of meta files.

**Checklist**
- [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](0d4416a)
pkg/validation/limits.go
Outdated
@@ -297,6 +298,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

	f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")
	f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.")
	f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms.")
It's a bit confusing that the shard-size for the Bloom Gateway defaults to 1 whereas for the compactor it defaults to 0. Logically they should work the same way, but users might be a bit confused...
Suggested change:
- f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms.")
+ f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
Agree. My original idea was to allow all tenants to use all compactors by default. Changed the default to 1 (matching the gateway) and added your suggested description as well.
pkg/bloomcompactor/sharding.go
Outdated
	ringLifeCycler: ringLifecycler,
	limits:         limits,
}
s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet()
Usage of these buffers is not threadsafe
Indeed. Thank you! I removed the bufs and passed nil. If we end up making many allocations here, we can use a buffer pool. Created the bufs inside `NewFingerprintShuffleSharding`, which is isolated per thread.
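A sketch of that arrangement (names, the `ring.Write` operation choice, and the token derivation from the fingerprint are assumptions): the buffers are created inside the per-worker constructor, so each goroutine owns its own copies and nothing is shared across threads.

```go
package sharding

import "github.com/grafana/dskit/ring"

// FingerprintShuffleSharding checks ownership of a series fingerprint within a
// tenant's subring. One instance is created per worker goroutine, so the Get
// buffers below are never shared.
type FingerprintShuffleSharding struct {
	subRing      ring.ReadRing
	instanceAddr string

	bufDescs           []ring.InstanceDesc
	bufHosts, bufZones []string
}

func NewFingerprintShuffleSharding(subRing ring.ReadRing, instanceAddr string) *FingerprintShuffleSharding {
	s := &FingerprintShuffleSharding{subRing: subRing, instanceAddr: instanceAddr}
	// Safe only because this strategy instance is used by a single goroutine.
	s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet()
	return s
}

// OwnsFingerprint returns true if this instance is in the replication set for the FP's token.
func (s *FingerprintShuffleSharding) OwnsFingerprint(fp uint64) (bool, error) {
	rs, err := s.subRing.Get(uint32(fp), ring.Write, s.bufDescs, s.bufHosts, s.bufZones)
	if err != nil {
		return false, err
	}
	return rs.Includes(s.instanceAddr), nil
}
```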
pkg/bloomcompactor/bloomcompactor.go
Outdated
if err := c.runCompact(ctx, c.bloomShipperClient, sc, job); err != nil {
	c.metrics.compactionRunFailedJobs.Inc()
	errs.Add(errors.Wrap(err, "runBloomCompact"))
	return
}
Probably a personal preference, but I would separate job collection and job execution into separate functions.
Agree! Let's do it in a follow-up PR once we add more logic to the actual compaction of chunks. Maybe moving that logic to `chunkcompactor.go`.
pkg/bloomcompactor/bloomcompactor.go
Outdated
func filterAndSortTablesByRange(tables []string, maxAge time.Duration) []string {
	tableRanges := make(map[string]model.Interval, len(tables))
	tablesToSort := make([]string, 0, len(tables))
	maxAgeTime := model.Now().Add(-maxAge)
How can we ensure we exclude un-compacted index tables from potentially the last 15 minutes? We agreed that it will be tricky to compact the most recent, un-compacted tables.
Good call. I think we'll need to add a new limit `MinTableAge` and look at the `End` time of the table's interval.
Is there any other way of determining un-compacted indexes, I wonder?
I've seen some tar files in the index bucket in GCP, something like `timestamp-ingester-12.tsdb.gz`; I assume these are un-compacted indexes. I'll check the code tomorrow to confirm.
I know the underlying object client doesn't filter them out and returns them all, but maybe something else in the middle does.
> Is there any other way of determining un-compacted indexes, I wonder?

I'm not sure. Let me know the output of your investigation. We can modify this in a follow-up PR.

Added `MinTableAge` defaulting to 1h for now.
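For illustration, a minimal sketch of how such a `MinTableAge` cutoff could be applied to a table's interval (the helper name is an assumption; in the PR the check would live alongside `filterAndSortTablesByRange`):

```go
package bloomcompactor

import (
	"time"

	"github.com/prometheus/common/model"
)

// tableIsOldEnough reports whether a table's interval ended before the
// MinTableAge cutoff, so recent (potentially still un-compacted) tables are skipped.
func tableIsOldEnough(interval model.Interval, minTableAge time.Duration) bool {
	cutoff := model.Now().Add(-minTableAge)
	return interval.End.Before(cutoff)
}
```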
pkg/bloomcompactor/bloomcompactor.go
Outdated
	return
}
if !ownsJob {
	c.metrics.compactionRunSkippedJobs.Inc()
I can see this metric name `compactionRunSkippedJobs` getting confusing in the future, because whole compactions will also be skipped when there is no change in the indexes and the chunks are already compacted.
Why do we want to count jobs not owned by the shard? We must assume they will be processed by another shard.
I can see it as a backup to check whether we are missing a subgroup of jobs altogether. Maybe we can use a metric like `jobNotOwned`; still, it doesn't help us find which jobs.
> because there will be whole compactions skipped because there is no change in indexes

What about renaming the metric to `unowned_jobs` instead?

> why do we want to count jobs not owned by the shard? we must assume it will be processed by another shard.

I think this will be useful for debugging purposes. We can look at the total number of started compaction jobs and the number of finished jobs; ideally they should match. If they differ, we can use the unowned metric to see whether there are jobs not being owned by any compactor (e.g. if the sum of unowned jobs is more than the number of started jobs).
Having said that, we'd need to add the `started` metric...
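To make the started/finished/unowned accounting concrete, here is a sketch of the counters being discussed (metric and field names are assumptions; the PR's `metrics.go` is authoritative):

```go
package bloomcompactor

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// jobMetrics sketches the per-job counters: started vs finished should match,
// and unowned explains jobs skipped by sharding.
type jobMetrics struct {
	jobsStarted  prometheus.Counter
	jobsFinished prometheus.Counter
	jobsUnowned  prometheus.Counter
}

func newJobMetrics(r prometheus.Registerer) *jobMetrics {
	return &jobMetrics{
		jobsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Namespace: "loki", Subsystem: "bloomcompactor",
			Name: "jobs_started_total",
			Help: "Total number of compaction jobs started.",
		}),
		jobsFinished: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Namespace: "loki", Subsystem: "bloomcompactor",
			Name: "jobs_finished_total",
			Help: "Total number of compaction jobs that finished successfully.",
		}),
		jobsUnowned: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Namespace: "loki", Subsystem: "bloomcompactor",
			Name: "jobs_unowned_total",
			Help: "Total number of jobs skipped because this compactor does not own them.",
		}),
	}
}
```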
Yes, I agree that it's good for debugging purposes. `unowned_jobs` sounds the most appropriate.
Done :)
LGTM ;)
lgtm, thanks for working this out.
pkg/bloomcompactor/bloomcompactor.go
Outdated
errs := multierror.New()
_ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error {
	tableName := tables[i]
	level.Info(c.logger).Log("msg", "compacting table", "table-name", tableName)
On line 260 you use the key `table`.
func (j *Job) computeFromThrough() {
	minFrom := model.Latest
	maxThrough := model.Earliest

	for _, chunk := range j.chunks {
		from, through := chunk.Bounds()
		if minFrom > from {
			minFrom = from
		}
		if maxThrough < through {
			maxThrough = through
		}
	}

	j.from = &minFrom
	j.through = &maxThrough
}
This will yield an incorrect result when `len(j.chunks)` is `0`, because `minFrom` and `maxThrough` are never updated.
Great catch, I can also address that in my next PR.
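One possible shape of that fix, sketched against the snippet above (whether the follow-up PR returns early like this or handles the empty case differently is an open question):

```go
func (j *Job) computeFromThrough() {
	if len(j.chunks) == 0 {
		// Nothing to compute; leave j.from/j.through unset instead of the
		// Latest/Earliest sentinel values.
		return
	}

	minFrom := model.Latest
	maxThrough := model.Earliest

	for _, chunk := range j.chunks {
		from, through := chunk.Bounds()
		if minFrom > from {
			minFrom = from
		}
		if maxThrough < through {
			maxThrough = through
		}
	}

	j.from = &minFrom
	j.through = &maxThrough
}
```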
type metrics struct {
	compactionRunsStarted   prometheus.Counter
	compactionRunsCompleted prometheus.Counter
	compactionRunsErred     prometheus.Counter
Suggested change:
- compactionRunsErred prometheus.Counter
+ compactionRunsFailed prometheus.Counter
Addressing comments from grafana#11154 after it was merged
**What this PR does / why we need it**:
This PR adds tenant and fingerprint (FP) sharding to bloom compactors. Note that the bloom-compactor doesn't yet perform any compaction, but iterates through all tables, tenants, and series checking if the compactor owns the tenant and the series (by the series FP). Actual compaction will be implemented with #11115.

A new structure `Job` is added which will carry around all the context for a compaction job such as the tenant ID, the table name, and the series FP. The sharding strategy has two methods:
- `OwnsTenant(tenant string)`: Checks if the compactor shard owns the tenant.
- `OwnsJob(job Job)`: Checks (again) if the compactor owns the job's tenant. Then, it checks if the compactor owns the job's fingerprint by looking inside the tenant subring.

We add a new per-tenant limit: `bloom_compactor_shard_size`. If it's 0, the tenant can use all compactors (i.e. `OwnsTenant` will always return `true`); otherwise, only `bloom_compactor_shard_size` out of the total number of compactors will own the tenant. A given job's FP will be owned by exactly one compactor within the tenant shard.

**Special notes for your reviewer**:
- Added a bunch of metrics in `metrics.go`
- Added a test for the sharding strategy