From 17248c5a5ae9213b113975b66367c71074612a6e Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 4 Oct 2024 18:04:22 +0900 Subject: [PATCH] Add multi-level chunk cache Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 21 +- docs/blocks-storage/store-gateway.md | 21 +- docs/configuration/config-file-reference.md | 21 +- integration/querier_test.go | 25 +++ pkg/storage/tsdb/caching_bucket.go | 123 +++++++---- pkg/storage/tsdb/caching_bucket_test.go | 70 +++++++ pkg/storage/tsdb/multilevel_chunk_cache.go | 149 ++++++++++++++ .../tsdb/multilevel_chunk_cache_test.go | 193 ++++++++++++++++++ 9 files changed, 581 insertions(+), 43 deletions(-) create mode 100644 pkg/storage/tsdb/multilevel_chunk_cache.go create mode 100644 pkg/storage/tsdb/multilevel_chunk_cache_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a86e11d57d..0834d10a92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151 * [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129 * [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245 +* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232 * [ENHANCEMENT] Query Frontend: Add info field to query response. #6207 * [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index ba6caa34e3..80a7d9ae9b 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -788,8 +788,10 @@ blocks_storage: [max_backfill_items: | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached, - # redis, inmemory, and '' (disable). + # The chunks cache backend type. Single or Multiple cache backend can be + # provided. Supported values in single cache: memcached, redis, inmemory, + # and '' (disable). Supported values in multi level cache: a + # comma-separated list of (inmemory, memcached, redis) # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: | default = ""] @@ -1000,6 +1002,21 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent [failure_percent: | default = 0.05] + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 50] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + # Size of each subrange that bucket object is split into for better # caching. # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 5279be4867..a11159b872 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -903,8 +903,10 @@ blocks_storage: [max_backfill_items: | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached, - # redis, inmemory, and '' (disable). + # The chunks cache backend type. Single or Multiple cache backend can be + # provided. Supported values in single cache: memcached, redis, inmemory, + # and '' (disable). Supported values in multi level cache: a + # comma-separated list of (inmemory, memcached, redis) # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: | default = ""] @@ -1115,6 +1117,21 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent [failure_percent: | default = 0.05] + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 50] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + # Size of each subrange that bucket object is split into for better # caching. # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index ceda75705b..4e369a04ee 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1339,8 +1339,10 @@ bucket_store: [max_backfill_items: | default = 10000] chunks_cache: - # Backend for chunks cache, if not empty. Supported values: memcached, - # redis, inmemory, and '' (disable). + # The chunks cache backend type. Single or Multiple cache backend can be + # provided. Supported values in single cache: memcached, redis, inmemory, + # and '' (disable). Supported values in multi level cache: a comma-separated + # list of (inmemory, memcached, redis) # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend [backend: | default = ""] @@ -1549,6 +1551,21 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.set-async.circuit-breaker.failure-percent [failure_percent: | default = 0.05] + multilevel: + # The maximum number of concurrent asynchronous operations can occur when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 50] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.chunks-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + # Size of each subrange that bucket object is split into for better caching. # CLI flag: -blocks-storage.bucket-store.chunks-cache.subrange-size [subrange_size: | default = 16000] diff --git a/integration/querier_test.go b/integration/querier_test.go index ae442bcb16..35319f2bde 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -97,6 +97,12 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { chunkCacheBackend: tsdb.CacheBackendRedis, bucketIndexEnabled: true, }, + "blocks sharding disabled, in-memory chunk cache": { + blocksShardingStrategy: "", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: tsdb.CacheBackendInMemory, + bucketIndexEnabled: true, + }, "blocks default sharding, in-memory chunk cache": { blocksShardingStrategy: "default", indexCacheBackend: tsdb.IndexCacheBackendRedis, @@ -110,6 +116,25 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { chunkCacheBackend: tsdb.CacheBackendInMemory, bucketIndexEnabled: true, }, + "block sharding disabled, multi-level chunk cache": { + blocksShardingStrategy: "", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis), + bucketIndexEnabled: true, + }, + "block default sharding, multi-level chunk cache": { + blocksShardingStrategy: "default", + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis), + bucketIndexEnabled: true, + }, + "block shuffle sharding, multi-level chunk cache": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, + indexCacheBackend: tsdb.IndexCacheBackendRedis, + chunkCacheBackend: fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis), + bucketIndexEnabled: true, + }, } for testName, testCfg := range tests { diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index 4946e3f036..82403b7129 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -21,10 +21,15 @@ import ( "github.com/thanos-io/thanos/pkg/cacheutil" "github.com/thanos-io/thanos/pkg/model" storecache "github.com/thanos-io/thanos/pkg/store/cache" + + "github.com/cortexproject/cortex/pkg/util" ) var ( + supportedChunkCacheBackends = []string{CacheBackendInMemory, CacheBackendMemcached, CacheBackendRedis} + errUnsupportedChunkCacheBackend = errors.New("unsupported chunk cache backend") + errDuplicatedChunkCacheBackend = errors.New("duplicated chunk cache backend") ) const ( @@ -54,23 +59,52 @@ func (cfg *MetadataCacheBackend) Validate() error { } type ChunkCacheBackend struct { - Backend string `yaml:"backend"` - InMemory InMemoryChunkCacheConfig `yaml:"inmemory"` - Memcached MemcachedClientConfig `yaml:"memcached"` - Redis RedisClientConfig `yaml:"redis"` + Backend string `yaml:"backend"` + InMemory InMemoryChunkCacheConfig `yaml:"inmemory"` + Memcached MemcachedClientConfig `yaml:"memcached"` + Redis RedisClientConfig `yaml:"redis"` + MultiLevel MultiLevelChunkCacheConfig `yaml:"multilevel"` } // Validate the config. func (cfg *ChunkCacheBackend) Validate() error { - switch cfg.Backend { - case CacheBackendMemcached: - return cfg.Memcached.Validate() - case CacheBackendRedis: - return cfg.Redis.Validate() - case CacheBackendInMemory, "": - default: - return errUnsupportedChunkCacheBackend + if cfg.Backend == "" { + return nil + } + + splitBackends := strings.Split(cfg.Backend, ",") + configuredBackends := map[string]struct{}{} + + if len(splitBackends) > 1 { + if err := cfg.MultiLevel.Validate(); err != nil { + return err + } } + + for _, backend := range splitBackends { + if !util.StringsContain(supportedChunkCacheBackends, backend) { + return errUnsupportedChunkCacheBackend + } + + if _, ok := configuredBackends[backend]; ok { + return errDuplicatedChunkCacheBackend + } + + switch backend { + case CacheBackendMemcached: + if err := cfg.Memcached.Validate(); err != nil { + return err + } + case CacheBackendRedis: + if err := cfg.Redis.Validate(); err != nil { + return err + } + case CacheBackendInMemory: + } + + configuredBackends[backend] = struct{}{} + } + return nil } @@ -84,16 +118,22 @@ type ChunksCacheConfig struct { } func (cfg *ChunksCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { - f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("Backend for chunks cache, if not empty. Supported values: %s, %s, %s, and '' (disable).", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory)) + f.StringVar(&cfg.Backend, prefix+"backend", "", fmt.Sprintf("The chunks cache backend type. Single or Multiple cache backend can be provided. "+ + "Supported values in single cache: %s, %s, %s, and '' (disable). "+ + "Supported values in multi level cache: a comma-separated list of (%s)", CacheBackendMemcached, CacheBackendRedis, CacheBackendInMemory, strings.Join(supportedChunkCacheBackends, ", "))) cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.") cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.") cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.") + cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.") f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.") f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching chunks. Zero or negative value = unlimited number of sub-requests.") f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for chunks.") f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual chunks subranges.") + + // In the multi level chunk cache, backfill TTL follows subrange TTL + cfg.ChunkCacheBackend.MultiLevel.BackFillTTL = cfg.SubrangeTTL } func (cfg *ChunksCacheConfig) Validate() error { @@ -230,34 +270,43 @@ func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, l } func createChunkCache(cacheName string, cacheBackend *ChunkCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { - switch cacheBackend.Backend { - case "": - // No caching. - return nil, nil - case CacheBackendInMemory: - inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, reg, cacheBackend.InMemory.toInMemoryChunkCacheConfig()) - if err != nil { - return nil, errors.Wrapf(err, "failed to create in-memory chunk cache") - } - return inMemoryCache, nil - case CacheBackendMemcached: - var client cacheutil.MemcachedClient - client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg) - if err != nil { - return nil, errors.Wrapf(err, "failed to create memcached client") - } - return cache.NewMemcachedCache(cacheName, logger, client, reg), nil + splitBackends := strings.Split(cacheBackend.Backend, ",") + var ( + caches []cache.Cache + ) - case CacheBackendRedis: - redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg) - if err != nil { - return nil, errors.Wrapf(err, "failed to create redis client") + for i, backend := range splitBackends { + iReg := reg + + // Create the level label if we have more than one cache + if len(splitBackends) > 1 { + iReg = prometheus.WrapRegistererWith(prometheus.Labels{"level": fmt.Sprintf("L%v", i)}, reg) } - return cache.NewRedisCache(cacheName, logger, redisCache, reg), nil - default: - return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cacheBackend.Backend) + switch backend { + case CacheBackendInMemory: + inMemoryCache, err := cache.NewInMemoryCacheWithConfig(cacheName, logger, iReg, cacheBackend.InMemory.toInMemoryChunkCacheConfig()) + if err != nil { + return nil, errors.Wrapf(err, "failed to create in-memory chunk cache") + } + caches = append(caches, inMemoryCache) + case CacheBackendMemcached: + var client cacheutil.MemcachedClient + client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cacheBackend.Memcached.ToMemcachedClientConfig(), reg) + if err != nil { + return nil, errors.Wrapf(err, "failed to create memcached client") + } + caches = append(caches, cache.NewMemcachedCache(cacheName, logger, client, iReg)) + case CacheBackendRedis: + redisCache, err := cacheutil.NewRedisClientWithConfig(logger, cacheName, cacheBackend.Redis.ToRedisClientConfig(), reg) + if err != nil { + return nil, errors.Wrapf(err, "failed to create redis client") + } + caches = append(caches, cache.NewRedisCache(cacheName, logger, redisCache, iReg)) + } } + + return newMultiLevelChunkCache(cacheName, cacheBackend.MultiLevel, reg, caches...), nil } type Matchers struct { diff --git a/pkg/storage/tsdb/caching_bucket_test.go b/pkg/storage/tsdb/caching_bucket_test.go index 78ad1fb9b9..875134452e 100644 --- a/pkg/storage/tsdb/caching_bucket_test.go +++ b/pkg/storage/tsdb/caching_bucket_test.go @@ -49,6 +49,76 @@ func Test_ChunkCacheBackendValidation(t *testing.T) { }, expectedErr: errUnsupportedChunkCacheBackend, }, + "valid multi chunk cache type": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s,%s", CacheBackendInMemory, CacheBackendMemcached, CacheBackendRedis), + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + Redis: RedisClientConfig{ + Addresses: "localhost:6379", + }, + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 1, + MaxAsyncBufferSize: 1, + MaxBackfillItems: 1, + }, + }, + expectedErr: nil, + }, + "duplicate multi chunk cache type": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendInMemory), + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 1, + MaxAsyncBufferSize: 1, + MaxBackfillItems: 1, + }, + }, + expectedErr: errDuplicatedChunkCacheBackend, + }, + "invalid max async concurrency": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached), + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 0, + MaxAsyncBufferSize: 1, + MaxBackfillItems: 1, + }, + }, + expectedErr: errInvalidMaxAsyncConcurrency, + }, + "invalid max async buffer size": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached), + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 1, + MaxAsyncBufferSize: 0, + MaxBackfillItems: 1, + }, + }, + expectedErr: errInvalidMaxAsyncBufferSize, + }, + "invalid max back fill items": { + cfg: ChunkCacheBackend{ + Backend: fmt.Sprintf("%s,%s", CacheBackendInMemory, CacheBackendMemcached), + Memcached: MemcachedClientConfig{ + Addresses: "dns+localhost:11211", + }, + MultiLevel: MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 1, + MaxAsyncBufferSize: 1, + MaxBackfillItems: 0, + }, + }, + expectedErr: errInvalidMaxBackfillItems, + }, } for name, tc := range tests { diff --git a/pkg/storage/tsdb/multilevel_chunk_cache.go b/pkg/storage/tsdb/multilevel_chunk_cache.go new file mode 100644 index 0000000000..0d2ea69edf --- /dev/null +++ b/pkg/storage/tsdb/multilevel_chunk_cache.go @@ -0,0 +1,149 @@ +package tsdb + +import ( + "context" + "errors" + "flag" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/thanos-io/thanos/pkg/cache" + "github.com/thanos-io/thanos/pkg/cacheutil" +) + +type multiLevelChunkCache struct { + name string + caches []cache.Cache + + backfillProcessor *cacheutil.AsyncOperationProcessor + fetchLatency *prometheus.HistogramVec + backFillLatency *prometheus.HistogramVec + storeDroppedItems prometheus.Counter + backfillDroppedItems prometheus.Counter + maxBackfillItems int + backfillTTL time.Duration +} + +type MultiLevelChunkCacheConfig struct { + MaxAsyncConcurrency int `yaml:"max_async_concurrency"` + MaxAsyncBufferSize int `yaml:"max_async_buffer_size"` + MaxBackfillItems int `yaml:"max_backfill_items"` + + BackFillTTL time.Duration `yaml:"-"` +} + +func (cfg *MultiLevelChunkCacheConfig) Validate() error { + if cfg.MaxAsyncBufferSize <= 0 { + return errInvalidMaxAsyncBufferSize + } + if cfg.MaxAsyncConcurrency <= 0 { + return errInvalidMaxAsyncConcurrency + } + if cfg.MaxBackfillItems <= 0 { + return errInvalidMaxBackfillItems + } + return nil +} + +func (cfg *MultiLevelChunkCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations can occur when backfilling cache items.") + f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.") + f.IntVar(&cfg.MaxBackfillItems, prefix+"max-backfill-items", 10000, "The maximum number of items to backfill per asynchronous operation.") +} + +func newMultiLevelChunkCache(name string, cfg MultiLevelChunkCacheConfig, reg prometheus.Registerer, c ...cache.Cache) cache.Cache { + if len(c) == 1 { + return c[0] + } + + return &multiLevelChunkCache{ + name: name, + caches: c, + backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency), + fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_store_multilevel_chunk_cache_fetch_duration_seconds", + Help: "Histogram to track latency to fetch items from multi level chunk cache", + Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90}, + }, nil), + backFillLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_store_multilevel_chunk_cache_backfill_duration_seconds", + Help: "Histogram to track latency to backfill items from multi level chunk cache", + Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90}, + }, nil), + storeDroppedItems: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_store_multilevel_chunk_cache_backfill_dropped_items_total", + Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ", + }), + backfillDroppedItems: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "cortex_store_multilevel_chunk_cache_store_dropped_items_total", + Help: "Total number of items dropped due to async buffer full when storing multilevel cache ", + }), + maxBackfillItems: cfg.MaxBackfillItems, + backfillTTL: cfg.BackFillTTL, + } +} + +func (m *multiLevelChunkCache) Store(data map[string][]byte, ttl time.Duration) { + for _, c := range m.caches { + if err := m.backfillProcessor.EnqueueAsync(func() { + c.Store(data, ttl) + }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { + m.storeDroppedItems.Inc() + } + } +} + +func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[string][]byte { + timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues()) + defer timer.ObserveDuration() + + hits := map[string][]byte{} + backfillItems := make([]map[string][]byte, len(m.caches)-1) + + for i, c := range m.caches { + if i < len(m.caches)-1 { + backfillItems[i] = map[string][]byte{} + } + if ctx.Err() != nil { + return nil + } + if data := c.Fetch(ctx, keys); len(data) > 0 { + for k, d := range data { + hits[k] = d + } + + if i > 0 && len(hits) > 0 { + backfillItems[i-1] = hits + } + + if len(hits) == len(keys) { + // fetch done + break + } + } + } + + defer func() { + backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues()) + defer backFillTimer.ObserveDuration() + + for i, values := range backfillItems { + if len(values) == 0 { + continue + } + + if err := m.backfillProcessor.EnqueueAsync(func() { + m.caches[i].Store(values, m.backfillTTL) + }); errors.Is(err, cacheutil.ErrAsyncBufferFull) { + m.backfillDroppedItems.Inc() + } + } + }() + + return hits +} + +func (m *multiLevelChunkCache) Name() string { + return m.name +} diff --git a/pkg/storage/tsdb/multilevel_chunk_cache_test.go b/pkg/storage/tsdb/multilevel_chunk_cache_test.go new file mode 100644 index 0000000000..c72c1f3a55 --- /dev/null +++ b/pkg/storage/tsdb/multilevel_chunk_cache_test.go @@ -0,0 +1,193 @@ +package tsdb + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func Test_MultiLevelChunkCacheStore(t *testing.T) { + ttl := time.Hour * 24 + cfg := MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 10, + MaxAsyncBufferSize: 100000, + MaxBackfillItems: 10000, + BackFillTTL: ttl, + } + + data := map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "key3": []byte("value3"), + } + + testCases := map[string]struct { + m1InitData map[string][]byte + m2InitData map[string][]byte + expectedM1Data map[string][]byte + expectedM2Data map[string][]byte + storeData map[string][]byte + }{ + "should stored data to both caches": { + m1InitData: nil, + m2InitData: nil, + expectedM1Data: data, + expectedM2Data: data, + storeData: data, + }, + "should stored data to m1 cache": { + m1InitData: nil, + m2InitData: data, + expectedM1Data: data, + expectedM2Data: data, + storeData: data, + }, + "should stored data to m2 cache": { + m1InitData: data, + m2InitData: nil, + expectedM1Data: data, + expectedM2Data: data, + storeData: data, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + m1 := newMockChunkCache("m1", tc.m1InitData) + m2 := newMockChunkCache("m2", tc.m2InitData) + reg := prometheus.NewRegistry() + c := newMultiLevelChunkCache("chunk-cache", cfg, reg, m1, m2) + c.Store(tc.storeData, ttl) + + mlc := c.(*multiLevelChunkCache) + // Wait until async operation finishes. + mlc.backfillProcessor.Stop() + + require.Equal(t, tc.expectedM1Data, m1.data) + require.Equal(t, tc.expectedM2Data, m2.data) + }) + } +} + +func Test_MultiLevelChunkCacheFetch(t *testing.T) { + cfg := MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 10, + MaxAsyncBufferSize: 100000, + MaxBackfillItems: 10000, + BackFillTTL: time.Hour * 24, + } + + testCases := map[string]struct { + m1ExistingData map[string][]byte + m2ExistingData map[string][]byte + expectedM1Data map[string][]byte + expectedM2Data map[string][]byte + expectedFetchedData map[string][]byte + fetchKeys []string + }{ + "fetched data should be union of m1, m2 and 'key2' and `key3' should be backfilled to m1": { + m1ExistingData: map[string][]byte{ + "key1": []byte("value1"), + }, + m2ExistingData: map[string][]byte{ + "key2": []byte("value2"), + "key3": []byte("value3"), + }, + expectedM1Data: map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "key3": []byte("value3"), + }, + expectedM2Data: map[string][]byte{ + "key2": []byte("value2"), + "key3": []byte("value3"), + }, + expectedFetchedData: map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + "key3": []byte("value3"), + }, + fetchKeys: []string{"key1", "key2", "key3"}, + }, + "should be not fetched data that do not exist in both caches": { + m1ExistingData: map[string][]byte{ + "key1": []byte("value1"), + }, + m2ExistingData: map[string][]byte{ + "key2": []byte("value2"), + }, + expectedM1Data: map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + }, + expectedM2Data: map[string][]byte{ + "key2": []byte("value2"), + }, + expectedFetchedData: map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + }, + fetchKeys: []string{"key1", "key2", "key3"}, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + m1 := newMockChunkCache("m1", tc.m1ExistingData) + m2 := newMockChunkCache("m2", tc.m2ExistingData) + reg := prometheus.NewRegistry() + c := newMultiLevelChunkCache("chunk-cache", cfg, reg, m1, m2) + fetchData := c.Fetch(context.Background(), tc.fetchKeys) + + mlc := c.(*multiLevelChunkCache) + // Wait until async operation finishes. + mlc.backfillProcessor.Stop() + + require.Equal(t, tc.expectedM1Data, m1.data) + require.Equal(t, tc.expectedM2Data, m2.data) + require.Equal(t, tc.expectedFetchedData, fetchData) + }) + } +} + +type mockChunkCache struct { + mu sync.Mutex + name string + data map[string][]byte +} + +func newMockChunkCache(name string, data map[string][]byte) *mockChunkCache { + if data == nil { + data = make(map[string][]byte) + } + + return &mockChunkCache{ + name: name, + data: data, + } +} + +func (m *mockChunkCache) Store(data map[string][]byte, _ time.Duration) { + m.data = data +} + +func (m *mockChunkCache) Fetch(_ context.Context, keys []string) map[string][]byte { + m.mu.Lock() + defer m.mu.Unlock() + h := map[string][]byte{} + + for _, k := range keys { + if _, ok := m.data[k]; ok { + h[k] = m.data[k] + } + } + + return h +} + +func (m *mockChunkCache) Name() string { + return m.name +}