chore(blooms): Implement BloomStore as a service (grafana#12044)
The BloomStore service is instantiated once at startup and used as a dependency by both the bloom compactor and the bloom gateway.

This prevents the store from being instantiated in different, potentially inconsistent ways across components.


Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum authored and rhnasc committed Apr 12, 2024
1 parent b688201 commit 630dcd0
Showing 6 changed files with 91 additions and 116 deletions.
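The change is plain dependency injection: construct the store once at startup, hand the same instance to every consumer, and let the creator own shutdown. The self-contained Go sketch below illustrates the shape of the refactor with stand-in types; Store, NewGateway, and NewCompactor are illustrative placeholders, not Loki's actual API (the real signatures appear in the diff that follows).

package main

import "fmt"

// Store stands in for bloomshipper.Store: a long-lived component that, before
// this commit, each consumer constructed for itself.
type Store struct{ name string }

func NewStore() (*Store, error) { return &Store{name: "bloomstore"}, nil }

// Stop releases the store's resources; only the store's owner should call it.
func (s *Store) Stop() { fmt.Println(s.name, "stopped") }

// Gateway and Compactor now receive the store instead of building their own.
type Gateway struct{ store *Store }
type Compactor struct{ store *Store }

func NewGateway(s *Store) *Gateway     { return &Gateway{store: s} }
func NewCompactor(s *Store) *Compactor { return &Compactor{store: s} }

func main() {
	// Built exactly once, so caches and metrics are configured in one place
	// and cannot drift between components.
	store, err := NewStore()
	if err != nil {
		panic(err)
	}
	// Shutdown responsibility moves to the creator, which is why the commit
	// deletes bloomStore.Stop() from the gateway's stopping() hook.
	defer store.Stop()

	gw := NewGateway(store)
	comp := NewCompactor(store)
	fmt.Println(gw.store == comp.store) // true: one store, two consumers
}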
19 changes: 7 additions & 12 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -67,15 +67,17 @@ func New(
fetcherProvider stores.ChunkFetcherProvider,
sharding util_ring.TenantSharding,
limits Limits,
store bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
) (*Compactor, error) {
c := &Compactor{
cfg: cfg,
schemaCfg: schemaCfg,
logger: logger,
sharding: sharding,
limits: limits,
cfg: cfg,
schemaCfg: schemaCfg,
logger: logger,
sharding: sharding,
limits: limits,
bloomStore: store,
}

tsdbStore, err := NewTSDBStores(schemaCfg, storeCfg, clientMetrics)
@@ -84,13 +86,6 @@ func New(
}
c.tsdbStore = tsdbStore

// TODO(owen-d): export bloomstore as a dependency that can be reused by the compactor & gateway rather that
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storeCfg, clientMetrics, nil, nil, logger)
if err != nil {
return nil, errors.Wrap(err, "failed to create bloom store")
}
c.bloomStore = bloomStore

// initialize metrics
c.btMetrics = v1.NewMetrics(prometheus.WrapRegistererWithPrefix("loki_bloom_tokenizer_", r))
c.metrics = NewMetrics(r, c.btMetrics)
33 changes: 2 additions & 31 deletions pkg/bloomgateway/bloomgateway.go
@@ -59,12 +59,8 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/queue"
"github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
@@ -181,7 +177,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
cfg: cfg,
logger: logger,
@@ -192,35 +188,11 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o
},
workerMetrics: newWorkerMetrics(reg, constants.Loki, metricsSubsystem),
queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem),
bloomStore: store,
}
var err error

g.queue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, time.Minute, &fixedQueueLimits{0}, g.queueMetrics)
g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

var metasCache cache.Cache
mcCfg := storageCfg.BloomShipperConfig.MetasCache
if cache.IsCacheConfigured(mcCfg) {
metasCache, err = cache.New(mcCfg, reg, logger, stats.BloomMetasCache, constants.Loki)
if err != nil {
return nil, err
}
}

var blocksCache cache.TypedCache[string, bloomshipper.BlockDirectory]
bcCfg := storageCfg.BloomShipperConfig.BlocksCache
if bcCfg.IsEnabled() {
blocksCache = bloomshipper.NewBlocksCache(bcCfg, reg, logger)
}

store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, metasCache, blocksCache, logger)
if err != nil {
return nil, err
}

// We need to keep a reference to be able to call Stop() on shutdown of the gateway.
g.bloomStore = store

if err := g.initServices(); err != nil {
return nil, err
}
@@ -286,7 +258,6 @@ func (g *Gateway) running(ctx context.Context) error {
}

func (g *Gateway) stopping(_ error) error {
g.bloomStore.Stop()
return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}

90 changes: 31 additions & 59 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"os"
"testing"
"time"

@@ -26,6 +25,7 @@ import (
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
bloomshipperconfig "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
lokiring "github.com/grafana/loki/pkg/util/ring"
"github.com/grafana/loki/pkg/validation"
@@ -46,10 +46,8 @@ func newLimits() *validation.Overrides {
return overrides
}

func TestBloomGateway_StartStopService(t *testing.T) {
func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
limits := newLimits()

cm := storage.NewClientMetrics()
t.Cleanup(cm.Unregister)
@@ -79,6 +77,17 @@ func TestBloomGateway_StartStopService(t *testing.T) {
},
}

store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, nil, nil, logger)
require.NoError(t, err)
t.Cleanup(store.Stop)

return store
}

func TestBloomGateway_StartStopService(t *testing.T) {
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

t.Run("start and stop bloom gateway", func(t *testing.T) {
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, reg)
t.Cleanup(func() {
@@ -99,7 +108,8 @@
MaxOutstandingPerTenant: 1024,
}

gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
store := setupBloomStore(t)
gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -116,37 +126,10 @@

func TestBloomGateway_FilterChunkRefs(t *testing.T) {
tenantID := "test"
logger := log.NewLogfmtLogger(os.Stderr)
reg := prometheus.NewRegistry()
limits := newLimits()

cm := storage.NewClientMetrics()
t.Cleanup(cm.Unregister)

p := config.PeriodConfig{
From: parseDayTime("2023-09-01"),
IndexTables: config.IndexPeriodicTableConfig{
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index_",
Period: 24 * time.Hour,
},
},
IndexType: config.TSDBType,
ObjectType: config.StorageTypeFileSystem,
Schema: "v13",
RowShards: 16,
}
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{p},
}
storageCfg := storage.Config{
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: t.TempDir(),
},
FSConfig: local.FSConfig{
Directory: t.TempDir(),
},
}
store := setupBloomStore(t)
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()

kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, reg)
t.Cleanup(func() {
@@ -168,20 +151,14 @@
}

t.Run("shipper error is propagated", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
require.NoError(t, err)

now := mktime("2023-10-03 10:00")

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

mockStore := newMockBloomStore(queriers, metas)
mockStore.err = errors.New("request failed")
gw.bloomStore = mockStore

err = gw.initServices()
reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -216,20 +193,15 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
})

t.Run("request cancellation does not result in channel locking", func(t *testing.T) {
reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
require.NoError(t, err)

now := mktime("2024-01-25 10:00")

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

mockStore := newMockBloomStore(queriers, metas)
mockStore.delay = 2000 * time.Millisecond
gw.bloomStore = mockStore

err = gw.initServices()
reg := prometheus.NewRegistry()
gw, err := New(cfg, mockStore, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -264,8 +236,10 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
})

t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand All @@ -275,8 +249,6 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.NoError(t, err)
})

now := mktime("2023-10-03 10:00")

chunkRefs := []*logproto.ChunkRef{
{Fingerprint: 3000, UserID: tenantID, From: now.Add(-24 * time.Hour), Through: now.Add(-23 * time.Hour), Checksum: 1},
{Fingerprint: 1000, UserID: tenantID, From: now.Add(-22 * time.Hour), Through: now.Add(-21 * time.Hour), Checksum: 2},
@@ -309,8 +281,10 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
})

t.Run("gateway tracks active users", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand All @@ -320,8 +294,6 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.NoError(t, err)
})

now := mktime("2023-10-03 10:00")

tenants := []string{"tenant-a", "tenant-b", "tenant-c"}
for idx, tenantID := range tenants {
chunkRefs := []*logproto.ChunkRef{
@@ -349,12 +321,12 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
})

t.Run("use fuse queriers to filter chunks", func(t *testing.T) {
now := mktime("2023-10-03 10:00")

reg := prometheus.NewRegistry()
gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
gw, err := New(cfg, store, logger, reg)
require.NoError(t, err)

now := mktime("2023-10-03 10:00")

// replace store implementation and re-initialize workers and sub-services
_, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

5 changes: 3 additions & 2 deletions pkg/logproto/compat_test.go
@@ -7,12 +7,13 @@ import (
"testing"
"unsafe"

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/plan"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/plan"
)

// This test verifies that jsoniter uses our custom method for marshalling.
7 changes: 5 additions & 2 deletions pkg/loki/loki.go
@@ -54,6 +54,7 @@ import (
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/indexgateway"
"github.com/grafana/loki/pkg/tracing"
"github.com/grafana/loki/pkg/util"
@@ -304,6 +305,7 @@ type Loki struct {
querierAPI *querier.QuerierAPI
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
BloomStore bloomshipper.Store
tableManager *index.TableManager
frontend Frontend
ruler *base_ruler.Ruler
@@ -602,6 +604,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(RuleEvaluator, t.initRuleEvaluator, modules.UserInvisibleModule)
mm.RegisterModule(TableManager, t.initTableManager)
mm.RegisterModule(Compactor, t.initCompactor)
mm.RegisterModule(BloomStore, t.initBloomStore)
mm.RegisterModule(BloomCompactor, t.initBloomCompactor)
mm.RegisterModule(BloomCompactorRing, t.initBloomCompactorRing, modules.UserInvisibleModule)
mm.RegisterModule(IndexGateway, t.initIndexGateway)
@@ -638,8 +641,8 @@ func (t *Loki) setupModuleManager() error {
TableManager: {Server, Analytics},
Compactor: {Server, Overrides, MemberlistKV, Analytics},
IndexGateway: {Server, Store, IndexGatewayRing, IndexGatewayInterceptors, Analytics},
BloomGateway: {Server, BloomGatewayRing, Analytics},
BloomCompactor: {Server, BloomCompactorRing, Analytics, Store},
BloomGateway: {Server, BloomStore, BloomGatewayRing, Analytics},
BloomCompactor: {Server, BloomStore, BloomCompactorRing, Analytics, Store},
IngesterQuerier: {Ring},
QuerySchedulerRing: {Overrides, MemberlistKV},
IndexGatewayRing: {Overrides, MemberlistKV},
(The diff for the sixth changed file — 43 additions and 10 deletions, by the totals above — did not load.)
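That sixth file is presumably pkg/loki/modules.go, where Loki's init* module functions conventionally live and where the initBloomStore registered above would be defined. As a hedged sketch only: the function plausibly reassembles the metas-cache and blocks-cache construction that this commit deletes from bloomgateway.New, builds the store once into the new Loki.BloomStore field, and returns an idle service so the module framework stops the store on shutdown. The cache and store constructor calls mirror those visible in the removed gateway code; the field names (t.Cfg, t.ClientMetrics) and the dskit idle-service wrapper are assumptions, not the commit's actual code.

// Hypothetical sketch of initBloomStore (a fragment of the unloaded file,
// not the commit's verbatim code).
func (t *Loki) initBloomStore() (services.Service, error) {
	var err error
	logger := log.With(util_log.Logger, "component", "bloomstore")

	// Metas cache: the same construction removed from bloomgateway.New.
	var metasCache cache.Cache
	mcCfg := t.Cfg.StorageConfig.BloomShipperConfig.MetasCache
	if cache.IsCacheConfigured(mcCfg) {
		metasCache, err = cache.New(mcCfg, prometheus.DefaultRegisterer, logger, stats.BloomMetasCache, constants.Loki)
		if err != nil {
			return nil, err
		}
	}

	// Blocks cache, likewise moved out of the gateway.
	var blocksCache cache.TypedCache[string, bloomshipper.BlockDirectory]
	bcCfg := t.Cfg.StorageConfig.BloomShipperConfig.BlocksCache
	if bcCfg.IsEnabled() {
		blocksCache = bloomshipper.NewBlocksCache(bcCfg, prometheus.DefaultRegisterer, logger)
	}

	// One store instance, kept on the Loki struct (the BloomStore field added
	// in this diff) and shared by the bloom gateway and the bloom compactor.
	t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, logger)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create bloom store")
	}

	// An idle service gives the module framework a shutdown hook, replacing
	// the explicit bloomStore.Stop() deleted from the gateway.
	return services.NewIdleService(nil, func(_ error) error {
		t.BloomStore.Stop()
		return nil
	}), nil
}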
