store: add fixes
Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS committed May 31, 2021
1 parent 5a72a8a commit 90fa8c5
Showing 1 changed file with 91 additions and 77 deletions.
168 changes: 91 additions & 77 deletions pkg/store/proxy.go
@@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/strutil"
@@ -63,22 +64,20 @@ type ProxyStore struct {
responseTimeout time.Duration
metrics *proxyStoreMetrics

// Request -> add yourself to list of listeners that are listening on that store+request.
// Request -> add yourself to list of listeners that are listening on those stores+request.
// At the end, send the same data to each worker.
// Delete the request from the map at the end!
requestListeners map[string]*requestListenerVal
requestListenerMtx *sync.Mutex
}

type requestListenerVal struct {
listeners []chan *storepb.SeriesResponse
listenerCnt int
sentFirstResponse bool
listeners []chan *storepb.SeriesResponse
}

type proxyStoreMetrics struct {
emptyStreamResponses prometheus.Counter
deduplicatedStreamRequests prometheus.Counter
emptyStreamResponses prometheus.Counter
coalescedSeriesRequests prometheus.Counter
}

func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
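The comments on requestListeners describe the bookkeeping behind the coalescing: each Series() call derives a key from the current store set plus the request, appends its own channel to the listener list for that key, and the entry is deleted once the request finishes. A minimal standalone sketch of that registration step, using assumed names (coalescer, register, unregister) that do not exist in this change:

package main

import (
	"fmt"
	"sync"
)

// coalescer keys in-flight requests and tracks the listeners waiting on each key.
type coalescer struct {
	mtx       sync.Mutex
	listeners map[string][]chan string
}

// register appends a listener channel for key and reports whether the caller
// is the first listener, i.e. the one that should issue the real request.
func (c *coalescer) register(key string) (chan string, bool) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	ch := make(chan string, 1)
	first := len(c.listeners[key]) == 0
	c.listeners[key] = append(c.listeners[key], ch)
	return ch, first
}

// unregister drops all bookkeeping for key once the request has completed.
func (c *coalescer) unregister(key string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	delete(c.listeners, key)
}

func main() {
	c := &coalescer{listeners: map[string][]chan string{}}
	key := "store-a,store-b" + `up{job="node"}` // stores + request, like cacheKey in the diff
	ch, first := c.register(key)
	fmt.Println("should send query:", first, "listener registered:", ch != nil)
	c.unregister(key)
}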
@@ -89,9 +88,9 @@ func newProxyStoreMetrics(reg prometheus.Registerer) *proxyStoreMetrics {
Help: "Total number of empty responses received.",
})

m.deduplicatedStreamRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_proxy_store_deduplicated_stream_requests_total",
Help: "How many requests we've avoided sending due to deduplication.",
m.coalescedSeriesRequests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_proxy_store_coalesced_series_requests_total",
Help: "How many Series() requests we've avoided sending due to coalescing.",
})

return &m
@@ -207,44 +206,87 @@ func (s cancelableRespSender) send(r *storepb.SeriesResponse) {
}

type broadcastingSeriesServer struct {
cacheKey string
s *ProxyStore
srv storepb.Store_SeriesServer
setFirstResponse *sync.Once
ctx context.Context

cacheKey string
s *ProxyStore
srv storepb.Store_SeriesServer
resps []*storepb.SeriesResponse
}

// Send is like a regular Send() but it fans out those responses to multiple channels.
func (b *broadcastingSeriesServer) Send(resp *storepb.SeriesResponse) error {
b.s.requestListenerMtx.Lock()
defer b.s.requestListenerMtx.Unlock()

b.setFirstResponse.Do(func() {
b.s.requestListeners[b.cacheKey].sentFirstResponse = true
})

for _, listener := range b.s.requestListeners[b.cacheKey].listeners {
select {
case listener <- resp:
case <-b.Context().Done():
for _, l := range b.s.requestListeners[b.cacheKey].listeners {
close(l)
}
return b.Context().Err()
}
}
b.resps = append(b.resps, resp)
return nil
}

func (b *broadcastingSeriesServer) Context() context.Context {
return b.srv.Context()
return b.ctx
}

// copySeriesResponse makes a copy of the given SeriesResponse if it is a Series.
// If not then the original response is returned.
func copySeriesResponse(r *storepb.SeriesResponse) *storepb.SeriesResponse {
originalSeries := r.GetSeries()
if originalSeries == nil {
return r
}
resp := &storepb.SeriesResponse{}

newLabels := labels.Labels{}
for _, lbl := range originalSeries.Labels {
newLabels = append(newLabels, labels.Label{
Name: fmt.Sprintf("%s", lbl.Name),
Value: fmt.Sprintf("%s", lbl.Value),
})
}

chunks := make([]storepb.AggrChunk, len(originalSeries.Chunks))
copy(chunks, originalSeries.Chunks)

resp.Result = &storepb.SeriesResponse_Series{
Series: &storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(newLabels),
Chunks: chunks,
},
}

return resp
}

func (b *broadcastingSeriesServer) Close() {
func (b *broadcastingSeriesServer) Close() error {
b.s.requestListenerMtx.Lock()
defer b.s.requestListenerMtx.Unlock()
for _, l := range b.s.requestListeners[b.cacheKey].listeners {
defer func() {
delete(b.s.requestListeners, b.cacheKey)
}()

for li, l := range b.s.requestListeners[b.cacheKey].listeners {
for _, resp := range b.resps {
// Make a copy here if it is sent to other listeners.
// This is because _a lot_ of upper-level code assumes
// that they have sole ownership of the series.
// For example, the replica labels might be different from query to query
// and deduplication happens in the upper layer.
// TODO(GiedriusS): remove this assumption.
if li > 0 {
resp = copySeriesResponse(resp)
}

select {
case l <- resp:
case <-b.srv.Context().Done():
err := b.srv.Context().Err()
for _, lc := range b.s.requestListeners[b.cacheKey].listeners {
lc <- storepb.NewWarnSeriesResponse(err)
close(lc)
}
return b.srv.Context().Err()
}
}
close(l)
}
return nil
}

func (b *broadcastingSeriesServer) RecvMsg(m interface{}) error { return b.srv.RecvMsg(m) }
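Close replays the buffered responses to every registered listener: the first listener receives the original objects, while later listeners receive deep copies so no two consumers share memory. A rough sketch of that replay loop with placeholder types; copyResp stands in for copySeriesResponse, and the cancellation handling is simplified to closing only the channels that are still open:

package main

import (
	"context"
	"fmt"
)

type seriesResp struct {
	labels map[string]string
}

// copyResp is a stand-in for copySeriesResponse: it clones the parts that
// downstream code might mutate.
func copyResp(r *seriesResp) *seriesResp {
	cp := &seriesResp{labels: make(map[string]string, len(r.labels))}
	for k, v := range r.labels {
		cp.labels[k] = v
	}
	return cp
}

// replay fans buffered responses out to every listener; listeners after the
// first receive copies. On cancellation it closes the still-open channels so
// readers never block forever.
func replay(ctx context.Context, listeners []chan *seriesResp, buffered []*seriesResp) error {
	for i, l := range listeners {
		for _, resp := range buffered {
			if i > 0 {
				resp = copyResp(resp) // later listeners must not share memory with the first
			}
			select {
			case l <- resp:
			case <-ctx.Done():
				for _, rest := range listeners[i:] {
					close(rest)
				}
				return ctx.Err()
			}
		}
		close(l)
	}
	return nil
}

func main() {
	a, b := make(chan *seriesResp, 2), make(chan *seriesResp, 2)
	buf := []*seriesResp{{labels: map[string]string{"job": "x"}}}
	_ = replay(context.Background(), []chan *seriesResp{a, b}, buf)
	fmt.Println((<-a).labels["job"], (<-b).labels["job"]) // same values, backed by separate maps
}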
@@ -261,53 +303,36 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
cacheKey string
shouldSendQuery bool
dataIn chan *storepb.SeriesResponse = make(chan *storepb.SeriesResponse)
cacheKeyID int
)
stores := s.stores()
for _, st := range stores {
cacheKey += st.String()
}
cacheKey += r.String()

// If already sending response back -> create a new cache key.
g, gctx := errgroup.WithContext(srv.Context())

s.requestListenerMtx.Lock()
finalCacheKey := fmt.Sprintf("%s-%d", cacheKey, cacheKeyID)
if s.requestListeners[finalCacheKey] == nil {
s.requestListeners[finalCacheKey] = &requestListenerVal{}
if s.requestListeners[cacheKey] == nil {
s.requestListeners[cacheKey] = &requestListenerVal{}
}
if s.requestListeners[finalCacheKey].sentFirstResponse {
for {
cacheKeyID++
finalCacheKey := fmt.Sprintf("%s-%d", cacheKey, cacheKeyID)
if s.requestListeners[finalCacheKey] == nil {
s.requestListeners[finalCacheKey] = &requestListenerVal{}
break
}
if !s.requestListeners[finalCacheKey].sentFirstResponse {
break
}
}
}
shouldSendQuery = s.requestListeners[finalCacheKey].listenerCnt == 0
s.requestListeners[finalCacheKey].listenerCnt++
s.requestListeners[finalCacheKey].listeners = append(s.requestListeners[finalCacheKey].listeners, dataIn)
shouldSendQuery = len(s.requestListeners[cacheKey].listeners) == 0
s.requestListeners[cacheKey].listeners = append(s.requestListeners[cacheKey].listeners, dataIn)
s.requestListenerMtx.Unlock()

g, _ := errgroup.WithContext(srv.Context())

bss := &broadcastingSeriesServer{
finalCacheKey,
s,
srv,
&sync.Once{},
}

if shouldSendQuery {
bss := &broadcastingSeriesServer{
gctx,
cacheKey,
s,
srv,
[]*storepb.SeriesResponse{},
}
g.Go(func() error {
return s.realSeries(stores, r, bss)
})
} else {
s.metrics.deduplicatedStreamRequests.Inc()
s.metrics.coalescedSeriesRequests.Inc()
}

g.Go(func() error {
@@ -319,24 +344,13 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return nil
})

if err := g.Wait(); err != nil {
return err
}

s.requestListenerMtx.Lock()
s.requestListeners[finalCacheKey].listenerCnt--
if s.requestListeners[finalCacheKey].listenerCnt == 0 {
delete(s.requestListeners, finalCacheKey)
}
s.requestListenerMtx.Unlock()

return nil
return g.Wait()
}

// realSeries returns all series for a requested time range and label matcher. Requested series are taken from other
// stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range.
func (s *ProxyStore) realSeries(stores []Client, r *storepb.SeriesRequest, srv *broadcastingSeriesServer) error {
defer srv.Close()
defer runutil.CloseWithLogOnErr(s.logger, srv, "closing broadcastingSeriesServer")
// TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be
// triggered by tracing span to reduce cognitive load.
reqLogger := log.With(s.logger, "component", "proxy", "request", r.String())
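
Taken together, the new Series() path works as follows: the first caller for a given key runs the real upstream request (realSeries) and broadcasts the buffered results, while coalesced callers only drain their own channel and forward what arrives. A condensed end-to-end sketch of that shape, with stand-in names (fetchAndBroadcast, serve) and error handling reduced to its essentials:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type resp struct{ v string }

// fetchAndBroadcast stands in for realSeries plus broadcastingSeriesServer.Close:
// it produces responses, replays them to every listener, then closes the channels.
func fetchAndBroadcast(listeners []chan *resp) error {
	out := []*resp{{v: "series-1"}, {v: "series-2"}}
	for _, l := range listeners {
		for _, r := range out {
			l <- r
		}
		close(l)
	}
	return nil
}

// serve mirrors the shape of the new Series(): only the first listener triggers
// the real fetch, and every listener forwards whatever arrives on its own channel.
func serve(ctx context.Context, dataIn chan *resp, listeners []chan *resp, first bool) error {
	g, _ := errgroup.WithContext(ctx)

	if first {
		g.Go(func() error { return fetchAndBroadcast(listeners) })
	} // a coalesced caller would instead bump a counter such as coalescedSeriesRequests here

	g.Go(func() error {
		for r := range dataIn { // the real code forwards to the caller's gRPC stream
			fmt.Println("send:", r.v)
		}
		return nil
	})

	return g.Wait()
}

func main() {
	in := make(chan *resp)
	_ = serve(context.Background(), in, []chan *resp{in}, true)
}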
