Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

chore(storage): Simplify store interfaces and abstractions (pt 3) #10516

Closed
wants to merge 10 commits into from
3 changes: 2 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/boltdb"
Expand Down Expand Up @@ -776,7 +777,7 @@ func (t *Loki) setupAsyncStore() error {
if asyncStore {
t.Cfg.StorageConfig.EnableAsyncStore = true

t.Cfg.StorageConfig.AsyncStoreConfig = storage.AsyncStoreCfg{
t.Cfg.StorageConfig.AsyncStoreConfig = stores.AsyncStoreCfg{
IngesterQuerier: t.ingesterQuerier,
QueryIngestersWithin: calculateAsyncStoreQueryIngestersWithin(
t.Cfg.Querier.QueryIngestersWithin,
Expand Down
9 changes: 5 additions & 4 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
)

var NilMetrics = NewChunkMetrics(nil, 0)
Expand Down Expand Up @@ -1010,7 +1011,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) {
t.Fatalf("error reading batch %s", err)
}

assertStream(t, tt.expected, streams.Streams)
AssertStream(t, tt.expected, streams.Streams)
})
}
}
Expand Down Expand Up @@ -1429,7 +1430,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) {
t.Fatalf("error reading batch %s", err)
}

assertSeries(t, tt.expected, series.Series)
AssertSeries(t, tt.expected, series.Series)
})
}
}
Expand Down Expand Up @@ -1660,7 +1661,7 @@ func TestBuildHeapIterator(t *testing.T) {
if err != nil {
t.Fatalf("error reading batch %s", err)
}
assertStream(t, tc.expected, streams.Streams)
AssertStream(t, tc.expected, streams.Streams)
})
}
}
Expand Down Expand Up @@ -1765,7 +1766,7 @@ func Benchmark_store_OverlappingChunks(b *testing.B) {
cfg: Config{
MaxChunkBatchSize: 50,
},
Store: newMockChunkStore(chunkfmt, headfmt, newOverlappingStreams(200, 200)),
store: stores.NewMockChunkStore(chunkfmt, headfmt, newOverlappingStreams(200, 200)),
}
b.ResetTimer()
statsCtx, ctx := stats.NewContext(user.InjectOrgID(context.Background(), "fake"))
Expand Down
14 changes: 5 additions & 9 deletions pkg/storage/chunk/cache/background_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@ func TestBackground(t *testing.T) {
WriteBackSizeLimit: flagext.ByteSize(limit),
}, cache.NewMockCache(), nil)

s := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
p := config.PeriodConfig{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
}

keys, chunks := fillCache(t, s, c)
keys, chunks := fillCache(t, p, c)
cache.Flush(c)

testCacheSingle(t, c, keys, chunks)
Expand Down
42 changes: 15 additions & 27 deletions pkg/storage/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

const userID = "1"

func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]string, []chunk.Chunk) {
func fillCache(t *testing.T, p config.PeriodConfig, cache cache.Cache) ([]string, []chunk.Chunk) {
const chunkLen = 13 * 3600 // in seconds

// put a set of chunks, larger than background batch size, with varying timestamps and values
Expand Down Expand Up @@ -74,7 +74,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str
err = cleanChunk.Decode(chunk.NewDecodeContext(), buf)
require.NoError(t, err)

keys = append(keys, scfg.ExternalKey(c.ChunkRef))
keys = append(keys, p.ExternalKey(c.ChunkRef))
bufs = append(bufs, buf)
chunks = append(chunks, cleanChunk)
}
Expand Down Expand Up @@ -120,37 +120,28 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []
require.Equal(t, chunks, result)
}

func testChunkFetcher(t *testing.T, c cache.Cache, chunks []chunk.Chunk) {
s := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
}
func testChunkFetcher(t *testing.T, p config.PeriodConfig, c cache.Cache, chunks []chunk.Chunk) {

fetcher, err := fetcher.New(c, nil, false, s, nil, 10, 100, 0)
fetcher, err := fetcher.New(c, nil, false, p, nil, 10, 100, 0)
require.NoError(t, err)
defer fetcher.Stop()

found, err := fetcher.FetchChunks(context.Background(), chunks)
require.NoError(t, err)
sort.Sort(byExternalKey{found, s})
sort.Sort(byExternalKey{chunks, s})
sort.Sort(byExternalKey{found, p})
sort.Sort(byExternalKey{chunks, p})
require.Equal(t, chunks, found)
}

type byExternalKey struct {
chunks []chunk.Chunk
scfg config.SchemaConfig
cfg config.PeriodConfig
}

func (a byExternalKey) Len() int { return len(a.chunks) }
func (a byExternalKey) Swap(i, j int) { a.chunks[i], a.chunks[j] = a.chunks[j], a.chunks[i] }
func (a byExternalKey) Less(i, j int) bool {
return a.scfg.ExternalKey(a.chunks[i].ChunkRef) < a.scfg.ExternalKey(a.chunks[j].ChunkRef)
return a.cfg.ExternalKey(a.chunks[i].ChunkRef) < a.cfg.ExternalKey(a.chunks[j].ChunkRef)
}

func testCacheMiss(t *testing.T, cache cache.Cache) {
Expand All @@ -164,16 +155,13 @@ func testCacheMiss(t *testing.T, cache cache.Cache) {
}

func testCache(t *testing.T, cache cache.Cache) {
s := config.SchemaConfig{
Configs: []config.PeriodConfig{
{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
p := config.PeriodConfig{
From: config.DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
}
keys, chunks := fillCache(t, s, cache)

keys, chunks := fillCache(t, p, cache)
t.Run("Single", func(t *testing.T) {
testCacheSingle(t, cache, keys, chunks)
})
Expand All @@ -184,7 +172,7 @@ func testCache(t *testing.T, cache cache.Cache) {
testCacheMiss(t, cache)
})
t.Run("Fetcher", func(t *testing.T) {
testChunkFetcher(t, cache, chunks)
testChunkFetcher(t, p, cache, chunks)
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const chunkDecodeParallelism = 16
// and writing back any misses to the cache. Also responsible for decoding
// chunks from the cache, in parallel.
type Fetcher struct {
schema config.SchemaConfig
schema config.PeriodConfig
storage client.Client
cache cache.Cache
cachel2 cache.Cache
Expand Down Expand Up @@ -89,7 +89,7 @@ type decodeResponse struct {
}

// New makes a new ChunkFetcher.
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.SchemaConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) {
func New(cache cache.Cache, cachel2 cache.Cache, cacheStubs bool, schema config.PeriodConfig, storage client.Client, maxAsyncConcurrency int, maxAsyncBufferSize int, l2CacheHandoff time.Duration) (*Fetcher, error) {
c := &Fetcher{
schema: schema,
storage: storage,
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/chunk/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func Test(t *testing.T) {
assert.NoError(t, chunkClient.PutChunks(context.Background(), test.storeStart))

// Build fetcher
f, err := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff)
f, err := New(c1, c2, false, sc.Configs[0], chunkClient, 1, 1, test.handoff)
assert.NoError(t, err)

// Run the test
Expand Down Expand Up @@ -290,7 +290,7 @@ func BenchmarkFetch(b *testing.B) {
_ = chunkClient.PutChunks(context.Background(), test.storeStart)

// Build fetcher
f, _ := New(c1, c2, false, sc, chunkClient, 1, 1, test.handoff)
f, _ := New(c1, c2, false, sc.Configs[0], chunkClient, 1, 1, test.handoff)

for i := 0; i < b.N; i++ {
_, err := f.FetchChunks(context.Background(), test.fetch)
Expand Down
44 changes: 26 additions & 18 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,20 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.fileName, "schema-config-file", "", "The path to the schema config file. The schema config is used only when running Cortex with the chunks storage.")
}

// Load populates the schema configuration. Configs that are already present
// (e.g. built from legacy command-line flags or set programmatically) take
// precedence; otherwise the configuration is read from the yaml file and
// then validated.
func (cfg *SchemaConfig) Load() error {
	// Nothing to do when the config was already supplied in-memory.
	if len(cfg.Configs) > 0 {
		return nil
	}

	// Otherwise read the schema config from the configured yaml file.
	err := cfg.loadFromFile()
	if err != nil {
		return err
	}

	return cfg.Validate()
}

// loadFromFile loads the schema config from a yaml file
func (cfg *SchemaConfig) loadFromFile() error {
if cfg.fileName == "" {
Expand Down Expand Up @@ -448,20 +462,6 @@ func (cfg PeriodConfig) validate() error {
return nil
}

// Load the yaml file, or build the config from legacy command-line flags
func (cfg *SchemaConfig) Load() error {
if len(cfg.Configs) > 0 {
return nil
}

// Load config from file.
if err := cfg.loadFromFile(); err != nil {
return err
}

return cfg.Validate()
}

func (cfg *PeriodConfig) VersionAsInt() (int, error) {
// Read memoized schema version. This is called during unmarshaling,
// but may be nil in the case of testware.
Expand Down Expand Up @@ -507,6 +507,15 @@ func ValidatePathPrefix(prefix string) error {
return nil
}

// ExternalKey generates the external chunk key for ref, picking the key
// format appropriate for this period's schema version: schema v12 and newer
// use the newer key layout, older schemas (or an unreadable version) fall
// back to the legacy layout.
func (cfg *PeriodConfig) ExternalKey(ref logproto.ChunkRef) string {
	if v, err := cfg.VersionAsInt(); err == nil && v >= 12 {
		return newerExternalKey(ref)
	}
	return newExternalKey(ref)
}

// PeriodicTableConfig is configuration for a set of time-sharded tables.
type PeriodicTableConfig struct {
Prefix string `yaml:"prefix" doc:"description=Table prefix for all period tables."`
Expand Down Expand Up @@ -672,11 +681,10 @@ func (cfg *PeriodicTableConfig) tableForPeriod(i int64) string {
// Generate the appropriate external key based on cfg.Schema, chunk.Checksum, and chunk.From
func (cfg SchemaConfig) ExternalKey(ref logproto.ChunkRef) string {
p, err := cfg.SchemaForTime(ref.From)
v, _ := p.VersionAsInt()
if err == nil && v >= 12 {
return newerExternalKey(ref)
if err != nil {
return newExternalKey(ref)
}
return newExternalKey(ref)
return p.ExternalKey(ref)
}

// VersionForChunk will return the schema version associated with the `From` timestamp of a chunk.
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ type Config struct {

// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
// It is required for getting chunk ids of recently flushed chunks from the ingesters.
EnableAsyncStore bool `yaml:"-"`
AsyncStoreConfig AsyncStoreCfg `yaml:"-"`
EnableAsyncStore bool `yaml:"-"`
AsyncStoreConfig stores.AsyncStoreCfg `yaml:"-"`
}

// RegisterFlags adds the flags required to configure this flag set.
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func TestNamedStores(t *testing.T) {

t.Run("period config referring to unrecognized store", func(t *testing.T) {
schemaConfig := schemaConfig
cfg := cfg
schemaConfig.Configs[0].ObjectType = "not-found"
_, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger, constants.Loki)
require.Error(t, err)
Expand Down
Loading
Loading