diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 385dbb897257..481ba523809b 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -177,11 +177,11 @@ func main() { probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection).ProbeContainer } - healthState := health.NewState() + healthState := health.NewState(drainSleepDuration) mainServer := buildServer(ctx, env, healthState, probe, stats, logger) servers := map[string]*http.Server{ "main": mainServer, - "admin": buildAdminServer(logger, healthState), + "admin": buildAdminServer(healthState), "metrics": buildMetricsServer(promStatReporter, protoStatReporter), } if env.EnableProfiling { @@ -242,7 +242,7 @@ func main() { logger.Info("Received TERM signal, attempting to gracefully shutdown servers.") healthState.Shutdown(func() { logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) - time.Sleep(drainSleepDuration) + healthState.Drainer.Drain() // Calling server.Shutdown() allows pending requests to // complete, while no new work is accepted. @@ -388,17 +388,10 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config) return true } -func buildAdminServer(logger *zap.SugaredLogger, healthState *health.State) *http.Server { - adminMux := http.NewServeMux() - drainHandler := healthState.DrainHandlerFunc() - adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { - logger.Info("Attached drain handler from user-container") - drainHandler(w, r) - }) - +func buildAdminServer(healthState *health.State) *http.Server { return &http.Server{ Addr: ":" + strconv.Itoa(networking.QueueAdminPort), - Handler: adminMux, + Handler: healthState.Drainer, } } diff --git a/pkg/queue/health/health_state.go b/pkg/queue/health/health_state.go index 0e9750bc5123..4e22e42ba9cd 100644 --- a/pkg/queue/health/health_state.go +++ b/pkg/queue/health/health_state.go @@ -20,7 +20,9 @@ import ( "io" "net/http" "sync" + "time" + "knative.dev/pkg/network/handlers" "knative.dev/serving/pkg/queue" ) @@ -30,15 +32,29 @@ type State struct { shuttingDown bool mutex sync.RWMutex + Drainer *handlers.Drainer drainCh chan struct{} drainCompleted bool } // NewState returns a new State with both alive and shuttingDown set to false. -func NewState() *State { - return &State{ +// drainSleepDuration configures the QuietPeriod for the Drainer. +func NewState(drainSleepDuration time.Duration) *State { + state := &State{ drainCh: make(chan struct{}), } + + adminMux := http.NewServeMux() + adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) { + state.DrainHandlerFunc() + }) + + state.Drainer = &handlers.Drainer{ + Inner: adminMux, + QuietPeriod: drainSleepDuration, + } + + return state } // isAlive returns whether or not the health server is in a known diff --git a/pkg/queue/health/health_state_test.go b/pkg/queue/health/health_state_test.go index 74d004e7c357..0f90befe1ae4 100644 --- a/pkg/queue/health/health_state_test.go +++ b/pkg/queue/health/health_state_test.go @@ -25,8 +25,10 @@ import ( "knative.dev/serving/pkg/queue" ) +const testDrainSleepDuration = 1 * time.Second + func TestHealthStateSetsState(t *testing.T) { - s := NewState() + s := NewState(testDrainSleepDuration) wantAlive := func() { if !s.isAlive() { @@ -123,7 +125,7 @@ func TestHealthStateHealthHandler(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - state := NewState() + state := NewState(testDrainSleepDuration) state.alive = test.alive state.shuttingDown = test.shuttingDown @@ -142,7 +144,7 @@ func TestHealthStateHealthHandler(t *testing.T) { } func TestHealthStateDrainHandler(t *testing.T) { - state := NewState() + state := NewState(testDrainSleepDuration) state.setAlive() req := httptest.NewRequest(http.MethodGet, "/", nil) @@ -165,7 +167,7 @@ func TestHealthStateDrainHandler(t *testing.T) { } func TestHealthStateDrainHandlerNotRacy(t *testing.T) { - state := NewState() + state := NewState(testDrainSleepDuration) state.setAlive() // Complete the drain _before_ the DrainHandlerFunc is attached. @@ -194,7 +196,7 @@ func TestHealthStateDrainHandlerNotRacy(t *testing.T) { } func TestHealthStateShutdown(t *testing.T) { - state := NewState() + state := NewState(testDrainSleepDuration) state.setAlive() calledCh := make(chan struct{}, 1)