Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

user pkg drain for draining in queue proxy #12033

Merged
merged 2 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 22 additions & 26 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,15 @@ func main() {
probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection).ProbeContainer
}

healthState := health.NewState()
mainServer := buildServer(ctx, env, healthState, probe, stats, logger)
drainer := &pkghandler.Drainer{
QuietPeriod: drainSleepDuration,
julz marked this conversation as resolved.
Show resolved Hide resolved
// Add Activator probe header to the drainer so it can handle probes directly from activator
HealthCheckUAPrefixes: []string{network.ActivatorUserAgent},
}
mainServer := buildServer(ctx, env, drainer, probe, stats, logger)
servers := map[string]*http.Server{
"main": mainServer,
"admin": buildAdminServer(logger, healthState),
"admin": buildAdminServer(logger, drainer),
"metrics": buildMetricsServer(promStatReporter, protoStatReporter),
}
if env.EnableProfiling {
Expand Down Expand Up @@ -225,19 +229,11 @@ func main() {
os.Exit(1)
case <-ctx.Done():
logger.Info("Received TERM signal, attempting to gracefully shutdown servers.")
healthState.Shutdown(func() {
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
time.Sleep(drainSleepDuration)

// Calling server.Shutdown() allows pending requests to
// complete, while no new work is accepted.
logger.Info("Shutting down main server")
if err := mainServer.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown proxy server", zap.Error(err))
}
// Removing the main server from the shutdown logic as we've already shut it down.
delete(servers, "main")
julz marked this conversation as resolved.
Show resolved Hide resolved
})
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
drainer.Drain()

// Removing the main server from the shutdown logic as we've already shut it down.
delete(servers, "main")

for serverName, srv := range servers {
logger.Info("Shutting down server: ", serverName)
Expand All @@ -260,7 +256,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
return readiness.NewProbe(coreProbe)
}

func buildServer(ctx context.Context, env config, healthState *health.State, probeContainer func() bool, stats *network.RequestStats,
func buildServer(ctx context.Context, env config, drainer *pkghandler.Drainer, probeContainer func() bool, stats *network.RequestStats,
logger *zap.SugaredLogger) *http.Server {

maxIdleConns := 1000 // TODO: somewhat arbitrary value for CC=0, needs experimental validation.
Expand Down Expand Up @@ -316,15 +312,16 @@ func buildServer(ctx context.Context, env config, healthState *health.State, pro
composedHandler = tracing.HTTPSpanMiddleware(composedHandler)
}

composedHandler = health.ProbeHandler(healthState, probeContainer, tracingEnabled, composedHandler)
composedHandler = network.NewProbeHandler(composedHandler)
// We might sometimes want to capture the probes/healthchecks in the request
// logs. Hence we need to have RequestLogHandler to be the first one.
var healthcheckHandler http.Handler
healthcheckHandler = health.ProbeHandler(probeContainer, tracingEnabled, healthcheckHandler)
if env.ServingEnableRequestLog {
composedHandler = requestLogHandler(logger, composedHandler, env)
healthcheckHandler = requestLogHandler(logger, healthcheckHandler, env)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

composedHandler needs to be wrapped with the requestLogHandler here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*in addition to the healthcheckHandler

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its required in both? the RequestLogTest is passing without it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we should - otherwise normal requests to the user container aren't logged

}

return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler)
drainer.Inner = composedHandler
drainer.HealthCheck = healthcheckHandler.ServeHTTP

return pkgnet.NewServer(":"+env.QueueServingPort, drainer)
}

func buildTransport(env config, logger *zap.SugaredLogger, maxConns int) http.RoundTripper {
Expand Down Expand Up @@ -380,12 +377,11 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config)
return true
}

func buildAdminServer(logger *zap.SugaredLogger, healthState *health.State) *http.Server {
func buildAdminServer(logger *zap.SugaredLogger, drainer *pkghandler.Drainer) *http.Server {
adminMux := http.NewServeMux()
drainHandler := healthState.DrainHandlerFunc()
adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
nader-ziada marked this conversation as resolved.
Show resolved Hide resolved
logger.Info("Attached drain handler from user-container")
drainHandler(w, r)
drainer.Drain()
})

return &http.Server{
Expand Down
3 changes: 1 addition & 2 deletions cmd/queue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func TestQueueTraceSpans(t *testing.T) {
enableTrace: false,
}}

healthState := &health.State{}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
// Create tracer with reporter recorder
Expand Down Expand Up @@ -149,7 +148,7 @@ func TestQueueTraceSpans(t *testing.T) {
h := queue.ProxyHandler(breaker, network.NewRequestStats(time.Now()), true /*tracingEnabled*/, proxy)
h(writer, req)
} else {
h := health.ProbeHandler(healthState, tc.prober, true /*tracingEnabled*/, nil)
h := health.ProbeHandler(tc.prober, true /*tracingEnabled*/, nil)
req.Header.Set(network.ProbeHeaderName, tc.requestHeader)
h(writer, req)
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/queue/health/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package health

import (
"io"
"net/http"

"go.opencensus.io/trace"
Expand All @@ -29,7 +30,7 @@ const badProbeTemplate = "unexpected probe header value: "

// ProbeHandler returns a http.HandlerFunc that responds to health checks if the
// knative network probe header is passed, and otherwise delegates to the next handler.
func ProbeHandler(healthState *State, prober func() bool, tracingEnabled bool, next http.Handler) http.HandlerFunc {
func ProbeHandler(prober func() bool, tracingEnabled bool, next http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ph := network.KnativeProbeHeader(r)

Expand Down Expand Up @@ -58,13 +59,13 @@ func ProbeHandler(healthState *State, prober func() bool, tracingEnabled bool, n
return
}

healthState.HandleHealthProbe(func() bool {
if !prober() {
probeSpan.Annotate([]trace.Attribute{
trace.StringAttribute("queueproxy.probe.error", "container not ready")}, "error")
return false
}
return true
}, w)
if !prober() {
probeSpan.Annotate([]trace.Attribute{
trace.StringAttribute("queueproxy.probe.error", "container not ready")}, "error")
w.WriteHeader(http.StatusServiceUnavailable)
return
}

io.WriteString(w, queue.Name)
}
}
3 changes: 1 addition & 2 deletions pkg/queue/health/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func TestProbeHandler(t *testing.T) {
wantCode: http.StatusOK,
}}

healthState := &State{}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
writer := httptest.NewRecorder()
Expand All @@ -80,7 +79,7 @@ func TestProbeHandler(t *testing.T) {
req.Header.Set(network.ProbeHeaderName, tc.requestHeader)
}

h := ProbeHandler(healthState, tc.prober, true /*tracingEnabled*/, incHandler)
h := ProbeHandler(tc.prober, true /*tracingEnabled*/, incHandler)
h(writer, req)

if got, want := writer.Code, tc.wantCode; got != want {
Expand Down
135 changes: 0 additions & 135 deletions pkg/queue/health/health_state.go

This file was deleted.

Loading