Skip to content

Commit 1806a57

Browse files
committed
user pkg drain for draining in queue proxy
1 parent a6f740b commit 1806a57

File tree

3 files changed

+30
-19
lines changed

3 files changed

+30
-19
lines changed

cmd/queue/main.go

+5-12
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,11 @@ func main() {
177177
probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection).ProbeContainer
178178
}
179179

180-
healthState := health.NewState()
180+
healthState := health.NewState(drainSleepDuration)
181181
mainServer := buildServer(ctx, env, healthState, probe, stats, logger)
182182
servers := map[string]*http.Server{
183183
"main": mainServer,
184-
"admin": buildAdminServer(logger, healthState),
184+
"admin": buildAdminServer(healthState),
185185
"metrics": buildMetricsServer(promStatReporter, protoStatReporter),
186186
}
187187
if env.EnableProfiling {
@@ -242,7 +242,7 @@ func main() {
242242
logger.Info("Received TERM signal, attempting to gracefully shutdown servers.")
243243
healthState.Shutdown(func() {
244244
logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
245-
time.Sleep(drainSleepDuration)
245+
healthState.Drainer.Drain()
246246

247247
// Calling server.Shutdown() allows pending requests to
248248
// complete, while no new work is accepted.
@@ -388,17 +388,10 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config)
388388
return true
389389
}
390390

391-
func buildAdminServer(logger *zap.SugaredLogger, healthState *health.State) *http.Server {
392-
adminMux := http.NewServeMux()
393-
drainHandler := healthState.DrainHandlerFunc()
394-
adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
395-
logger.Info("Attached drain handler from user-container")
396-
drainHandler(w, r)
397-
})
398-
391+
func buildAdminServer(healthState *health.State) *http.Server {
399392
return &http.Server{
400393
Addr: ":" + strconv.Itoa(networking.QueueAdminPort),
401-
Handler: adminMux,
394+
Handler: healthState.Drainer,
402395
}
403396
}
404397

pkg/queue/health/health_state.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import (
2020
"io"
2121
"net/http"
2222
"sync"
23+
"time"
2324

25+
"knative.dev/pkg/network/handlers"
2426
"knative.dev/serving/pkg/queue"
2527
)
2628

@@ -30,15 +32,29 @@ type State struct {
3032
shuttingDown bool
3133
mutex sync.RWMutex
3234

35+
Drainer *handlers.Drainer
3336
drainCh chan struct{}
3437
drainCompleted bool
3538
}
3639

3740
// NewState returns a new State with both alive and shuttingDown set to false.
38-
func NewState() *State {
39-
return &State{
41+
// drainSleepDuration configures the QuietPeriod for the Drainer.
42+
func NewState(drainSleepDuration time.Duration) *State {
43+
state := &State{
4044
drainCh: make(chan struct{}),
4145
}
46+
47+
adminMux := http.NewServeMux()
48+
adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
49+
state.DrainHandlerFunc()
50+
})
51+
52+
state.Drainer = &handlers.Drainer{
53+
Inner: adminMux,
54+
QuietPeriod: drainSleepDuration,
55+
}
56+
57+
return state
4258
}
4359

4460
// isAlive returns whether or not the health server is in a known

pkg/queue/health/health_state_test.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@ import (
2525
"knative.dev/serving/pkg/queue"
2626
)
2727

28+
const testDrainSleepDuration = 1 * time.Second
29+
2830
func TestHealthStateSetsState(t *testing.T) {
29-
s := NewState()
31+
s := NewState(testDrainSleepDuration)
3032

3133
wantAlive := func() {
3234
if !s.isAlive() {
@@ -123,7 +125,7 @@ func TestHealthStateHealthHandler(t *testing.T) {
123125

124126
for _, test := range tests {
125127
t.Run(test.name, func(t *testing.T) {
126-
state := NewState()
128+
state := NewState(testDrainSleepDuration)
127129
state.alive = test.alive
128130
state.shuttingDown = test.shuttingDown
129131

@@ -142,7 +144,7 @@ func TestHealthStateHealthHandler(t *testing.T) {
142144
}
143145

144146
func TestHealthStateDrainHandler(t *testing.T) {
145-
state := NewState()
147+
state := NewState(testDrainSleepDuration)
146148
state.setAlive()
147149

148150
req := httptest.NewRequest(http.MethodGet, "/", nil)
@@ -165,7 +167,7 @@ func TestHealthStateDrainHandler(t *testing.T) {
165167
}
166168

167169
func TestHealthStateDrainHandlerNotRacy(t *testing.T) {
168-
state := NewState()
170+
state := NewState(testDrainSleepDuration)
169171
state.setAlive()
170172

171173
// Complete the drain _before_ the DrainHandlerFunc is attached.
@@ -194,7 +196,7 @@ func TestHealthStateDrainHandlerNotRacy(t *testing.T) {
194196
}
195197

196198
func TestHealthStateShutdown(t *testing.T) {
197-
state := NewState()
199+
state := NewState(testDrainSleepDuration)
198200
state.setAlive()
199201

200202
calledCh := make(chan struct{}, 1)

0 commit comments

Comments
 (0)