Skip to content

Commit 7a48a83

Browse files
committed
fix: prevent PreStop deadlock with two-stage drain signal mechanism
The previous implementation had a circular dependency where: - User container PreStop waited for drain-complete file - Queue-proxy only wrote drain-complete after receiving SIGTERM - But SIGTERM was blocked waiting for PreStop to finish This fix implements a two-stage drain signal: 1. Queue-proxy PreStop writes drain-started immediately on pod deletion 2. User container PreStop waits for drain-started (with 3s timeout for safety) 3. Queue-proxy SIGTERM handler drains requests and writes drain-complete 4. User container waits for drain-complete before allowing termination This ensures proper shutdown sequencing without deadlock while still delaying user container termination until queue-proxy has drained. Also includes cleanup of stale drain signal files on queue-proxy startup. feat: improve PreStop drain coordination with exponential backoff - Replace fixed 3-second wait with exponential backoff (1, 2, 4, 8 seconds) - Change drain-complete check interval from 0.1s to 1s to reduce CPU usage - Exit gracefully if drain-started is never detected after retries - More robust handling of queue-proxy failures or slow PreStop execution This provides better resilience against timing issues while reducing unnecessary CPU usage during the wait loop. test: add comprehensive integration tests for shutdown coordination Add integration tests to verify the PreStop shutdown coordination works correctly in various scenarios: - Normal shutdown sequence with proper signal ordering - Queue-proxy crash/failure scenarios - High load conditions with many pending requests - File system permission issues - Race condition testing with 50 iterations - Long-running requests that exceed typical drain timeout These tests ensure the exponential backoff and two-stage drain signal mechanism handles edge cases gracefully. Run with: go test -tags=integration -race ./pkg/queue/sharedmain refactor: extract drain signal paths and logic to shared constants Based on PR review feedback, centralize drain signal configuration: - Create pkg/queue/drain/signals.go with all drain-related constants - Define signal file paths (DrainStartedFile, DrainCompleteFile) - Extract shell script logic into BuildDrainWaitScript() function - Define exponential backoff delays and check intervals as constants - Update all references to use the new constants package This improves code maintainability and makes it easier to modify drain behavior in the future. All file paths and timing parameters are now defined in a single location.
1 parent f6dd4b9 commit 7a48a83

File tree

8 files changed

+741
-12
lines changed

8 files changed

+741
-12
lines changed

pkg/queue/drain/signals.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
Copyright 2024 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package drain
18+
19+
const (
20+
// SignalDirectory is the directory where drain signal files are created
21+
SignalDirectory = "/var/run/knative"
22+
23+
// DrainStartedFile indicates that pod termination has begun and queue-proxy is handling shutdown
24+
DrainStartedFile = SignalDirectory + "/drain-started"
25+
26+
// DrainCompleteFile indicates that queue-proxy has finished draining requests
27+
DrainCompleteFile = SignalDirectory + "/drain-complete"
28+
29+
// DrainCheckInterval is how often to check for drain completion
30+
DrainCheckInterval = "1" // seconds
31+
32+
// ExponentialBackoffDelays are the delays in seconds for checking drain-started file
33+
// Total max wait time: 1+2+4+8 = 15 seconds
34+
ExponentialBackoffDelays = "1 2 4 8"
35+
)
36+
37+
// BuildDrainWaitScript generates the shell script for waiting on drain signals.
38+
// If existingCommand is provided, it will be executed before the drain wait.
39+
func BuildDrainWaitScript(existingCommand string) string {
40+
drainLogic := `for i in ` + ExponentialBackoffDelays + `; do ` +
41+
` if [ -f ` + DrainStartedFile + ` ]; then ` +
42+
` until [ -f ` + DrainCompleteFile + ` ]; do sleep ` + DrainCheckInterval + `; done; ` +
43+
` exit 0; ` +
44+
` fi; ` +
45+
` sleep $i; ` +
46+
`done; ` +
47+
`exit 0`
48+
49+
if existingCommand != "" {
50+
return existingCommand + "; " + drainLogic
51+
}
52+
return drainLogic
53+
}
54+
55+
// QueueProxyPreStopScript is the script executed by queue-proxy's PreStop hook
56+
const QueueProxyPreStopScript = "touch " + DrainStartedFile

pkg/queue/drain/signals_test.go

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
Copyright 2024 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package drain
18+
19+
import (
20+
"strings"
21+
"testing"
22+
)
23+
24+
func TestConstants(t *testing.T) {
25+
tests := []struct {
26+
name string
27+
got string
28+
expected string
29+
}{
30+
{
31+
name: "SignalDirectory",
32+
got: SignalDirectory,
33+
expected: "/var/run/knative",
34+
},
35+
{
36+
name: "DrainStartedFile",
37+
got: DrainStartedFile,
38+
expected: "/var/run/knative/drain-started",
39+
},
40+
{
41+
name: "DrainCompleteFile",
42+
got: DrainCompleteFile,
43+
expected: "/var/run/knative/drain-complete",
44+
},
45+
{
46+
name: "DrainCheckInterval",
47+
got: DrainCheckInterval,
48+
expected: "1",
49+
},
50+
{
51+
name: "ExponentialBackoffDelays",
52+
got: ExponentialBackoffDelays,
53+
expected: "1 2 4 8",
54+
},
55+
{
56+
name: "QueueProxyPreStopScript",
57+
got: QueueProxyPreStopScript,
58+
expected: "touch /var/run/knative/drain-started",
59+
},
60+
}
61+
62+
for _, tt := range tests {
63+
t.Run(tt.name, func(t *testing.T) {
64+
if tt.got != tt.expected {
65+
t.Errorf("got %q, want %q", tt.got, tt.expected)
66+
}
67+
})
68+
}
69+
}
70+
71+
func TestBuildDrainWaitScript(t *testing.T) {
72+
tests := []struct {
73+
name string
74+
existingCommand string
75+
wantContains []string
76+
wantExact bool
77+
}{
78+
{
79+
name: "without existing command",
80+
existingCommand: "",
81+
wantContains: []string{
82+
"for i in 1 2 4 8",
83+
"if [ -f /var/run/knative/drain-started ]",
84+
"until [ -f /var/run/knative/drain-complete ]",
85+
"sleep 1",
86+
"sleep $i",
87+
"exit 0",
88+
},
89+
wantExact: false,
90+
},
91+
{
92+
name: "with existing command",
93+
existingCommand: "echo 'custom prestop'",
94+
wantContains: []string{
95+
"echo 'custom prestop'",
96+
"for i in 1 2 4 8",
97+
"if [ -f /var/run/knative/drain-started ]",
98+
"until [ -f /var/run/knative/drain-complete ]",
99+
"sleep 1",
100+
"sleep $i",
101+
"exit 0",
102+
},
103+
wantExact: false,
104+
},
105+
{
106+
name: "with complex existing command",
107+
existingCommand: "/bin/sh -c 'kill -TERM 1 && wait'",
108+
wantContains: []string{
109+
"/bin/sh -c 'kill -TERM 1 && wait'",
110+
"for i in 1 2 4 8",
111+
"if [ -f /var/run/knative/drain-started ]",
112+
"until [ -f /var/run/knative/drain-complete ]",
113+
},
114+
wantExact: false,
115+
},
116+
}
117+
118+
for _, tt := range tests {
119+
t.Run(tt.name, func(t *testing.T) {
120+
got := BuildDrainWaitScript(tt.existingCommand)
121+
122+
for _, want := range tt.wantContains {
123+
if !strings.Contains(got, want) {
124+
t.Errorf("BuildDrainWaitScript() missing expected substring %q\nGot: %q", want, got)
125+
}
126+
}
127+
128+
// Verify the command structure
129+
if tt.existingCommand != "" {
130+
// Should start with the existing command
131+
if !strings.HasPrefix(got, tt.existingCommand+"; ") {
132+
t.Errorf("BuildDrainWaitScript() should start with existing command followed by '; '\nGot: %q", got)
133+
}
134+
}
135+
136+
// Verify the script ends with exit 0
137+
if !strings.HasSuffix(got, "exit 0") {
138+
t.Errorf("BuildDrainWaitScript() should end with 'exit 0'\nGot: %q", got)
139+
}
140+
})
141+
}
142+
}
143+
144+
func TestBuildDrainWaitScriptStructure(t *testing.T) {
145+
// Test the exact structure of the generated script without existing command
146+
got := BuildDrainWaitScript("")
147+
expected := "for i in 1 2 4 8; do " +
148+
" if [ -f /var/run/knative/drain-started ]; then " +
149+
" until [ -f /var/run/knative/drain-complete ]; do sleep 1; done; " +
150+
" exit 0; " +
151+
" fi; " +
152+
" sleep $i; " +
153+
"done; " +
154+
"exit 0"
155+
156+
if got != expected {
157+
t.Errorf("BuildDrainWaitScript(\"\") structure mismatch\nGot: %q\nExpected: %q", got, expected)
158+
}
159+
}
160+
161+
func TestBuildDrainWaitScriptWithCommandStructure(t *testing.T) {
162+
// Test the exact structure of the generated script with existing command
163+
existingCmd := "echo 'test'"
164+
got := BuildDrainWaitScript(existingCmd)
165+
expected := "echo 'test'; for i in 1 2 4 8; do " +
166+
" if [ -f /var/run/knative/drain-started ]; then " +
167+
" until [ -f /var/run/knative/drain-complete ]; do sleep 1; done; " +
168+
" exit 0; " +
169+
" fi; " +
170+
" sleep $i; " +
171+
"done; " +
172+
"exit 0"
173+
174+
if got != expected {
175+
t.Errorf("BuildDrainWaitScript with command structure mismatch\nGot: %q\nExpected: %q", got, expected)
176+
}
177+
}
178+
179+
func TestBuildDrainWaitScriptEdgeCases(t *testing.T) {
180+
tests := []struct {
181+
name string
182+
existingCommand string
183+
checkFunc func(t *testing.T, result string)
184+
}{
185+
{
186+
name: "empty string produces valid script",
187+
existingCommand: "",
188+
checkFunc: func(t *testing.T, result string) {
189+
if result == "" {
190+
t.Error("BuildDrainWaitScript(\"\") should not return empty string")
191+
}
192+
if !strings.Contains(result, "for i in") {
193+
t.Error("BuildDrainWaitScript(\"\") should contain for loop")
194+
}
195+
},
196+
},
197+
{
198+
name: "command with semicolon",
199+
existingCommand: "cmd1; cmd2",
200+
checkFunc: func(t *testing.T, result string) {
201+
if !strings.HasPrefix(result, "cmd1; cmd2; ") {
202+
t.Error("BuildDrainWaitScript should preserve command with semicolons")
203+
}
204+
},
205+
},
206+
{
207+
name: "command with special characters",
208+
existingCommand: "echo '$VAR' && test -f /tmp/file",
209+
checkFunc: func(t *testing.T, result string) {
210+
if !strings.HasPrefix(result, "echo '$VAR' && test -f /tmp/file; ") {
211+
t.Error("BuildDrainWaitScript should preserve special characters")
212+
}
213+
},
214+
},
215+
}
216+
217+
for _, tt := range tests {
218+
t.Run(tt.name, func(t *testing.T) {
219+
result := BuildDrainWaitScript(tt.existingCommand)
220+
tt.checkFunc(t, result)
221+
})
222+
}
223+
}
224+
225+
func BenchmarkBuildDrainWaitScript(b *testing.B) {
226+
testCases := []struct {
227+
name string
228+
command string
229+
}{
230+
{"NoCommand", ""},
231+
{"SimpleCommand", "echo test"},
232+
{"ComplexCommand", "/bin/sh -c 'kill -TERM 1 && wait'"},
233+
}
234+
235+
for _, tc := range testCases {
236+
b.Run(tc.name, func(b *testing.B) {
237+
for range b.N {
238+
_ = BuildDrainWaitScript(tc.command)
239+
}
240+
})
241+
}
242+
}

pkg/queue/sharedmain/main.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949
"knative.dev/serving/pkg/observability"
5050
"knative.dev/serving/pkg/queue"
5151
"knative.dev/serving/pkg/queue/certificate"
52+
"knative.dev/serving/pkg/queue/drain"
5253
"knative.dev/serving/pkg/queue/readiness"
5354
)
5455

@@ -273,8 +274,9 @@ func Main(opts ...Option) error {
273274

274275
logger.Info("Starting queue-proxy")
275276

276-
// Clean up any stale drain signal file from previous runs
277-
os.Remove("/var/run/knative/drain-complete")
277+
// Clean up any stale drain signal files from previous runs
278+
os.Remove(drain.DrainStartedFile)
279+
os.Remove(drain.DrainCompleteFile)
278280

279281
errCh := make(chan error)
280282
for name, server := range httpServers {
@@ -322,14 +324,16 @@ func Main(opts ...Option) error {
322324
for range ticker.C {
323325
if pendingRequests.Load() <= 0 {
324326
logger.Infof("Drain: all pending requests completed")
325-
// Write drain signal file for PreStop hooks to detect
326-
if err := os.WriteFile("/var/run/knative/drain-complete", []byte(""), 0o600); err != nil {
327-
logger.Errorw("Failed to write drain signal file", zap.Error(err))
328-
}
329327
break WaitOnPendingRequests
330328
}
331329
}
332330

331+
// Write drain-complete signal file after draining is done
332+
// This signals to user containers that queue-proxy has finished draining
333+
if err := os.WriteFile(drain.DrainCompleteFile, []byte(""), 0o600); err != nil {
334+
logger.Errorw("Failed to write drain-complete signal file", zap.Error(err))
335+
}
336+
333337
for name, srv := range httpServers {
334338
logger.Info("Shutting down server: ", name)
335339
if err := srv.Shutdown(context.Background()); err != nil {

0 commit comments

Comments
 (0)