Skip to content

Commit

Permalink
user pkg drain for draining in queue proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
nader-ziada committed Oct 6, 2021
1 parent a6f740b commit 1806a57
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 19 deletions.
17 changes: 5 additions & 12 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ func main() {
probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection).ProbeContainer
}

healthState := health.NewState()
healthState := health.NewState(drainSleepDuration)
mainServer := buildServer(ctx, env, healthState, probe, stats, logger)
servers := map[string]*http.Server{
"main": mainServer,
"admin": buildAdminServer(logger, healthState),
"admin": buildAdminServer(healthState),
"metrics": buildMetricsServer(promStatReporter, protoStatReporter),
}
if env.EnableProfiling {
Expand Down Expand Up @@ -242,7 +242,7 @@ func main() {
logger.Info("Received TERM signal, attempting to gracefully shutdown servers.")
healthState.Shutdown(func() {
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
time.Sleep(drainSleepDuration)
healthState.Drainer.Drain()

// Calling server.Shutdown() allows pending requests to
// complete, while no new work is accepted.
Expand Down Expand Up @@ -388,17 +388,10 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config)
return true
}

func buildAdminServer(logger *zap.SugaredLogger, healthState *health.State) *http.Server {
adminMux := http.NewServeMux()
drainHandler := healthState.DrainHandlerFunc()
adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
logger.Info("Attached drain handler from user-container")
drainHandler(w, r)
})

func buildAdminServer(healthState *health.State) *http.Server {
return &http.Server{
Addr: ":" + strconv.Itoa(networking.QueueAdminPort),
Handler: adminMux,
Handler: healthState.Drainer,
}
}

Expand Down
20 changes: 18 additions & 2 deletions pkg/queue/health/health_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"io"
"net/http"
"sync"
"time"

"knative.dev/pkg/network/handlers"
"knative.dev/serving/pkg/queue"
)

Expand All @@ -30,15 +32,29 @@ type State struct {
shuttingDown bool
mutex sync.RWMutex

Drainer *handlers.Drainer
drainCh chan struct{}
drainCompleted bool
}

// NewState returns a new State with both alive and shuttingDown set to false.
func NewState() *State {
return &State{
// drainSleepDuration configures the QuietPeriod for the Drainer.
func NewState(drainSleepDuration time.Duration) *State {
state := &State{
drainCh: make(chan struct{}),
}

adminMux := http.NewServeMux()
adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
state.DrainHandlerFunc()
})

state.Drainer = &handlers.Drainer{
Inner: adminMux,
QuietPeriod: drainSleepDuration,
}

return state
}

// isAlive returns whether or not the health server is in a known
Expand Down
12 changes: 7 additions & 5 deletions pkg/queue/health/health_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"knative.dev/serving/pkg/queue"
)

const testDrainSleepDuration = 1 * time.Second

func TestHealthStateSetsState(t *testing.T) {
s := NewState()
s := NewState(testDrainSleepDuration)

wantAlive := func() {
if !s.isAlive() {
Expand Down Expand Up @@ -123,7 +125,7 @@ func TestHealthStateHealthHandler(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
state := NewState()
state := NewState(testDrainSleepDuration)
state.alive = test.alive
state.shuttingDown = test.shuttingDown

Expand All @@ -142,7 +144,7 @@ func TestHealthStateHealthHandler(t *testing.T) {
}

func TestHealthStateDrainHandler(t *testing.T) {
state := NewState()
state := NewState(testDrainSleepDuration)
state.setAlive()

req := httptest.NewRequest(http.MethodGet, "/", nil)
Expand All @@ -165,7 +167,7 @@ func TestHealthStateDrainHandler(t *testing.T) {
}

func TestHealthStateDrainHandlerNotRacy(t *testing.T) {
state := NewState()
state := NewState(testDrainSleepDuration)
state.setAlive()

// Complete the drain _before_ the DrainHandlerFunc is attached.
Expand Down Expand Up @@ -194,7 +196,7 @@ func TestHealthStateDrainHandlerNotRacy(t *testing.T) {
}

func TestHealthStateShutdown(t *testing.T) {
state := NewState()
state := NewState(testDrainSleepDuration)
state.setAlive()

calledCh := make(chan struct{}, 1)
Expand Down

0 comments on commit 1806a57

Please sign in to comment.