Skip to content

Commit

Permalink
Use pkg drain for draining in queue proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
nader-ziada committed Oct 7, 2021
1 parent a6f740b commit bac24b4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 127 deletions.
27 changes: 11 additions & 16 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func main() {
}

healthState := health.NewState()
mainServer := buildServer(ctx, env, healthState, probe, stats, logger)
mainServer, drainer := buildServer(ctx, env, healthState, probe, stats, logger)
servers := map[string]*http.Server{
"main": mainServer,
"admin": buildAdminServer(logger, healthState),
Expand Down Expand Up @@ -240,19 +240,9 @@ func main() {
os.Exit(1)
case <-ctx.Done():
logger.Info("Received TERM signal, attempting to gracefully shutdown servers.")
healthState.Shutdown(func() {
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
time.Sleep(drainSleepDuration)

// Calling server.Shutdown() allows pending requests to
// complete, while no new work is accepted.
logger.Info("Shutting down main server")
if err := mainServer.Shutdown(context.Background()); err != nil {
logger.Errorw("Failed to shutdown proxy server", zap.Error(err))
}
// Removing the main server from the shutdown logic as we've already shut it down.
delete(servers, "main")
})
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
drainer.Drain()
healthState.DrainFinished()

for serverName, srv := range servers {
logger.Info("Shutting down server: ", serverName)
Expand All @@ -276,7 +266,7 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
}

func buildServer(ctx context.Context, env config, healthState *health.State, probeContainer func() bool, stats *network.RequestStats,
logger *zap.SugaredLogger) *http.Server {
logger *zap.SugaredLogger) (*http.Server, *pkghandler.Drainer) {

maxIdleConns := 1000 // TODO: somewhat arbitrary value for CC=0, needs experimental validation.
if env.ContainerConcurrency > 0 {
Expand Down Expand Up @@ -332,7 +322,12 @@ func buildServer(ctx context.Context, env config, healthState *health.State, pro
// logs. Hence we need to have RequestLogHandler to be the first one.
composedHandler = pushRequestLogHandler(logger, composedHandler, env)

return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler)
drainer := &pkghandler.Drainer{
Inner: composedHandler,
QuietPeriod: drainSleepDuration,
}

return pkgnet.NewServer(":"+env.QueueServingPort, drainer), drainer
}

func buildTransport(env config, logger *zap.SugaredLogger, maxConns int) http.RoundTripper {
Expand Down
49 changes: 4 additions & 45 deletions pkg/queue/health/health_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import (

// State holds state about the current healthiness of the component.
type State struct {
alive bool
shuttingDown bool
mutex sync.RWMutex
alive bool
mutex sync.RWMutex

drainCh chan struct{}
drainCompleted bool
Expand All @@ -50,42 +49,20 @@ func (h *State) isAlive() bool {
return h.alive
}

// isShuttingDown returns whether or not the health server is currently
// shutting down.
func (h *State) isShuttingDown() bool {
h.mutex.RLock()
defer h.mutex.RUnlock()

return h.shuttingDown
}

// setAlive updates the state to declare the service alive.
func (h *State) setAlive() {
h.mutex.Lock()
defer h.mutex.Unlock()

h.alive = true
h.shuttingDown = false
}

// shutdown updates the state to declare the service shutting down.
func (h *State) shutdown() {
h.mutex.Lock()
defer h.mutex.Unlock()

h.alive = false
h.shuttingDown = true
}

// drainFinish updates that we finished draining.
func (h *State) drainFinished() {
func (h *State) DrainFinished() {
h.mutex.Lock()
defer h.mutex.Unlock()

if !h.drainCompleted {
close(h.drainCh)
}

close(h.drainCh)
h.drainCompleted = true
}

Expand All @@ -100,13 +77,7 @@ func (h *State) HandleHealthProbe(prober func() bool, w http.ResponseWriter) {
w.WriteHeader(http.StatusServiceUnavailable)
}

sendShuttingDown := func() {
w.WriteHeader(http.StatusGone)
}

switch {
case h.isShuttingDown():
sendShuttingDown()
case prober != nil && !prober():
sendNotAlive()
default:
Expand All @@ -121,15 +92,3 @@ func (h *State) DrainHandlerFunc() func(_ http.ResponseWriter, _ *http.Request)
<-h.drainCh
}
}

// Shutdown marks the proxy server as not ready and begins its shutdown process. This
// results in unblocking any connections waiting for drain.
func (h *State) Shutdown(drain func()) {
h.shutdown()

if drain != nil {
drain()
}

h.drainFinished()
}
72 changes: 6 additions & 66 deletions pkg/queue/health/health_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,11 @@ func TestHealthStateSetsState(t *testing.T) {
t.Error("State was alive but it shouldn't have been")
}
}
wantShuttingDown := func() {
if !s.isShuttingDown() {
t.Error("State was not shutting down but it should have been")
}
}
wantNotShuttingDown := func() {
if s.isShuttingDown() {
t.Error("State was shutting down but it shouldn't have been")
}
}

wantNotAlive()
wantNotShuttingDown()

s.setAlive()
wantAlive()
wantNotShuttingDown()

s.shutdown()
wantNotAlive()
wantShuttingDown()
}

func TestHealthStateHealthHandler(t *testing.T) {
Expand Down Expand Up @@ -88,34 +72,20 @@ func TestHealthStateHealthHandler(t *testing.T) {
wantStatus: http.StatusOK,
wantBody: queue.Name,
}, {
name: "shuttingDown: true",
shuttingDown: true,
wantStatus: http.StatusGone,
}, {
name: "no prober, shuttingDown: false",
name: "no prober",
wantStatus: http.StatusOK,
wantBody: queue.Name,
}, {
name: "prober: true, shuttingDown: true",
shuttingDown: true,
prober: func() bool { return true },
wantStatus: http.StatusGone,
}, {
name: "prober: true, shuttingDown: false",
name: "prober: true",
prober: func() bool { return true },
wantStatus: http.StatusOK,
wantBody: queue.Name,
}, {
name: "prober: false, shuttingDown: false",
name: "prober: false",
prober: func() bool { return false },
wantStatus: http.StatusServiceUnavailable,
}, {
name: "prober: false, shuttingDown: true",
shuttingDown: true,
prober: func() bool { return false },
wantStatus: http.StatusGone,
}, {
name: "alive: true, prober: false, shuttingDown: false",
name: "alive: true, prober: false",
alive: true,
prober: func() bool { return false },
wantStatus: http.StatusServiceUnavailable,
Expand All @@ -125,7 +95,6 @@ func TestHealthStateHealthHandler(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
state := NewState()
state.alive = test.alive
state.shuttingDown = test.shuttingDown

rr := httptest.NewRecorder()
state.HandleHealthProbe(test.prober, rr)
Expand Down Expand Up @@ -155,7 +124,7 @@ func TestHealthStateDrainHandler(t *testing.T) {
close(completedCh)
}(handler, rr)

state.drainFinished()
state.DrainFinished()
<-completedCh

if rr.Code != http.StatusOK {
Expand All @@ -169,7 +138,7 @@ func TestHealthStateDrainHandlerNotRacy(t *testing.T) {
state.setAlive()

// Complete the drain _before_ the DrainHandlerFunc is attached.
state.drainFinished()
state.DrainFinished()

req := httptest.NewRequest(http.MethodGet, "/", nil)
rr := httptest.NewRecorder()
Expand All @@ -192,32 +161,3 @@ func TestHealthStateDrainHandlerNotRacy(t *testing.T) {
rr.Code, http.StatusOK)
}
}

func TestHealthStateShutdown(t *testing.T) {
state := NewState()
state.setAlive()

calledCh := make(chan struct{}, 1)
state.Shutdown(func() {
close(calledCh)
})

// The channel should be closed as the cleaner is called.
select {
case <-calledCh:
case <-time.After(2 * time.Second):
t.Error("drain function not called when shutting down")
}

if !state.drainCompleted {
t.Error("shutdown did not complete draining")
}

if !state.shuttingDown {
t.Errorf("wrong shutdown state: got %v want %v", state.shuttingDown, true)
}

if state.alive {
t.Errorf("wrong alive state: got %v want %v", state.alive, false)
}
}

0 comments on commit bac24b4

Please sign in to comment.