Skip to content

Commit

Permalink
user pkg drain for draining in queue proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
nader-ziada committed Sep 27, 2021
1 parent 5f1ee08 commit f9ca3e1
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 36 deletions.
13 changes: 3 additions & 10 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func main() {

// Setup probe to run for checking user-application healthiness.
probe := buildProbe(logger, env)
healthState := health.NewState()
healthState := health.NewState(drainSleepDuration)

mainServer := buildServer(ctx, env, healthState, probe, stats, logger)
servers := map[string]*http.Server{
Expand Down Expand Up @@ -238,7 +238,7 @@ func main() {
logger.Info("Received TERM signal, attempting to gracefully shutdown servers.")
healthState.Shutdown(func() {
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
time.Sleep(drainSleepDuration)
healthState.Drainer.Drain()

// Calling server.Shutdown() allows pending requests to
// complete, while no new work is accepted.
Expand Down Expand Up @@ -379,16 +379,9 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config)
}

func buildAdminServer(logger *zap.SugaredLogger, healthState *health.State) *http.Server {
adminMux := http.NewServeMux()
drainHandler := healthState.DrainHandlerFunc()
adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
logger.Info("Attached drain handler from user-container")
drainHandler(w, r)
})

return &http.Server{
Addr: ":" + strconv.Itoa(networking.QueueAdminPort),
Handler: adminMux,
Handler: healthState.Drainer,
}
}

Expand Down
34 changes: 16 additions & 18 deletions pkg/queue/health/health_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,33 @@ import (
"io"
"net/http"
"sync"
"time"

"knative.dev/pkg/network/handlers"
"knative.dev/serving/pkg/queue"
)

// State holds state about the current healthiness of the component.
type State struct {
alive bool
shuttingDown bool
mutex sync.RWMutex

drainCh chan struct{}
alive bool
shuttingDown bool
mutex sync.RWMutex
Drainer *handlers.Drainer
AdminMux *http.ServeMux
drainCompleted bool
}

// NewState returns a new State with both alive and shuttingDown set to false.
func NewState() *State {
// drainSleepDuration configures the QuietPeriod for the Drainer.
func NewState(drainSleepDuration time.Duration) *State {
adminMux := http.NewServeMux()
drainHandler := &handlers.Drainer{
Inner: adminMux,
QuietPeriod: drainSleepDuration,
}
return &State{
drainCh: make(chan struct{}),
AdminMux: adminMux,
Drainer: drainHandler,
}
}

Expand Down Expand Up @@ -82,10 +91,6 @@ func (h *State) drainFinished() {
h.mutex.Lock()
defer h.mutex.Unlock()

if !h.drainCompleted {
close(h.drainCh)
}

h.drainCompleted = true
}

Expand Down Expand Up @@ -115,13 +120,6 @@ func (h *State) HandleHealthProbe(prober func() bool, w http.ResponseWriter) {
}
}

// DrainHandlerFunc constructs an HTTP handler that waits until the proxy server is shut down.
func (h *State) DrainHandlerFunc() func(_ http.ResponseWriter, _ *http.Request) {
return func(_ http.ResponseWriter, _ *http.Request) {
<-h.drainCh
}
}

// Shutdown marks the proxy server as not ready and begins its shutdown process. This
// results in unblocking any connections waiting for drain.
func (h *State) Shutdown(drain func()) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/queue/health/health_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"knative.dev/serving/pkg/queue"
)

const testDrainSleepDuration = 1 * time.Second

func TestHealthStateSetsState(t *testing.T) {
s := NewState()
s := NewState(testDrainSleepDuration)

wantAlive := func() {
if !s.isAlive() {
Expand Down Expand Up @@ -123,7 +125,7 @@ func TestHealthStateHealthHandler(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
state := NewState()
state := NewState(testDrainSleepDuration)
state.alive = test.alive
state.shuttingDown = test.shuttingDown

Expand All @@ -142,14 +144,14 @@ func TestHealthStateHealthHandler(t *testing.T) {
}

func TestHealthStateDrainHandler(t *testing.T) {
state := NewState()
state := NewState(testDrainSleepDuration)
state.setAlive()

req := httptest.NewRequest(http.MethodGet, "/", nil)
rr := httptest.NewRecorder()

completedCh := make(chan struct{}, 1)
handler := http.HandlerFunc(state.DrainHandlerFunc())
handler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
go func(handler http.Handler, recorder *httptest.ResponseRecorder) {
handler.ServeHTTP(recorder, req)
close(completedCh)
Expand All @@ -165,17 +167,16 @@ func TestHealthStateDrainHandler(t *testing.T) {
}

func TestHealthStateDrainHandlerNotRacy(t *testing.T) {
state := NewState()
state := NewState(testDrainSleepDuration)
state.setAlive()

// Complete the drain _before_ the DrainHandlerFunc is attached.
state.drainFinished()

req := httptest.NewRequest(http.MethodGet, "/", nil)
rr := httptest.NewRecorder()

completedCh := make(chan struct{}, 1)
handler := http.HandlerFunc(state.DrainHandlerFunc())
handler := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {})
go func(handler http.Handler, recorder *httptest.ResponseRecorder) {
handler.ServeHTTP(recorder, req)
close(completedCh)
Expand All @@ -194,7 +195,7 @@ func TestHealthStateDrainHandlerNotRacy(t *testing.T) {
}

func TestHealthStateShutdown(t *testing.T) {
state := NewState()
state := NewState(testDrainSleepDuration)
state.setAlive()

calledCh := make(chan struct{}, 1)
Expand Down

0 comments on commit f9ca3e1

Please sign in to comment.