Skip to content

Commit

Permalink
Remove preemptive interface from indexReaderWriter
Browse files Browse the repository at this point in the history
and make it an exported struct

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum committed Sep 12, 2023
1 parent 764490c commit 9ca7323
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 40 deletions.
6 changes: 3 additions & 3 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,11 @@ func (s *LokiStore) storeForPeriod(p config.PeriodConfig, tableRange config.Tabl
}

indexReaderWriter := series.NewIndexReaderWriter(s.schemaCfg, schema, idx, f, s.cfg.MaxChunkBatchSize, s.writeDedupeCache)
indexReaderWriter = index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg)
chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, indexReaderWriter, s.storeCfg.DisableIndexDeduplication)
monitoredReaderWriter := index.NewMonitoredReaderWriter(indexReaderWriter, indexClientReg)
chunkWriter := stores.NewChunkWriter(f, s.schemaCfg, monitoredReaderWriter, s.storeCfg.DisableIndexDeduplication)

return chunkWriter,
indexReaderWriter,
monitoredReaderWriter,
func() {
chunkClient.Stop()
f.Stop()
Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/stores/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ type ReaderWriter interface {
Writer
}

type monitoredReaderWriter struct {
type MonitoredReaderWriter struct {
rw ReaderWriter
metrics *metrics
}

func NewMonitoredReaderWriter(rw ReaderWriter, reg prometheus.Registerer) ReaderWriter {
return &monitoredReaderWriter{
func NewMonitoredReaderWriter(rw ReaderWriter, reg prometheus.Registerer) *MonitoredReaderWriter {
return &MonitoredReaderWriter{
rw: rw,
metrics: newMetrics(reg),
}
}

func (m monitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
func (m MonitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
var chunks []logproto.ChunkRef

if err := loki_instrument.TimeRequest(ctx, "chunk_refs", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
Expand All @@ -72,7 +72,7 @@ func (m monitoredReaderWriter) GetChunkRefs(ctx context.Context, userID string,
return chunks, nil
}

func (m monitoredReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
func (m MonitoredReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
var lbls []labels.Labels
if err := loki_instrument.TimeRequest(ctx, "series", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
Expand All @@ -85,7 +85,7 @@ func (m monitoredReaderWriter) GetSeries(ctx context.Context, userID string, fro
return lbls, nil
}

func (m monitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
func (m MonitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
var values []string
if err := loki_instrument.TimeRequest(ctx, "label_values", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
Expand All @@ -98,7 +98,7 @@ func (m monitoredReaderWriter) LabelValuesForMetricName(ctx context.Context, use
return values, nil
}

func (m monitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (m MonitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
var values []string
if err := loki_instrument.TimeRequest(ctx, "label_names", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
Expand All @@ -111,7 +111,7 @@ func (m monitoredReaderWriter) LabelNamesForMetricName(ctx context.Context, user
return values, nil
}

func (m monitoredReaderWriter) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
func (m MonitoredReaderWriter) Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error) {
var sts *stats.Stats
if err := loki_instrument.TimeRequest(ctx, "stats", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
Expand All @@ -124,7 +124,7 @@ func (m monitoredReaderWriter) Stats(ctx context.Context, userID string, from, t
return sts, nil
}

func (m monitoredReaderWriter) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) {
func (m MonitoredReaderWriter) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) {
var vol *logproto.VolumeResponse
if err := loki_instrument.TimeRequest(ctx, "volume", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
Expand All @@ -137,11 +137,11 @@ func (m monitoredReaderWriter) Volume(ctx context.Context, userID string, from,
return vol, nil
}

func (m monitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
func (m MonitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
m.rw.SetChunkFilterer(chunkFilter)
}

func (m monitoredReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
func (m MonitoredReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
return loki_instrument.TimeRequest(ctx, "index_chunk", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
return m.rw.IndexChunk(ctx, from, through, chk)
})
Expand Down
3 changes: 1 addition & 2 deletions pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/index"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
)

Expand All @@ -22,7 +21,7 @@ type IndexGatewayClientStore struct {
logger log.Logger
}

func NewIndexGatewayClientStore(client logproto.IndexGatewayClient, logger log.Logger) index.ReaderWriter {
func NewIndexGatewayClientStore(client logproto.IndexGatewayClient, logger log.Logger) *IndexGatewayClientStore {
return &IndexGatewayClientStore{
client: client,
logger: logger,
Expand Down
48 changes: 24 additions & 24 deletions pkg/storage/stores/series/series_index_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/grafana/loki/pkg/storage/config"
storageerrors "github.com/grafana/loki/pkg/storage/errors"
"github.com/grafana/loki/pkg/storage/stores"
"github.com/grafana/loki/pkg/storage/stores/index"
"github.com/grafana/loki/pkg/storage/stores/index/stats"
series_index "github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/util"
Expand Down Expand Up @@ -63,7 +62,8 @@ var (
})
)

type indexReaderWriter struct {
// IndexReaderWriter implements pkg/storage/stores/index.ReaderWriter
type IndexReaderWriter struct {
schema series_index.SeriesStoreSchema
index series_index.Client
schemaCfg config.SchemaConfig
Expand All @@ -74,8 +74,8 @@ type indexReaderWriter struct {
}

func NewIndexReaderWriter(schemaCfg config.SchemaConfig, schema series_index.SeriesStoreSchema, index series_index.Client,
fetcher *fetcher.Fetcher, chunkBatchSize int, writeDedupeCache cache.Cache) index.ReaderWriter {
return &indexReaderWriter{
fetcher *fetcher.Fetcher, chunkBatchSize int, writeDedupeCache cache.Cache) *IndexReaderWriter {
return &IndexReaderWriter{
schema: schema,
index: index,
schemaCfg: schemaCfg,
Expand All @@ -85,7 +85,7 @@ func NewIndexReaderWriter(schemaCfg config.SchemaConfig, schema series_index.Ser
}
}

func (c *indexReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
func (c *IndexReaderWriter) IndexChunk(ctx context.Context, from, through model.Time, chk chunk.Chunk) error {
writeReqs, keysToCache, err := c.calculateIndexEntries(ctx, from, through, chk)
if err != nil {
return err
Expand All @@ -104,7 +104,7 @@ func (c *indexReaderWriter) IndexChunk(ctx context.Context, from, through model.
}

// calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given.
func (c *indexReaderWriter) calculateIndexEntries(ctx context.Context, from, through model.Time, chunk chunk.Chunk) (series_index.WriteBatch, []string, error) {
func (c *IndexReaderWriter) calculateIndexEntries(ctx context.Context, from, through model.Time, chunk chunk.Chunk) (series_index.WriteBatch, []string, error) {
seenIndexEntries := map[string]struct{}{}
entries := []series_index.Entry{}

Expand Down Expand Up @@ -149,7 +149,7 @@ func (c *indexReaderWriter) calculateIndexEntries(ctx context.Context, from, thr
return result, missing, nil
}

func (c *indexReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
func (c *IndexReaderWriter) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]logproto.ChunkRef, error) {
log := util_log.WithContext(ctx, util_log.Logger)
// Check there is a metric name matcher of type equal,
metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers)
Expand Down Expand Up @@ -192,7 +192,7 @@ func (c *indexReaderWriter) GetChunkRefs(ctx context.Context, userID string, fro
return chunks, nil
}

func (c *indexReaderWriter) SetChunkFilterer(f chunk.RequestChunkFilterer) {
func (c *IndexReaderWriter) SetChunkFilterer(f chunk.RequestChunkFilterer) {
c.chunkFilterer = f
}

Expand All @@ -209,7 +209,7 @@ func (c chunkGroup) Less(i, j int) bool {
return c.schema.ExternalKey(c.chunks[i].ChunkRef) < c.schema.ExternalKey(c.chunks[j].ChunkRef)
}

func (c *indexReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
func (c *IndexReaderWriter) GetSeries(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]labels.Labels, error) {
chks, err := c.GetChunkRefs(ctx, userID, from, through, matchers...)
if err != nil {
return nil, err
Expand All @@ -218,7 +218,7 @@ func (c *indexReaderWriter) GetSeries(ctx context.Context, userID string, from,
return c.chunksToSeries(ctx, chks, matchers)
}

func (c *indexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.ChunkRef, matchers []*labels.Matcher) ([]labels.Labels, error) {
func (c *IndexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.ChunkRef, matchers []*labels.Matcher) ([]labels.Labels, error) {
// download one per series and merge
// group chunks by series
chunksBySeries := filterChunkRefsByUniqueFingerprint(in)
Expand Down Expand Up @@ -313,7 +313,7 @@ func (c *indexReaderWriter) chunksToSeries(ctx context.Context, in []logproto.Ch
}

// LabelNamesForMetricName retrieves all label names for a metric name.
func (c *indexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
func (c *IndexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelNamesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
Expand Down Expand Up @@ -341,7 +341,7 @@ func (c *indexReaderWriter) LabelNamesForMetricName(ctx context.Context, userID
return labelNames, nil
}

func (c *indexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
func (c *IndexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.LabelValuesForMetricName")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
Expand Down Expand Up @@ -377,7 +377,7 @@ func (c *indexReaderWriter) LabelValuesForMetricName(ctx context.Context, userID
}

// labelValuesForMetricNameWithMatchers retrieves all label values for a single label name and metric name, restricted to series that satisfy the given matchers.
func (c *indexReaderWriter) labelValuesForMetricNameWithMatchers(ctx context.Context, userID string, from, through model.Time, metricName, labelName string, matchers ...*labels.Matcher) ([]string, error) {
func (c *IndexReaderWriter) labelValuesForMetricNameWithMatchers(ctx context.Context, userID string, from, through model.Time, metricName, labelName string, matchers ...*labels.Matcher) ([]string, error) {
// Otherwise get series which include other matchers
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, userID, metricName, matchers)
if err != nil {
Expand Down Expand Up @@ -419,7 +419,7 @@ func (c *indexReaderWriter) labelValuesForMetricNameWithMatchers(ctx context.Con
return result.Strings(), nil
}

func (c *indexReaderWriter) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) {
func (c *IndexReaderWriter) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) {
// Check if one of the labels is a shard annotation, pass that information to lookupSeriesByMetricNameMatcher,
// and remove the label.
shard, shardLabelIndex, err := astmapper.ShardFromMatchers(matchers)
Expand Down Expand Up @@ -502,13 +502,13 @@ func (c *indexReaderWriter) lookupSeriesByMetricNameMatchers(ctx context.Context
return ids, nil
}

func (c *indexReaderWriter) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, shard *astmapper.ShardAnnotation) ([]string, error) {
func (c *IndexReaderWriter) lookupSeriesByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, shard *astmapper.ShardAnnotation) ([]string, error) {
return c.lookupIdsByMetricNameMatcher(ctx, from, through, userID, metricName, matcher, func(queries []series_index.Query) []series_index.Query {
return c.schema.FilterReadQueries(queries, shard)
})
}

func (c *indexReaderWriter) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]series_index.Query) []series_index.Query) ([]string, error) {
func (c *IndexReaderWriter) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]series_index.Query) []series_index.Query) ([]string, error) {
var err error
var queries []series_index.Query
var labelName string
Expand Down Expand Up @@ -600,7 +600,7 @@ var entriesPool = sync.Pool{
},
}

func (c *indexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries []series_index.Query, entries *[]series_index.Entry) error {
func (c *IndexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries []series_index.Query, entries *[]series_index.Entry) error {
*entries = (*entries)[:0]
// Nothing to do if there are no queries.
if len(queries) == 0 {
Expand Down Expand Up @@ -628,7 +628,7 @@ func (c *indexReaderWriter) lookupEntriesByQueries(ctx context.Context, queries
return err
}

func (c *indexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
func (c *IndexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesBySeries")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
Expand Down Expand Up @@ -665,7 +665,7 @@ func (c *indexReaderWriter) lookupLabelNamesBySeries(ctx context.Context, from,
return result.Strings(), nil
}

func (c *indexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
func (c *IndexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.lookupLabelNamesByChunks")
defer sp.Finish()
log := spanlogger.FromContext(ctx)
Expand Down Expand Up @@ -701,7 +701,7 @@ func (c *indexReaderWriter) lookupLabelNamesByChunks(ctx context.Context, from,
return labelNamesFromChunks(allChunks), nil
}

func (c *indexReaderWriter) lookupChunksBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
func (c *IndexReaderWriter) lookupChunksBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
queries := make([]series_index.Query, 0, len(seriesIDs))
for _, seriesID := range seriesIDs {
qs, err := c.schema.GetChunksForSeries(from, through, userID, []byte(seriesID))
Expand All @@ -722,7 +722,7 @@ func (c *indexReaderWriter) lookupChunksBySeries(ctx context.Context, from, thro
return result, err
}

func (c *indexReaderWriter) convertChunkIDsToChunks(_ context.Context, userID string, chunkIDs []string) ([]chunk.Chunk, error) {
func (c *IndexReaderWriter) convertChunkIDsToChunks(_ context.Context, userID string, chunkIDs []string) ([]chunk.Chunk, error) {
chunkSet := make([]chunk.Chunk, 0, len(chunkIDs))
for _, chunkID := range chunkIDs {
chunk, err := chunk.ParseExternalKey(userID, chunkID)
Expand All @@ -735,7 +735,7 @@ func (c *indexReaderWriter) convertChunkIDsToChunks(_ context.Context, userID st
return chunkSet, nil
}

func (c *indexReaderWriter) convertChunkIDsToChunkRefs(_ context.Context, userID string, chunkIDs []string) ([]logproto.ChunkRef, error) {
func (c *IndexReaderWriter) convertChunkIDsToChunkRefs(_ context.Context, userID string, chunkIDs []string) ([]logproto.ChunkRef, error) {
chunkSet := make([]logproto.ChunkRef, 0, len(chunkIDs))
for _, chunkID := range chunkIDs {
chunk, err := chunk.ParseExternalKey(userID, chunkID)
Expand All @@ -749,11 +749,11 @@ func (c *indexReaderWriter) convertChunkIDsToChunkRefs(_ context.Context, userID
}

// old index stores do not implement stats -- skip
func (c *indexReaderWriter) Stats(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) {
func (c *IndexReaderWriter) Stats(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) {
return nil, nil
}

// old index stores do not implement label volume -- skip
func (c *indexReaderWriter) Volume(_ context.Context, _ string, _, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) {
func (c *IndexReaderWriter) Volume(_ context.Context, _ string, _, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) {
return nil, nil
}

0 comments on commit 9ca7323

Please sign in to comment.