diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 91e581934865..11506b30fb19 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -172,7 +172,7 @@ func main() { // Setup probe to run for checking user-application healthiness. probe := buildProbe(logger, env) - healthState := health.NewState() + healthState := health.NewState(drainSleepDuration) mainServer := buildServer(ctx, env, healthState, probe, stats, logger) servers := map[string]*http.Server{ @@ -238,7 +238,7 @@ func main() { logger.Info("Received TERM signal, attempting to gracefully shutdown servers.") healthState.Shutdown(func() { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) - time.Sleep(drainSleepDuration) + healthState.Drainer.Drain() // Calling server.Shutdown() allows pending requests to // complete, while no new work is accepted. @@ -379,16 +379,9 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config) } func buildAdminServer(logger *zap.SugaredLogger, healthState *health.State) *http.Server { - adminMux := http.NewServeMux() - drainHandler := healthState.DrainHandlerFunc() - adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { - logger.Info("Attached drain handler from user-container") - drainHandler(w, r) - }) - return &http.Server{ Addr: ":" + strconv.Itoa(networking.QueueAdminPort), - Handler: adminMux, + Handler: healthState.Drainer, } } diff --git a/pkg/queue/health/health_state.go b/pkg/queue/health/health_state.go index 0e9750bc5123..bca8bf54eabb 100644 --- a/pkg/queue/health/health_state.go +++ b/pkg/queue/health/health_state.go @@ -20,24 +20,33 @@ import ( "io" "net/http" "sync" + "time" + "knative.dev/pkg/network/handlers" "knative.dev/serving/pkg/queue" ) // State holds state about the current healthiness of the component. type State struct { - alive bool - shuttingDown bool - mutex sync.RWMutex - - drainCh chan struct{} + alive bool + shuttingDown bool + mutex sync.RWMutex + Drainer *handlers.Drainer + AdminMux *http.ServeMux drainCompleted bool } // NewState returns a new State with both alive and shuttingDown set to false. -func NewState() *State { +// drainSleepDuration configures the QuietPeriod for the Drainer. +func NewState(drainSleepDuration time.Duration) *State { + adminMux := http.NewServeMux() + drainHandler := &handlers.Drainer{ + Inner: adminMux, + QuietPeriod: drainSleepDuration, + } return &State{ - drainCh: make(chan struct{}), + AdminMux: adminMux, + Drainer: drainHandler, } } @@ -82,10 +91,6 @@ func (h *State) drainFinished() { h.mutex.Lock() defer h.mutex.Unlock() - if !h.drainCompleted { - close(h.drainCh) - } - h.drainCompleted = true } @@ -115,13 +120,6 @@ func (h *State) HandleHealthProbe(prober func() bool, w http.ResponseWriter) { } } -// DrainHandlerFunc constructs an HTTP handler that waits until the proxy server is shut down. -func (h *State) DrainHandlerFunc() func(_ http.ResponseWriter, _ *http.Request) { - return func(_ http.ResponseWriter, _ *http.Request) { - <-h.drainCh - } -} - // Shutdown marks the proxy server as not ready and begins its shutdown process. This // results in unblocking any connections waiting for drain. func (h *State) Shutdown(drain func()) { diff --git a/pkg/queue/health/health_state_test.go b/pkg/queue/health/health_state_test.go index 74d004e7c357..907bb027673d 100644 --- a/pkg/queue/health/health_state_test.go +++ b/pkg/queue/health/health_state_test.go @@ -25,8 +25,10 @@ import ( "knative.dev/serving/pkg/queue" ) +const testDrainSleepDuration = 1 * time.Second + func TestHealthStateSetsState(t *testing.T) { - s := NewState() + s := NewState(testDrainSleepDuration) wantAlive := func() { if !s.isAlive() { @@ -123,7 +125,7 @@ func TestHealthStateHealthHandler(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - state := NewState() + state := NewState(testDrainSleepDuration) state.alive = test.alive state.shuttingDown = test.shuttingDown @@ -142,14 +144,14 @@ func TestHealthStateHealthHandler(t *testing.T) { } func TestHealthStateDrainHandler(t *testing.T) { - state := NewState() + state := NewState(testDrainSleepDuration) state.setAlive() req := httptest.NewRequest(http.MethodGet, "/", nil) rr := httptest.NewRecorder() completedCh := make(chan struct{}, 1) - handler := http.HandlerFunc(state.DrainHandlerFunc()) + handler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}) go func(handler http.Handler, recorder *httptest.ResponseRecorder) { handler.ServeHTTP(recorder, req) close(completedCh) @@ -165,17 +167,16 @@ func TestHealthStateDrainHandler(t *testing.T) { } func TestHealthStateDrainHandlerNotRacy(t *testing.T) { - state := NewState() + state := NewState(testDrainSleepDuration) state.setAlive() - // Complete the drain _before_ the DrainHandlerFunc is attached. state.drainFinished() req := httptest.NewRequest(http.MethodGet, "/", nil) rr := httptest.NewRecorder() completedCh := make(chan struct{}, 1) - handler := http.HandlerFunc(state.DrainHandlerFunc()) + handler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}) go func(handler http.Handler, recorder *httptest.ResponseRecorder) { handler.ServeHTTP(recorder, req) close(completedCh) @@ -194,7 +195,7 @@ func TestHealthStateDrainHandlerNotRacy(t *testing.T) { } func TestHealthStateShutdown(t *testing.T) { - state := NewState() + state := NewState(testDrainSleepDuration) state.setAlive() calledCh := make(chan struct{}, 1)