Skip to content

Commit

Permalink
fixup and use query stats
Browse files Browse the repository at this point in the history
Signed-off-by: milesbryant <miles@milesbxf.net>
  • Loading branch information
milesbxf committed Oct 18, 2024
1 parent d1b0795 commit d59e491
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 71 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func runQueryFrontend(
return err
}

roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper)
roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.CortexHandlerConfig.QueryStatsEnabled)
if err != nil {
return errors.Wrap(err, "setup downstream roundtripper")
}
Expand Down
15 changes: 11 additions & 4 deletions internal/cortex/frontend/downstream_roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@ import (

// downstreamRoundTripper is an http.RoundTripper that forwards requests to a
// fixed downstream URL. (The diff residue here duplicated the downstreamURL
// and transport fields; a struct may declare each field only once.)
type downstreamRoundTripper struct {
	downstreamURL     *url.URL          // parsed base URL of the downstream endpoint
	transport         http.RoundTripper // transport used to execute the rewritten request
	queryStatsEnabled bool              // when true, RoundTrip adds stats=true to the forwarded query string
}

func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper, queryStatsEnabled bool) (http.RoundTripper, error) {
u, err := url.Parse(downstreamURL)
if err != nil {
return nil, err
}

return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
return &downstreamRoundTripper{downstreamURL: u, transport: transport, queryStatsEnabled: queryStatsEnabled}, nil
}

func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
Expand All @@ -36,6 +37,12 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro
}
}

if d.queryStatsEnabled {
q := r.URL.Query()
q.Set("stats", "true")
r.URL.RawQuery = q.Encode()
}

r.URL.Scheme = d.downstreamURL.Scheme
r.URL.Host = d.downstreamURL.Host
r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
Expand Down
125 changes: 59 additions & 66 deletions internal/cortex/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,23 @@ package transport
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/prometheus/prometheus/util/stats"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"syscall"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"

querier_stats "github.com/thanos-io/thanos/internal/cortex/querier/stats"
"github.com/thanos-io/thanos/internal/cortex/tenant"
"github.com/thanos-io/thanos/internal/cortex/util"
util_log "github.com/thanos-io/thanos/internal/cortex/util/log"
Expand Down Expand Up @@ -70,48 +69,43 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
roundTripper: roundTripper,
}

if cfg.QueryStatsEnabled {
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_seconds_total",
Help: "Total amount of wall clock time spend processing queries.",
}, []string{"user"})

h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_series_total",
Help: "Number of series fetched to execute a query.",
}, []string{"user"})

h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
}, []string{"user"})

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryBytes.DeleteLabelValues(user)
})
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
}
//if cfg.QueryStatsEnabled {
// h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
// Name: "cortex_query_seconds_total",
// Help: "Total amount of wall clock time spend processing queries.",
// }, []string{"user"})
//
// h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
// Name: "cortex_query_fetched_series_total",
// Help: "Number of series fetched to execute a query.",
// }, []string{"user"})
//
// h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
// Name: "cortex_query_fetched_chunks_bytes_total",
// Help: "Size of all chunks fetched to execute a query in bytes.",
// }, []string{"user"})
//
// h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
// h.querySeconds.DeleteLabelValues(user)
// h.querySeries.DeleteLabelValues(user)
// h.queryBytes.DeleteLabelValues(user)
// })
// // If cleaner stops or fail, we will simply not clean the metrics for inactive users.
// _ = h.activeUsers.StartAsync(context.Background())
//}

return h
}

// ResponseWithStats models the subset of a downstream query response body that
// this handler needs: only the "stats" object, populated when the request was
// forwarded with stats=true. Other response fields are intentionally ignored
// by json.Unmarshal.
type ResponseWithStats struct {
Stats *stats.BuiltinStats `json:"stats"`
}

func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
stats *querier_stats.Stats
queryString url.Values
)

// Initialise the stats in the context and make sure it's propagated
// down the request chain.
if f.cfg.QueryStatsEnabled {
var ctx context.Context
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
r = r.WithContext(ctx)
}

defer func() {
_ = r.Body.Close()
}()
Expand All @@ -136,7 +130,16 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if f.cfg.QueryStatsEnabled {
writeServiceTimingHeader(queryResponseTime, hs, stats)
var statsResponse ResponseWithStats
if err := json.Unmarshal(buf.Bytes(), &statsResponse); err == nil {
if statsResponse.Stats != nil {
f.reportQueryStats(r, queryString, queryResponseTime, statsResponse.Stats)
} else {
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", errors.New("stats are nil"))
}
} else {
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", err)
}
}

w.WriteHeader(resp.StatusCode)
Expand All @@ -155,9 +158,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if shouldReportSlowQuery {
f.reportSlowQuery(r, hs, queryString, queryResponseTime)
}
if f.cfg.QueryStatsEnabled {
f.reportQueryStats(r, queryString, queryResponseTime, stats)
}
}

// reportSlowQuery reports slow queries.
Expand Down Expand Up @@ -194,36 +194,43 @@ func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header,
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats stats.QueryStats) {
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()
remoteUser, _, _ := r.BasicAuth()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

// Log stats.
logMessage := append([]interface{}{
fields := []interface{}{
"msg", "query stats",
"component", "query-frontend",
"method", r.Method,
"path", r.URL.Path,
"remote_user", remoteUser,
"remote_addr", r.RemoteAddr,
"response_time", queryResponseTime,
"query_wall_time_seconds", wallTime.Seconds(),
"fetched_series_count", numSeries,
"fetched_chunks_bytes", numBytes,
}, formatQueryString(queryString)...)
"query_timings_preparation_time", stats.Builtin().Timings.QueryPreparationTime,
"query_timings_eval_total_time", stats.Builtin().Timings.EvalTotalTime,
"query_timings_exec_total_time", stats.Builtin().Timings.ExecTotalTime,
"query_timings_exec_queue_time", stats.Builtin().Timings.ExecQueueTime,
"query_timings_inner_eval_time", stats.Builtin().Timings.InnerEvalTime,
"query_timings_result_sort_time", stats.Builtin().Timings.ResultSortTime,
}
logMessage := append(fields, formatQueryString(queryString)...)

if stats.Builtin().Samples != nil {
samples := stats.Builtin().Samples

logMessage = append(logMessage, []interface{}{
"total_queryable_samples", samples.TotalQueryableSamples,
"peak_samples", samples.PeakSamples,
})
}

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}
Expand Down Expand Up @@ -262,17 +269,3 @@ func writeError(w http.ResponseWriter, err error) {
}
server.WriteError(w, err)
}

// writeServiceTimingHeader sets the Server-Timing response header from the
// collected querier stats and the frontend-observed response time. A nil
// stats pointer means stats collection was disabled; nothing is written.
func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Header, stats *querier_stats.Stats) {
	if stats == nil {
		return
	}
	entries := []string{
		statsValue("querier_wall_time", stats.LoadWallTime()),
		statsValue("response_time", queryResponseTime),
	}
	headers.Set(ServiceTimingHeaderName, strings.Join(entries, ", "))
}

func statsValue(name string, d time.Duration) string {
durationInMs := strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64)
return name + ";dur=" + durationInMs
}

0 comments on commit d59e491

Please sign in to comment.