Skip to content

Commit

Permalink
user pkg drain for draining in queue proxy (#12033)
Browse files Browse the repository at this point in the history
* user pkg drain for draining in queue proxy

* use different handler for healthcheck
  • Loading branch information
nader-ziada committed Nov 5, 2021
1 parent cad72a3 commit 525a4cb
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 397 deletions.
48 changes: 22 additions & 26 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,15 @@ func main() {
probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection).ProbeContainer
}

healthState := health.NewState()
mainServer := buildServer(ctx, env, healthState, probe, stats, logger)
drainer := &pkghandler.Drainer{
QuietPeriod: drainSleepDuration,
// Add Activator probe header to the drainer so it can handle probes directly from activator
HealthCheckUAPrefixes: []string{network.ActivatorUserAgent},
}
mainServer := buildServer(ctx, env, drainer, probe, stats, logger)
servers := map[string]*http.Server{
"main": mainServer,
"admin": buildAdminServer(logger, healthState),
"admin": buildAdminServer(logger, drainer),
"metrics": buildMetricsServer(promStatReporter, protoStatReporter),
}
if env.EnableProfiling {
Expand Down Expand Up @@ -225,19 +229,11 @@ func main() {
os.Exit(1)
case <-ctx.Done():
logger.Info("Received TERM signal, attempting to gracefully shutdown servers.")
healthState.Shutdown(func() {
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
time.Sleep(drainSleepDuration)

// Calling server.Shutdown() allows pending requests to
// complete, while no new work is accepted.
logger.Info("Shutting down main server")
if err := mainServer.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown proxy server", zap.Error(err))
}
// Removing the main server from the shutdown logic as we've already shut it down.
delete(servers, "main")
})
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
drainer.Drain()

// Removing the main server from the shutdown logic as we've already shut it down.
delete(servers, "main")

for serverName, srv := range servers {
logger.Info("Shutting down server: ", serverName)
Expand All @@ -260,7 +256,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
return readiness.NewProbe(coreProbe)
}

func buildServer(ctx context.Context, env config, healthState *health.State, probeContainer func() bool, stats *network.RequestStats,
func buildServer(ctx context.Context, env config, drainer *pkghandler.Drainer, probeContainer func() bool, stats *network.RequestStats,
logger *zap.SugaredLogger) *http.Server {

maxIdleConns := 1000 // TODO: somewhat arbitrary value for CC=0, needs experimental validation.
Expand Down Expand Up @@ -316,15 +312,16 @@ func buildServer(ctx context.Context, env config, healthState *health.State, pro
composedHandler = tracing.HTTPSpanMiddleware(composedHandler)
}

composedHandler = health.ProbeHandler(healthState, probeContainer, tracingEnabled, composedHandler)
composedHandler = network.NewProbeHandler(composedHandler)
// We might sometimes want to capture the probes/healthchecks in the request
// logs. Hence we need to have RequestLogHandler to be the first one.
var healthcheckHandler http.Handler
healthcheckHandler = health.ProbeHandler(probeContainer, tracingEnabled, healthcheckHandler)
if env.ServingEnableRequestLog {
composedHandler = requestLogHandler(logger, composedHandler, env)
healthcheckHandler = requestLogHandler(logger, healthcheckHandler, env)
}

return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler)
drainer.Inner = composedHandler
drainer.HealthCheck = healthcheckHandler.ServeHTTP

return pkgnet.NewServer(":"+env.QueueServingPort, drainer)
}

func buildTransport(env config, logger *zap.SugaredLogger, maxConns int) http.RoundTripper {
Expand Down Expand Up @@ -380,12 +377,11 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config)
return true
}

func buildAdminServer(logger *zap.SugaredLogger, healthState *health.State) *http.Server {
func buildAdminServer(logger *zap.SugaredLogger, drainer *pkghandler.Drainer) *http.Server {
adminMux := http.NewServeMux()
drainHandler := healthState.DrainHandlerFunc()
adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
logger.Info("Attached drain handler from user-container")
drainHandler(w, r)
drainer.Drain()
})

return &http.Server{
Expand Down
3 changes: 1 addition & 2 deletions cmd/queue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func TestQueueTraceSpans(t *testing.T) {
enableTrace: false,
}}

healthState := &health.State{}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
// Create tracer with reporter recorder
Expand Down Expand Up @@ -149,7 +148,7 @@ func TestQueueTraceSpans(t *testing.T) {
h := queue.ProxyHandler(breaker, network.NewRequestStats(time.Now()), true /*tracingEnabled*/, proxy)
h(writer, req)
} else {
h := health.ProbeHandler(healthState, tc.prober, true /*tracingEnabled*/, nil)
h := health.ProbeHandler(tc.prober, true /*tracingEnabled*/, nil)
req.Header.Set(network.ProbeHeaderName, tc.requestHeader)
h(writer, req)
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/queue/health/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package health

import (
"io"
"net/http"

"go.opencensus.io/trace"
Expand All @@ -29,7 +30,7 @@ const badProbeTemplate = "unexpected probe header value: "

// ProbeHandler returns a http.HandlerFunc that responds to health checks if the
// knative network probe header is passed, and otherwise delegates to the next handler.
func ProbeHandler(healthState *State, prober func() bool, tracingEnabled bool, next http.Handler) http.HandlerFunc {
func ProbeHandler(prober func() bool, tracingEnabled bool, next http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ph := network.KnativeProbeHeader(r)

Expand Down Expand Up @@ -58,13 +59,13 @@ func ProbeHandler(healthState *State, prober func() bool, tracingEnabled bool, n
return
}

healthState.HandleHealthProbe(func() bool {
if !prober() {
probeSpan.Annotate([]trace.Attribute{
trace.StringAttribute("queueproxy.probe.error", "container not ready")}, "error")
return false
}
return true
}, w)
if !prober() {
probeSpan.Annotate([]trace.Attribute{
trace.StringAttribute("queueproxy.probe.error", "container not ready")}, "error")
w.WriteHeader(http.StatusServiceUnavailable)
return
}

io.WriteString(w, queue.Name)
}
}
3 changes: 1 addition & 2 deletions pkg/queue/health/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func TestProbeHandler(t *testing.T) {
wantCode: http.StatusOK,
}}

healthState := &State{}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
writer := httptest.NewRecorder()
Expand All @@ -80,7 +79,7 @@ func TestProbeHandler(t *testing.T) {
req.Header.Set(network.ProbeHeaderName, tc.requestHeader)
}

h := ProbeHandler(healthState, tc.prober, true /*tracingEnabled*/, incHandler)
h := ProbeHandler(tc.prober, true /*tracingEnabled*/, incHandler)
h(writer, req)

if got, want := writer.Code, tc.wantCode; got != want {
Expand Down
135 changes: 0 additions & 135 deletions pkg/queue/health/health_state.go

This file was deleted.

Loading

0 comments on commit 525a4cb

Please sign in to comment.