Skip to content

Commit 133f960

Browse files
authored
Merge pull request #2389 from bverhoeven/fix/watch-double-done-callback
watch: prevent done callback from being called twice on connection loss
2 parents 0cb7802 + c3e4d62 commit 133f960

File tree

2 files changed

+66
-2
lines changed

2 files changed

+66
-2
lines changed

src/watch.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ export class Watch {
4646
let doneCalled: boolean = false;
4747
const doneCallOnce = (err: any) => {
4848
if (!doneCalled) {
49-
controller.abort();
5049
doneCalled = true;
50+
controller.abort();
5151
done(err);
5252
}
5353
};

src/watch_test.ts

+65-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import { PassThrough } from 'node:stream';
55
import { KubeConfig } from './config.js';
66
import { Cluster, Context, User } from './config_types.js';
77
import { Watch } from './watch.js';
8-
import { IncomingMessage } from 'node:http';
8+
import { IncomingMessage, createServer } from 'node:http';
9+
import { AddressInfo } from 'node:net';
910

1011
const server = 'https://foo.company.com';
1112

@@ -167,6 +168,69 @@ describe('Watch', () => {
167168
stream.destroy();
168169
});
169170

171+
it('should not call the done callback more than once on unexpected connection loss', async () => {
172+
// Create a server that accepts the connection and flushes headers, then
173+
// immediately destroys the connection (causing a "Premature close"
174+
// error).
175+
//
176+
// This reproduces a bug where AbortController.abort() inside
177+
// doneCallOnce could cause done() to be invoked twice.
178+
179+
const mockServer = createServer((req, res) => {
180+
res.writeHead(200, {
181+
'Content-Type': 'application/json',
182+
'Transfer-Encoding': 'chunked',
183+
});
184+
185+
res.flushHeaders();
186+
res.destroy(); // Prematurely close the connection
187+
});
188+
189+
const mockServerPort = await new Promise<number>((resolve) => {
190+
mockServer.listen(0, () => {
191+
resolve((mockServer.address() as AddressInfo).port);
192+
});
193+
});
194+
195+
const kc = new KubeConfig();
196+
197+
kc.loadFromClusterAndUser(
198+
{
199+
name: 'cluster',
200+
server: `http://localhost:${mockServerPort}`,
201+
skipTLSVerify: true,
202+
},
203+
{
204+
name: 'user',
205+
},
206+
);
207+
208+
const watch = new Watch(kc);
209+
210+
let doneCalled = 0;
211+
let doneResolve: () => void;
212+
213+
const donePromise = new Promise<void>((resolve) => {
214+
doneResolve = resolve;
215+
});
216+
217+
await watch.watch(
218+
'/some/path/to/object',
219+
{},
220+
() => {},
221+
() => {
222+
doneCalled += 1;
223+
doneResolve();
224+
},
225+
);
226+
227+
await donePromise;
228+
229+
mockServer.close();
230+
231+
strictEqual(doneCalled, 1);
232+
});
233+
170234
it('should call setKeepAlive on the socket to extend the default of 5 mins', async () => {
171235
const kc = new KubeConfig();
172236

0 commit comments

Comments
 (0)