proxy: coalesce identical Series() requests #4290

Closed
CHANGELOG.md: 4 additions & 0 deletions
@@ -10,6 +10,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Changed

- [#4290](https://github.com/thanos-io/thanos/pull/4290) proxy: coalesce multiple requests for the same data; greatly improves performance when a dashboard is opened without query-frontend and lots of different panels (queries) ask for the same data

## v0.23.0 - In Progress

### Added
pkg/store/proxy.go: 248 additions & 10 deletions
@@ -15,24 +15,30 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tracing"
lru "github.com/hashicorp/golang-lru/simplelru"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
"github.com/thanos-io/thanos/pkg/tracing"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

type ctxKey int

// rlkLRUSize caps how many request keys are tracked. Seems good enough; in the
// worst case an evicted key just means some extra allocations.
const rlkLRUSize = 1_000_000

// StoreMatcherKey is the context key for the store's allow list.
const StoreMatcherKey = ctxKey(0)

@@ -61,10 +67,22 @@ type ProxyStore struct {

responseTimeout time.Duration
metrics *proxyStoreMetrics

// Maps a request key to the listeners waiting on that stores+request
// combination. The first listener registered triggers the real call; when it
// finishes, the same data is fanned out to every listener and the entry is
// reset so the next identical request starts over.
requestListenersLRU *lru.LRU
requestListenersLock *sync.Mutex
}

type requestListenerVal struct {
listeners []chan *storepb.SeriesResponse
valLock *sync.Mutex
}
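For orientation, here is a minimal, hypothetical sketch of the coalescing pattern these two fields support (invented names, a plain map instead of the LRU, and a single string result instead of a response stream):

```go
package main

import (
	"fmt"
	"sync"
)

type group struct {
	mu        sync.Mutex
	listeners map[string][]chan string
}

// run registers a listener under key; the first listener performs the work,
// every later one just waits for the broadcast.
func (g *group) run(key string, work func() string) string {
	ch := make(chan string, 1)

	g.mu.Lock()
	first := len(g.listeners[key]) == 0
	g.listeners[key] = append(g.listeners[key], ch)
	g.mu.Unlock()

	if first {
		result := work() // only one caller actually does the expensive call

		g.mu.Lock()
		for _, l := range g.listeners[key] {
			l <- result // fan the same result out to every waiting listener
		}
		g.listeners[key] = nil // reset, so the next identical request starts fresh
		g.mu.Unlock()
	}
	return <-ch
}

func main() {
	g := &group{listeners: map[string][]chan string{}}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(g.run("same-key", func() string { return "expensive result" }))
		}()
	}
	wg.Wait()
}
```

The real implementation below additionally has to handle streams of responses, gRPC errors, and context cancellation, but the register-first-then-broadcast shape is the same.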

type proxyStoreMetrics struct {
emptyStreamResponses prometheus.Counter
coalescedSeriesRequests prometheus.Counter
}

func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
@@ -75,6 +93,11 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
Help: "Total number of empty responses received.",
})

m.coalescedSeriesRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_proxy_store_coalesced_series_requests_total",
Help: "How many Series() requests we've avoided sending due to coalescing.",
})

return &m
}

@@ -99,13 +122,16 @@ func NewProxyStore(
}

metrics := newProxyStoreMetrics(reg)
l, _ := lru.NewLRU(rlkLRUSize, nil)
s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
responseTimeout: responseTimeout,
metrics: metrics,
requestListenersLRU: l,
requestListenersLock: &sync.Mutex{},
}
return s
}
@@ -185,9 +211,221 @@ func (s cancelableRespSender) send(r *storepb.SeriesResponse) {
}
}

// broadcastingSeriesServer implements storepb.Store_SeriesServer: it buffers
// all responses of a single real Series() call so that Close() can fan them
// out to every listener registered for the same request.
type broadcastingSeriesServer struct {
ctx context.Context

rlk *requestListenerVal
srv storepb.Store_SeriesServer
resps []*storepb.SeriesResponse
}

// Send buffers the response; Close() later fans the buffered responses out to
// all listening channels.
func (b *broadcastingSeriesServer) Send(resp *storepb.SeriesResponse) error {
b.resps = append(b.resps, resp)
return nil
}

func (b *broadcastingSeriesServer) Context() context.Context {
return b.ctx
}

// copySeriesResponse makes a copy of the given SeriesResponse if it carries a
// Series. Otherwise the original response is returned as-is.
func copySeriesResponse(r *storepb.SeriesResponse) *storepb.SeriesResponse {
originalSeries := r.GetSeries()
if originalSeries == nil {
return r
}
resp := &storepb.SeriesResponse{}

newLabels := labels.Labels{}
for _, lbl := range originalSeries.Labels {
newLabels = append(newLabels, labels.Label{
Name: lbl.Name,
Value: lbl.Value,
})
}

series := &storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(newLabels),
}

if len(originalSeries.Chunks) > 0 {
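// NOTE: this copies the AggrChunk structs themselves; any chunk data they
// point to remains shared between the original response and the copy.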
chunks := make([]storepb.AggrChunk, len(originalSeries.Chunks))
copy(chunks, originalSeries.Chunks)
series.Chunks = chunks
}

resp.Result = &storepb.SeriesResponse_Series{
Series: series,
}

return resp
}

// Close broadcasts the buffered responses to every registered listener and
// resets the listener list. The first listener receives the original
// responses; later ones receive copies made by copySeriesResponse.
func (b *broadcastingSeriesServer) Close() error {
rlk := b.rlk

rlk.valLock.Lock()
defer func() {
rlk.listeners = rlk.listeners[:0]
rlk.valLock.Unlock()
}()

for li, l := range rlk.listeners {
for _, resp := range b.resps {
if li > 0 {
resp = copySeriesResponse(resp)
}
select {
case l <- resp:
case <-b.srv.Context().Done():
err := b.srv.Context().Err()
// Close only the listeners that earlier iterations have not closed yet;
// closing an already-closed channel would panic.
for _, lc := range rlk.listeners[li:] {
select {
case lc <- storepb.NewWarnSeriesResponse(err):
default:
}
close(lc)
}
return err
}
}
close(l)
}
return nil
}

func (b *broadcastingSeriesServer) RecvMsg(m interface{}) error { return b.srv.RecvMsg(m) }
func (b *broadcastingSeriesServer) SendMsg(m interface{}) error { return b.srv.SendMsg(m) }
func (b *broadcastingSeriesServer) SetHeader(m metadata.MD) error { return b.srv.SetHeader(m) }
func (b *broadcastingSeriesServer) SendHeader(m metadata.MD) error { return b.srv.SendHeader(m) }
func (b *broadcastingSeriesServer) SetTrailer(m metadata.MD) { b.srv.SetTrailer(m) }
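Since the wrapper re-implements the full stream interface by hand, a compile-time assertion could make that contract explicit; a hedged suggestion, not part of the PR:

```go
// Compile-time check that broadcastingSeriesServer still satisfies the
// stream interface it stands in for.
var _ storepb.Store_SeriesServer = (*broadcastingSeriesServer)(nil)
```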

// findMostMatchingKey returns the best-fitting listener key for the given
// stores and request: the exact key if it is already registered, otherwise a
// registered key that matches everything else but uses only a prefix of the
// request's matchers. Must be called under a lock.
func findMostMatchingKey(stores []Client, r *storepb.SeriesRequest, listeners *lru.LRU) string {
var sb strings.Builder

const marker rune = 0xffff

for _, st := range stores {
fmt.Fprint(&sb, st.String())
}

fmt.Fprintf(&sb, "%d%d%v%v%v%v", r.MaxTime, r.MinTime, r.MaxResolutionWindow, r.PartialResponseStrategy, r.PartialResponseDisabled, r.Hints.String())

// For RAW data it doesn't matter what the aggregates are.
// TODO(GiedriusS): remove this once query push-down becomes a reality.
if r.MaxResolutionWindow != 0 {
fmt.Fprintf(&sb, "%v", r.Aggregates)
}

fmt.Fprintf(&sb, "%c", marker)

markers := 0
if len(r.Matchers) > 0 {
markers = len(r.Matchers) - 1
}
markerPositions := make([]int, 0, markers)

for i, m := range r.Matchers {
if i > 0 {
markerPositions = append(markerPositions, sb.Len())
}
fmt.Fprintf(&sb, "%s%c%s%c", m.Name, marker, m.Value, marker)
}

_, ok := listeners.Get(sb.String())
// Easy path - direct match.
if ok {
return sb.String()
}

originalKey := sb.String()

for _, markerPos := range markerPositions {
currentKey := originalKey[:markerPos]
_, ok := listeners.Get(currentKey)
if ok {
return currentKey
}
}
return originalKey
}
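To make the prefix fallback concrete, here is a self-contained illustration of the key scheme (a plain map stands in for the LRU; the prefix, matcher names, and values are invented):

```go
package main

import (
	"fmt"
	"strings"
)

const marker rune = 0xffff // same sentinel rune as in findMostMatchingKey

// buildKey mimics the matcher part of the key: name<marker>value<marker> per
// matcher, recording where each prefix (matchers 0..i-1) ends.
func buildKey(prefix string, matchers [][2]string) (key string, positions []int) {
	var sb strings.Builder
	sb.WriteString(prefix)
	for i, m := range matchers {
		if i > 0 {
			positions = append(positions, sb.Len())
		}
		fmt.Fprintf(&sb, "%s%c%s%c", m[0], marker, m[1], marker)
	}
	return sb.String(), positions
}

func main() {
	inFlight := map[string]bool{}

	// A request with the single matcher {job="node"} is already in flight.
	short, _ := buildKey("stores|t0-t1|", [][2]string{{"job", "node"}})
	inFlight[short] = true

	// A new request arrives with matchers {job="node", instance="a"}.
	long, positions := buildKey("stores|t0-t1|", [][2]string{{"job", "node"}, {"instance", "a"}})

	if inFlight[long] { // easy path: exact match
		fmt.Println("exact match")
		return
	}
	for _, pos := range positions {
		if inFlight[long[:pos]] { // the shorter key is a byte prefix of the longer one
			fmt.Println("coalesced onto the in-flight prefix request")
			return
		}
	}
	fmt.Println("no match; this caller becomes the sender")
}
```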

// Series is a coalescing wrapper around realSeries(): the real Series() calls
// are only performed if an identical request is not already in flight;
// otherwise this request simply subscribes to the in-flight results. This
// helps a lot when a dashboard with lots of different panels (queries) asking
// for the same metrics gets opened.
func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
var (
shouldSendQuery bool
dataIn = make(chan *storepb.SeriesResponse)
ctx = srv.Context()
g *errgroup.Group
)
stores := s.stores()

s.requestListenersLock.Lock()
listenerKey := findMostMatchingKey(stores, r, s.requestListenersLRU)
val, ok := s.requestListenersLRU.Get(listenerKey)
if !ok {
val = &requestListenerVal{
valLock: &sync.Mutex{},
}
s.requestListenersLRU.Add(listenerKey, val)
}
s.requestListenersLock.Unlock()

rlk := val.(*requestListenerVal)

rlk.valLock.Lock()
shouldSendQuery = len(rlk.listeners) == 0
rlk.listeners = append(rlk.listeners, dataIn)
rlk.valLock.Unlock()

if shouldSendQuery {
g, ctx = errgroup.WithContext(ctx)

bss := &broadcastingSeriesServer{
ctx: ctx,
rlk: rlk,
srv: srv,
resps: []*storepb.SeriesResponse{},
}
g.Go(func() error {
return s.realSeries(stores, r, bss)
})
g.Go(func() error {
for din := range dataIn {
if err := srv.Send(din); err != nil {
return errors.Wrap(err, "sending coalesced Series() response")
}
}
return nil
})

return g.Wait()
}

s.metrics.coalescedSeriesRequests.Inc()

for din := range dataIn {
if err := srv.Send(din); err != nil {
return errors.Wrap(err, "sending coalesced Series() response")
}
}
return nil
}
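The shape of this wrapper is close to golang.org/x/sync/singleflight, which cannot be used directly here because Series() streams many responses to many listeners instead of returning one value. For a unary call, the same coalescing would be roughly this sketch (fetchValue is a hypothetical stand-in for the expensive backend call):

```go
package main

import (
	"fmt"

	"golang.org/x/sync/singleflight"
)

var sf singleflight.Group

// fetchValue stands in for an expensive backend call (hypothetical).
func fetchValue(key string) (interface{}, error) {
	return "value-for-" + key, nil
}

// coalescedFetch coalesces concurrent callers: all callers passing the same
// key share a single fetchValue() execution and receive the same result.
func coalescedFetch(key string) (interface{}, error) {
	v, err, _ := sf.Do(key, func() (interface{}, error) {
		return fetchValue(key)
	})
	return v, err
}

func main() {
	v, _ := coalescedFetch("series-request-key")
	fmt.Println(v)
}
```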

// realSeries returns all series for a requested time range and label matcher.
// Requested series are taken from other stores and proxied to the RPC client.
// NOTE: the resulting data is not trimmed exactly to the min and max time range.
func (s *ProxyStore) realSeries(stores []Client, r *storepb.SeriesRequest, srv *broadcastingSeriesServer) error {
defer runutil.CloseWithLogOnErr(s.logger, srv, "closing broadcastingSeriesServer")
// TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be
// tiggered by tracing span to reduce cognitive load.
reqLogger := log.With(s.logger, "component", "proxy", "request", r.String())
@@ -234,7 +472,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
close(respCh)
}()

for _, st := range s.stores() {
for _, st := range stores {
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, r.MinTime, r.MaxTime, matchers...); !ok {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason))