Skip to content

Commit b3417f9

Browse files
committed
Add support for the v5 protocol
1 parent 04a4dc5 commit b3417f9

File tree

2 files changed

+167
-7
lines changed

2 files changed

+167
-7
lines changed

src/web-socket-handler.ts

+62-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ import stream = require('stream');
44
import { V1Status } from './api';
55
import { KubeConfig } from './config';
66

7-
const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io'];
7+
const protocols = [
8+
'v5.channel.k8s.io',
9+
'v4.channel.k8s.io',
10+
'v3.channel.k8s.io',
11+
'v2.channel.k8s.io',
12+
'channel.k8s.io',
13+
];
814

915
export type TextHandler = (text: string) => boolean;
1016
export type BinaryHandler = (stream: number, buff: Buffer) => boolean;
@@ -17,12 +23,41 @@ export interface WebSocketInterface {
1723
): Promise<WebSocket.WebSocket>;
1824
}
1925

26+
export interface StreamInterface {
27+
stdin: stream.Readable;
28+
stdout: stream.Writable;
29+
stderr: stream.Writable;
30+
}
31+
2032
export class WebSocketHandler implements WebSocketInterface {
2133
public static readonly StdinStream: number = 0;
2234
public static readonly StdoutStream: number = 1;
2335
public static readonly StderrStream: number = 2;
2436
public static readonly StatusStream: number = 3;
2537
public static readonly ResizeStream: number = 4;
38+
public static readonly CloseStream: number = 255;
39+
40+
public negotiatedProtocol: string | null = null;
41+
42+
public static supportsClose(protocol: string): boolean {
43+
return protocol === 'v5.channel.k8s.io';
44+
}
45+
46+
public static closeStream(streamNum: number, streams: StreamInterface): void {
47+
console.log('Closing stream: ' + streamNum);
48+
switch (streamNum) {
49+
case WebSocketHandler.StdinStream:
50+
streams.stdin.pause();
51+
break;
52+
case WebSocketHandler.StdoutStream:
53+
console.log('closing stdout');
54+
streams.stdout.end();
55+
break;
56+
case WebSocketHandler.StderrStream:
57+
streams.stderr.end();
58+
break;
59+
}
60+
}
2661

2762
public static handleStandardStreams(
2863
streamNum: number,
@@ -39,6 +74,7 @@ export class WebSocketHandler implements WebSocketInterface {
3974
stderr.write(buff);
4075
} else if (streamNum === WebSocketHandler.StatusStream) {
4176
// stream closing.
77+
// Hacky, change tests to use the stream interface
4278
if (stdout && stdout !== process.stdout) {
4379
stdout.end();
4480
}
@@ -69,6 +105,12 @@ export class WebSocketHandler implements WebSocketInterface {
69105
});
70106

71107
stdin.on('end', () => {
108+
if (WebSocketHandler.supportsClose(ws.protocol)) {
109+
const buff = Buffer.alloc(2);
110+
buff.writeUint8(this.CloseStream, 0);
111+
buff.writeUint8(this.StdinStream, 1);
112+
ws.send(buff);
113+
}
72114
ws.close();
73115
});
74116
// Keep the stream open
@@ -141,7 +183,16 @@ export class WebSocketHandler implements WebSocketInterface {
141183
// factory is really just for test injection
142184
public constructor(
143185
readonly config: KubeConfig,
144-
readonly socketFactory?: (uri: string, opts: WebSocket.ClientOptions) => WebSocket.WebSocket,
186+
readonly socketFactory?: (
187+
uri: string,
188+
protocols: string[],
189+
opts: WebSocket.ClientOptions,
190+
) => WebSocket.WebSocket,
191+
readonly streams: StreamInterface = {
192+
stdin: process.stdin,
193+
stdout: process.stdout,
194+
stderr: process.stderr,
195+
},
145196
) {}
146197

147198
/**
@@ -173,7 +224,7 @@ export class WebSocketHandler implements WebSocketInterface {
173224

174225
return await new Promise<WebSocket.WebSocket>((resolve, reject) => {
175226
const client = this.socketFactory
176-
? this.socketFactory(uri, opts)
227+
? this.socketFactory(uri, protocols, opts)
177228
: new WebSocket(uri, protocols, opts);
178229
let resolved = false;
179230

@@ -191,11 +242,18 @@ export class WebSocketHandler implements WebSocketInterface {
191242
client.onmessage = ({ data }: { data: WebSocket.Data }) => {
192243
// TODO: support ArrayBuffer and Buffer[] data types?
193244
if (typeof data === 'string') {
245+
if (data.charCodeAt(0) === WebSocketHandler.CloseStream) {
246+
WebSocketHandler.closeStream(data.charCodeAt(1), this.streams);
247+
}
194248
if (textHandler && !textHandler(data)) {
195249
client.close();
196250
}
197251
} else if (data instanceof Buffer) {
198-
const streamNum = data.readInt8(0);
252+
const streamNum = data.readUint8(0);
253+
if (streamNum === WebSocketHandler.CloseStream) {
254+
console.log('Closing stream!');
255+
WebSocketHandler.closeStream(data.readInt8(1), this.streams);
256+
}
199257
if (binaryHandler && !binaryHandler(streamNum, data.subarray(1))) {
200258
client.close();
201259
}

src/web-socket-handler_test.ts

+105-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { promisify } from 'util';
22
import { expect } from 'chai';
33
import WebSocket = require('isomorphic-ws');
44
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
5+
import stream = require('stream');
56

67
import { V1Status } from './api';
78
import { KubeConfig } from './config';
@@ -119,7 +120,7 @@ describe('WebSocket', () => {
119120

120121
const handler = new WebSocketHandler(
121122
kc,
122-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
123+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
123124
uriOut = uri;
124125
return mockWs as WebSocket.WebSocket;
125126
},
@@ -170,7 +171,7 @@ describe('WebSocket', () => {
170171

171172
const handler = new WebSocketHandler(
172173
kc,
173-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
174+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
174175
uriOut = uri;
175176
return mockWs as WebSocket.WebSocket;
176177
},
@@ -239,7 +240,7 @@ describe('WebSocket', () => {
239240

240241
const handler = new WebSocketHandler(
241242
kc,
242-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
243+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
243244
uriOut = uri;
244245
return mockWs as WebSocket.WebSocket;
245246
},
@@ -303,6 +304,107 @@ describe('WebSocket', () => {
303304
});
304305
});
305306

307+
describe('V5 protocol support', () => {
308+
it('should handle close', async () => {
309+
const kc = new KubeConfig();
310+
const host = 'foo.company.com';
311+
const server = `https://${host}`;
312+
kc.clusters = [
313+
{
314+
name: 'cluster',
315+
server,
316+
} as Cluster,
317+
] as Cluster[];
318+
kc.contexts = [
319+
{
320+
cluster: 'cluster',
321+
user: 'user',
322+
} as Context,
323+
] as Context[];
324+
kc.users = [
325+
{
326+
name: 'user',
327+
} as User,
328+
];
329+
330+
const mockWs = {
331+
protocol: 'v5.channel.k8s.io',
332+
} as WebSocket.WebSocket;
333+
let uriOut = '';
334+
let endCalled = false;
335+
const handler = new WebSocketHandler(
336+
kc,
337+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
338+
uriOut = uri;
339+
return mockWs as WebSocket.WebSocket;
340+
},
341+
{
342+
stdin: process.stdin,
343+
stderr: process.stderr,
344+
stdout: {
345+
end: () => {
346+
endCalled = true;
347+
},
348+
} as stream.Writable,
349+
},
350+
);
351+
const path = '/some/path';
352+
353+
const promise = handler.connect(path, null, null);
354+
await setImmediatePromise();
355+
356+
expect(uriOut).to.equal(`wss://${host}${path}`);
357+
358+
const event = {
359+
target: mockWs,
360+
type: 'open',
361+
};
362+
mockWs.onopen!(event);
363+
const errEvt = {
364+
error: {},
365+
message: 'some message',
366+
type: 'some type',
367+
target: mockWs,
368+
};
369+
const closeBuff = Buffer.alloc(2);
370+
closeBuff.writeUint8(255, 0);
371+
closeBuff.writeUint8(WebSocketHandler.StdoutStream, 1);
372+
373+
mockWs.onmessage!({
374+
data: closeBuff,
375+
type: 'type',
376+
target: mockWs,
377+
});
378+
await promise;
379+
expect(endCalled).to.be.true;
380+
});
381+
it('should handle closing stdin < v4 protocol', () => {
382+
const ws = {
383+
// send is not defined, so this will throw if we try to send the close message.
384+
close: () => {},
385+
} as WebSocket;
386+
const stdinStream = new ReadableStreamBuffer();
387+
WebSocketHandler.handleStandardInput(ws, stdinStream);
388+
stdinStream.emit('end');
389+
});
390+
it('should handle closing stdin v5 protocol', () => {
391+
let sent: Buffer | null = null;
392+
const ws = {
393+
protocol: 'v5.channel.k8s.io',
394+
send: (data) => {
395+
sent = data;
396+
},
397+
close: () => {},
398+
} as WebSocket;
399+
const stdinStream = new ReadableStreamBuffer();
400+
WebSocketHandler.handleStandardInput(ws, stdinStream);
401+
stdinStream.emit('end');
402+
expect(sent).to.not.be.null;
403+
expect(sent!.readUint8(0)).to.equal(255); // CLOSE signal
404+
expect(sent!.readUInt8(1)).to.equal(0); // Stdin stream is #0
405+
});
406+
});
407+
306408
describe('Restartable Handle Standard Input', () => {
307409
it('should throw on negative retry', () => {
308410
const p = new Promise<WebSocket.WebSocket>(() => {});

0 commit comments

Comments
 (0)