Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable/disable bloom gateway per tenant #11203

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1799,7 +1799,7 @@ ring:
# CLI flag: -bloom-gateway.replication-factor
[replication_factor: <int> | default = 3]

# Flag to enable or disable the usage of the bloom gatway component.
# Flag to enable or disable the bloom gateway component globally.
# CLI flag: -bloom-gateway.enabled
[enabled: <boolean> | default = false]

Expand Down Expand Up @@ -2938,6 +2938,10 @@ shard_streams:
# CLI flag: -bloom-gateway.shard-size
[bloom_gateway_shard_size: <int> | default = 1]

# Whether to use the bloom gateway component in the read path to filter chunks.
# CLI flag: -bloom-gateway.enable-filtering
[bloom_gateway_enable_filtering: <boolean> | default = false]

# The shard size defines how many bloom compactors should be used by a tenant
# when computing blooms. If it's set to 0, shuffle sharding is disabled.
# CLI flag: -bloom-compactor.shard-size
Expand Down
4 changes: 4 additions & 0 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func shuffleAddrs(addrs []string) []string {

// FilterChunkRefs implements Client
func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, through model.Time, groups []*logproto.GroupedChunkRefs, filters ...*logproto.LineFilterExpression) ([]*logproto.GroupedChunkRefs, error) {
if !c.limits.BloomGatewayEnabled(tenant) {
return groups, nil
}

// Get the addresses of corresponding bloom gateways for each series.
fingerprints, addrs, err := c.serverAddrsForFingerprints(tenant, groups)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers flags for the Bloom Gateway configuration with a common prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix(prefix, "collectors/", f)
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the usage of the bloom gatway component.")
f.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Flag to enable or disable the bloom gateway component globally.")
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)
Expand Down
1 change: 1 addition & 0 deletions pkg/bloomgateway/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (

type Limits interface {
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
}

type ShardingStrategy interface {
Expand Down
14 changes: 12 additions & 2 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ type Limits struct {
RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."`
RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."`

IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`

BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"`

BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
Expand Down Expand Up @@ -291,7 +294,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
dskit_flagext.DeprecatedFlag(f, "compactor.allow-deletes", "Deprecated. Instead, see compactor.deletion-mode which is another per tenant configuration", util_log.Logger)

f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")

f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.")
f.BoolVar(&l.BloomGatewayEnabled, "bloom-gateway.enable-filtering", false, "Whether to use the bloom gateway component in the read path to filter chunks.")

f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.")
Expand Down Expand Up @@ -774,6 +780,10 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomGatewayShardSize
}

func (o *Overrides) BloomGatewayEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomGatewayEnabled
}

func (o *Overrides) BloomCompactorShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomCompactorShardSize
}
Expand Down
Loading