From ec3d057401a5edd4832d38d7d5c01863e99e81f9 Mon Sep 17 00:00:00 2001 From: Nader Ziada Date: Wed, 22 Sep 2021 16:59:18 -0400 Subject: [PATCH] user pkg drain for draining in queue proxy --- cmd/queue/main.go | 40 +++-- cmd/queue/main_test.go | 3 +- pkg/queue/health/handler.go | 19 +-- pkg/queue/health/handler_test.go | 3 +- pkg/queue/health/health_state.go | 135 ---------------- pkg/queue/health/health_state_test.go | 223 -------------------------- 6 files changed, 30 insertions(+), 393 deletions(-) delete mode 100644 pkg/queue/health/health_state.go delete mode 100644 pkg/queue/health/health_state_test.go diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 1ebecc3c8d92..21e62eb6b6cf 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -162,11 +162,13 @@ func main() { probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection).ProbeContainer } - healthState := health.NewState() - mainServer := buildServer(ctx, env, healthState, probe, stats, logger) + drainer := &pkghandler.Drainer{ + QuietPeriod: drainSleepDuration, + } + mainServer := buildServer(ctx, env, drainer, probe, stats, logger) servers := map[string]*http.Server{ "main": mainServer, - "admin": buildAdminServer(logger, healthState), + "admin": buildAdminServer(logger, drainer), "metrics": buildMetricsServer(promStatReporter, protoStatReporter), } if env.EnableProfiling { @@ -225,19 +227,11 @@ func main() { os.Exit(1) case <-ctx.Done(): logger.Info("Received TERM signal, attempting to gracefully shutdown servers.") - healthState.Shutdown(func() { - logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) - time.Sleep(drainSleepDuration) - - // Calling server.Shutdown() allows pending requests to - // complete, while no new work is accepted. - logger.Info("Shutting down main server") - if err := mainServer.Shutdown(context.Background()); err != nil { - logger.Errorw("Failed to shutdown proxy server", zap.Error(err)) - } - // Removing the main server from the shutdown logic as we've already shut it down. - delete(servers, "main") - }) + logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) + drainer.Drain() + + // Removing the main server from the shutdown logic as we've already shut it down. + delete(servers, "main") for serverName, srv := range servers { logger.Info("Shutting down server: ", serverName) @@ -260,7 +254,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2 return readiness.NewProbe(coreProbe) } -func buildServer(ctx context.Context, env config, healthState *health.State, probeContainer func() bool, stats *network.RequestStats, +func buildServer(ctx context.Context, env config, drainer *pkghandler.Drainer, probeContainer func() bool, stats *network.RequestStats, logger *zap.SugaredLogger) *http.Server { maxIdleConns := 1000 // TODO: somewhat arbitrary value for CC=0, needs experimental validation. @@ -311,7 +305,7 @@ func buildServer(ctx context.Context, env config, healthState *health.State, pro composedHandler = tracing.HTTPSpanMiddleware(composedHandler) } - composedHandler = health.ProbeHandler(healthState, probeContainer, tracingEnabled, composedHandler) + composedHandler = health.ProbeHandler(probeContainer, tracingEnabled, composedHandler) composedHandler = network.NewProbeHandler(composedHandler) // We might sometimes want to capture the probes/healthchecks in the request // logs. Hence we need to have RequestLogHandler to be the first one. @@ -319,7 +313,10 @@ func buildServer(ctx context.Context, env config, healthState *health.State, pro composedHandler = requestLogHandler(logger, composedHandler, env) } - return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler) + drainer.Inner = composedHandler + drainer.HealthCheck = composedHandler.ServeHTTP + + return pkgnet.NewServer(":"+env.QueueServingPort, drainer) } func buildTransport(env config, logger *zap.SugaredLogger, maxConns int) http.RoundTripper { @@ -375,12 +372,11 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config) return true } -func buildAdminServer(logger *zap.SugaredLogger, healthState *health.State) *http.Server { +func buildAdminServer(logger *zap.SugaredLogger, drainer *pkghandler.Drainer) *http.Server { adminMux := http.NewServeMux() - drainHandler := healthState.DrainHandlerFunc() adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { logger.Info("Attached drain handler from user-container") - drainHandler(w, r) + drainer.Drain() }) return &http.Server{ diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index 27ebcbf180dc..386a7a8720e2 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -105,7 +105,6 @@ func TestQueueTraceSpans(t *testing.T) { enableTrace: false, }} - healthState := &health.State{} for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { // Create tracer with reporter recorder @@ -149,7 +148,7 @@ func TestQueueTraceSpans(t *testing.T) { h := queue.ProxyHandler(breaker, network.NewRequestStats(time.Now()), true /*tracingEnabled*/, proxy) h(writer, req) } else { - h := health.ProbeHandler(healthState, tc.prober, true /*tracingEnabled*/, nil) + h := health.ProbeHandler(tc.prober, true /*tracingEnabled*/, nil) req.Header.Set(network.ProbeHeaderName, tc.requestHeader) h(writer, req) } diff --git a/pkg/queue/health/handler.go b/pkg/queue/health/handler.go index 201187c228f7..2cab6718b6f4 100644 --- a/pkg/queue/health/handler.go +++ b/pkg/queue/health/handler.go @@ -17,6 +17,7 @@ limitations under the License. package health import ( + "io" "net/http" "go.opencensus.io/trace" @@ -29,7 +30,7 @@ const badProbeTemplate = "unexpected probe header value: " // ProbeHandler returns a http.HandlerFunc that responds to health checks if the // knative network probe header is passed, and otherwise delegates to the next handler. -func ProbeHandler(healthState *State, prober func() bool, tracingEnabled bool, next http.Handler) http.HandlerFunc { +func ProbeHandler(prober func() bool, tracingEnabled bool, next http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ph := network.KnativeProbeHeader(r) @@ -58,13 +59,13 @@ func ProbeHandler(healthState *State, prober func() bool, tracingEnabled bool, n return } - healthState.HandleHealthProbe(func() bool { - if !prober() { - probeSpan.Annotate([]trace.Attribute{ - trace.StringAttribute("queueproxy.probe.error", "container not ready")}, "error") - return false - } - return true - }, w) + if !prober() { + probeSpan.Annotate([]trace.Attribute{ + trace.StringAttribute("queueproxy.probe.error", "container not ready")}, "error") + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + io.WriteString(w, queue.Name) } } diff --git a/pkg/queue/health/handler_test.go b/pkg/queue/health/handler_test.go index e26bf0b1a362..1fdcf9599177 100644 --- a/pkg/queue/health/handler_test.go +++ b/pkg/queue/health/handler_test.go @@ -70,7 +70,6 @@ func TestProbeHandler(t *testing.T) { wantCode: http.StatusOK, }} - healthState := &State{} for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { writer := httptest.NewRecorder() @@ -80,7 +79,7 @@ func TestProbeHandler(t *testing.T) { req.Header.Set(network.ProbeHeaderName, tc.requestHeader) } - h := ProbeHandler(healthState, tc.prober, true /*tracingEnabled*/, incHandler) + h := ProbeHandler(tc.prober, true /*tracingEnabled*/, incHandler) h(writer, req) if got, want := writer.Code, tc.wantCode; got != want { diff --git a/pkg/queue/health/health_state.go b/pkg/queue/health/health_state.go deleted file mode 100644 index 0e9750bc5123..000000000000 --- a/pkg/queue/health/health_state.go +++ /dev/null @@ -1,135 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package health - -import ( - "io" - "net/http" - "sync" - - "knative.dev/serving/pkg/queue" -) - -// State holds state about the current healthiness of the component. -type State struct { - alive bool - shuttingDown bool - mutex sync.RWMutex - - drainCh chan struct{} - drainCompleted bool -} - -// NewState returns a new State with both alive and shuttingDown set to false. -func NewState() *State { - return &State{ - drainCh: make(chan struct{}), - } -} - -// isAlive returns whether or not the health server is in a known -// working state currently. -func (h *State) isAlive() bool { - h.mutex.RLock() - defer h.mutex.RUnlock() - - return h.alive -} - -// isShuttingDown returns whether or not the health server is currently -// shutting down. -func (h *State) isShuttingDown() bool { - h.mutex.RLock() - defer h.mutex.RUnlock() - - return h.shuttingDown -} - -// setAlive updates the state to declare the service alive. -func (h *State) setAlive() { - h.mutex.Lock() - defer h.mutex.Unlock() - - h.alive = true - h.shuttingDown = false -} - -// shutdown updates the state to declare the service shutting down. -func (h *State) shutdown() { - h.mutex.Lock() - defer h.mutex.Unlock() - - h.alive = false - h.shuttingDown = true -} - -// drainFinish updates that we finished draining. -func (h *State) drainFinished() { - h.mutex.Lock() - defer h.mutex.Unlock() - - if !h.drainCompleted { - close(h.drainCh) - } - - h.drainCompleted = true -} - -// HandleHealthProbe handles the probe according to the current state of the -// health server. -func (h *State) HandleHealthProbe(prober func() bool, w http.ResponseWriter) { - sendAlive := func() { - io.WriteString(w, queue.Name) - } - - sendNotAlive := func() { - w.WriteHeader(http.StatusServiceUnavailable) - } - - sendShuttingDown := func() { - w.WriteHeader(http.StatusGone) - } - - switch { - case h.isShuttingDown(): - sendShuttingDown() - case prober != nil && !prober(): - sendNotAlive() - default: - h.setAlive() - sendAlive() - } -} - -// DrainHandlerFunc constructs an HTTP handler that waits until the proxy server is shut down. -func (h *State) DrainHandlerFunc() func(_ http.ResponseWriter, _ *http.Request) { - return func(_ http.ResponseWriter, _ *http.Request) { - <-h.drainCh - } -} - -// Shutdown marks the proxy server as not ready and begins its shutdown process. This -// results in unblocking any connections waiting for drain. -func (h *State) Shutdown(drain func()) { - h.shutdown() - - if drain != nil { - drain() - } - - h.drainFinished() -} diff --git a/pkg/queue/health/health_state_test.go b/pkg/queue/health/health_state_test.go deleted file mode 100644 index 74d004e7c357..000000000000 --- a/pkg/queue/health/health_state_test.go +++ /dev/null @@ -1,223 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package health - -import ( - "net/http" - "net/http/httptest" - "testing" - "time" - - "knative.dev/serving/pkg/queue" -) - -func TestHealthStateSetsState(t *testing.T) { - s := NewState() - - wantAlive := func() { - if !s.isAlive() { - t.Error("State was not alive but it should have been alive") - } - } - wantNotAlive := func() { - if s.isAlive() { - t.Error("State was alive but it shouldn't have been") - } - } - wantShuttingDown := func() { - if !s.isShuttingDown() { - t.Error("State was not shutting down but it should have been") - } - } - wantNotShuttingDown := func() { - if s.isShuttingDown() { - t.Error("State was shutting down but it shouldn't have been") - } - } - - wantNotAlive() - wantNotShuttingDown() - - s.setAlive() - wantAlive() - wantNotShuttingDown() - - s.shutdown() - wantNotAlive() - wantShuttingDown() -} - -func TestHealthStateHealthHandler(t *testing.T) { - tests := []struct { - name string - alive bool - shuttingDown bool - prober func() bool - wantStatus int - wantBody string - }{{ - name: "alive: true", - alive: true, - wantStatus: http.StatusOK, - wantBody: queue.Name, - }, { - name: "alive: false, prober: true", - prober: func() bool { return true }, - wantStatus: http.StatusOK, - wantBody: queue.Name, - }, { - name: "alive: false, prober: false", - prober: func() bool { return false }, - wantStatus: http.StatusServiceUnavailable, - }, { - name: "alive: false, no prober", - wantStatus: http.StatusOK, - wantBody: queue.Name, - }, { - name: "shuttingDown: true", - shuttingDown: true, - wantStatus: http.StatusGone, - }, { - name: "no prober, shuttingDown: false", - wantStatus: http.StatusOK, - wantBody: queue.Name, - }, { - name: "prober: true, shuttingDown: true", - shuttingDown: true, - prober: func() bool { return true }, - wantStatus: http.StatusGone, - }, { - name: "prober: true, shuttingDown: false", - prober: func() bool { return true }, - wantStatus: http.StatusOK, - wantBody: queue.Name, - }, { - name: "prober: false, shuttingDown: false", - prober: func() bool { return false }, - wantStatus: http.StatusServiceUnavailable, - }, { - name: "prober: false, shuttingDown: true", - shuttingDown: true, - prober: func() bool { return false }, - wantStatus: http.StatusGone, - }, { - name: "alive: true, prober: false, shuttingDown: false", - alive: true, - prober: func() bool { return false }, - wantStatus: http.StatusServiceUnavailable, - }} - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - state := NewState() - state.alive = test.alive - state.shuttingDown = test.shuttingDown - - rr := httptest.NewRecorder() - state.HandleHealthProbe(test.prober, rr) - - if got, want := rr.Code, test.wantStatus; got != want { - t.Errorf("handler returned wrong status code: got %v want %v", got, want) - } - - if got, want := rr.Body.String(), test.wantBody; got != want { - t.Errorf("handler returned unexpected body: got %v want %v", got, want) - } - }) - } -} - -func TestHealthStateDrainHandler(t *testing.T) { - state := NewState() - state.setAlive() - - req := httptest.NewRequest(http.MethodGet, "/", nil) - rr := httptest.NewRecorder() - - completedCh := make(chan struct{}, 1) - handler := http.HandlerFunc(state.DrainHandlerFunc()) - go func(handler http.Handler, recorder *httptest.ResponseRecorder) { - handler.ServeHTTP(recorder, req) - close(completedCh) - }(handler, rr) - - state.drainFinished() - <-completedCh - - if rr.Code != http.StatusOK { - t.Errorf("handler returned wrong status code: got %v want %v", - rr.Code, http.StatusOK) - } -} - -func TestHealthStateDrainHandlerNotRacy(t *testing.T) { - state := NewState() - state.setAlive() - - // Complete the drain _before_ the DrainHandlerFunc is attached. - state.drainFinished() - - req := httptest.NewRequest(http.MethodGet, "/", nil) - rr := httptest.NewRecorder() - - completedCh := make(chan struct{}, 1) - handler := http.HandlerFunc(state.DrainHandlerFunc()) - go func(handler http.Handler, recorder *httptest.ResponseRecorder) { - handler.ServeHTTP(recorder, req) - close(completedCh) - }(handler, rr) - - select { - case <-completedCh: - case <-time.After(5 * time.Second): - t.Fatalf("timed out waiting for drain handler to return") - } - - if rr.Code != http.StatusOK { - t.Errorf("handler returned wrong status code: got %v want %v", - rr.Code, http.StatusOK) - } -} - -func TestHealthStateShutdown(t *testing.T) { - state := NewState() - state.setAlive() - - calledCh := make(chan struct{}, 1) - state.Shutdown(func() { - close(calledCh) - }) - - // The channel should be closed as the cleaner is called. - select { - case <-calledCh: - case <-time.After(2 * time.Second): - t.Error("drain function not called when shutting down") - } - - if !state.drainCompleted { - t.Error("shutdown did not complete draining") - } - - if !state.shuttingDown { - t.Errorf("wrong shutdown state: got %v want %v", state.shuttingDown, true) - } - - if state.alive { - t.Errorf("wrong alive state: got %v want %v", state.alive, false) - } -}