Skip to content

Commit f5f6b33

Browse files
committed
Add support for the v5 streaming protocol
1 parent 04a4dc5 commit f5f6b33

File tree

2 files changed

+165
-7
lines changed

2 files changed

+165
-7
lines changed

src/web-socket-handler.ts

+60-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ import stream = require('stream');
44
import { V1Status } from './api';
55
import { KubeConfig } from './config';
66

7-
const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io'];
7+
const protocols = [
8+
'v5.channel.k8s.io',
9+
'v4.channel.k8s.io',
10+
'v3.channel.k8s.io',
11+
'v2.channel.k8s.io',
12+
'channel.k8s.io',
13+
];
814

915
export type TextHandler = (text: string) => boolean;
1016
export type BinaryHandler = (stream: number, buff: Buffer) => boolean;
@@ -17,12 +23,39 @@ export interface WebSocketInterface {
1723
): Promise<WebSocket.WebSocket>;
1824
}
1925

26+
export interface StreamInterface {
27+
stdin: stream.Readable;
28+
stdout: stream.Writable;
29+
stderr: stream.Writable;
30+
}
31+
2032
export class WebSocketHandler implements WebSocketInterface {
2133
public static readonly StdinStream: number = 0;
2234
public static readonly StdoutStream: number = 1;
2335
public static readonly StderrStream: number = 2;
2436
public static readonly StatusStream: number = 3;
2537
public static readonly ResizeStream: number = 4;
38+
public static readonly CloseStream: number = 255;
39+
40+
public static supportsClose(protocol: string): boolean {
41+
return protocol === 'v5.channel.k8s.io';
42+
}
43+
44+
public static closeStream(streamNum: number, streams: StreamInterface): void {
45+
console.log('Closing stream: ' + streamNum);
46+
switch (streamNum) {
47+
case WebSocketHandler.StdinStream:
48+
streams.stdin.pause();
49+
break;
50+
case WebSocketHandler.StdoutStream:
51+
console.log('closing stdout');
52+
streams.stdout.end();
53+
break;
54+
case WebSocketHandler.StderrStream:
55+
streams.stderr.end();
56+
break;
57+
}
58+
}
2659

2760
public static handleStandardStreams(
2861
streamNum: number,
@@ -39,6 +72,7 @@ export class WebSocketHandler implements WebSocketInterface {
3972
stderr.write(buff);
4073
} else if (streamNum === WebSocketHandler.StatusStream) {
4174
// stream closing.
75+
// Hacky, change tests to use the stream interface
4276
if (stdout && stdout !== process.stdout) {
4377
stdout.end();
4478
}
@@ -69,6 +103,12 @@ export class WebSocketHandler implements WebSocketInterface {
69103
});
70104

71105
stdin.on('end', () => {
106+
if (WebSocketHandler.supportsClose(ws.protocol)) {
107+
const buff = Buffer.alloc(2);
108+
buff.writeUint8(this.CloseStream, 0);
109+
buff.writeUint8(this.StdinStream, 1);
110+
ws.send(buff);
111+
}
72112
ws.close();
73113
});
74114
// Keep the stream open
@@ -141,7 +181,16 @@ export class WebSocketHandler implements WebSocketInterface {
141181
// factory is really just for test injection
142182
public constructor(
143183
readonly config: KubeConfig,
144-
readonly socketFactory?: (uri: string, opts: WebSocket.ClientOptions) => WebSocket.WebSocket,
184+
readonly socketFactory?: (
185+
uri: string,
186+
protocols: string[],
187+
opts: WebSocket.ClientOptions,
188+
) => WebSocket.WebSocket,
189+
readonly streams: StreamInterface = {
190+
stdin: process.stdin,
191+
stdout: process.stdout,
192+
stderr: process.stderr,
193+
},
145194
) {}
146195

147196
/**
@@ -173,7 +222,7 @@ export class WebSocketHandler implements WebSocketInterface {
173222

174223
return await new Promise<WebSocket.WebSocket>((resolve, reject) => {
175224
const client = this.socketFactory
176-
? this.socketFactory(uri, opts)
225+
? this.socketFactory(uri, protocols, opts)
177226
: new WebSocket(uri, protocols, opts);
178227
let resolved = false;
179228

@@ -191,11 +240,18 @@ export class WebSocketHandler implements WebSocketInterface {
191240
client.onmessage = ({ data }: { data: WebSocket.Data }) => {
192241
// TODO: support ArrayBuffer and Buffer[] data types?
193242
if (typeof data === 'string') {
243+
if (data.charCodeAt(0) === WebSocketHandler.CloseStream) {
244+
WebSocketHandler.closeStream(data.charCodeAt(1), this.streams);
245+
}
194246
if (textHandler && !textHandler(data)) {
195247
client.close();
196248
}
197249
} else if (data instanceof Buffer) {
198-
const streamNum = data.readInt8(0);
250+
const streamNum = data.readUint8(0);
251+
if (streamNum === WebSocketHandler.CloseStream) {
252+
console.log('Closing stream!');
253+
WebSocketHandler.closeStream(data.readInt8(1), this.streams);
254+
}
199255
if (binaryHandler && !binaryHandler(streamNum, data.subarray(1))) {
200256
client.close();
201257
}

src/web-socket-handler_test.ts

+105-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { promisify } from 'util';
22
import { expect } from 'chai';
33
import WebSocket = require('isomorphic-ws');
44
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
5+
import stream = require('stream');
56

67
import { V1Status } from './api';
78
import { KubeConfig } from './config';
@@ -119,7 +120,7 @@ describe('WebSocket', () => {
119120

120121
const handler = new WebSocketHandler(
121122
kc,
122-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
123+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
123124
uriOut = uri;
124125
return mockWs as WebSocket.WebSocket;
125126
},
@@ -170,7 +171,7 @@ describe('WebSocket', () => {
170171

171172
const handler = new WebSocketHandler(
172173
kc,
173-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
174+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
174175
uriOut = uri;
175176
return mockWs as WebSocket.WebSocket;
176177
},
@@ -239,7 +240,7 @@ describe('WebSocket', () => {
239240

240241
const handler = new WebSocketHandler(
241242
kc,
242-
(uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
243+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
243244
uriOut = uri;
244245
return mockWs as WebSocket.WebSocket;
245246
},
@@ -303,6 +304,107 @@ describe('WebSocket', () => {
303304
});
304305
});
305306

307+
describe('V5 protocol support', () => {
308+
it('should handle close', async () => {
309+
const kc = new KubeConfig();
310+
const host = 'foo.company.com';
311+
const server = `https://${host}`;
312+
kc.clusters = [
313+
{
314+
name: 'cluster',
315+
server,
316+
} as Cluster,
317+
] as Cluster[];
318+
kc.contexts = [
319+
{
320+
cluster: 'cluster',
321+
user: 'user',
322+
} as Context,
323+
] as Context[];
324+
kc.users = [
325+
{
326+
name: 'user',
327+
} as User,
328+
];
329+
330+
const mockWs = {
331+
protocol: 'v5.channel.k8s.io',
332+
} as WebSocket.WebSocket;
333+
let uriOut = '';
334+
let endCalled = false;
335+
const handler = new WebSocketHandler(
336+
kc,
337+
(uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => {
338+
uriOut = uri;
339+
return mockWs as WebSocket.WebSocket;
340+
},
341+
{
342+
stdin: process.stdin,
343+
stderr: process.stderr,
344+
stdout: {
345+
end: () => {
346+
endCalled = true;
347+
},
348+
} as stream.Writable,
349+
},
350+
);
351+
const path = '/some/path';
352+
353+
const promise = handler.connect(path, null, null);
354+
await setImmediatePromise();
355+
356+
expect(uriOut).to.equal(`wss://${host}${path}`);
357+
358+
const event = {
359+
target: mockWs,
360+
type: 'open',
361+
};
362+
mockWs.onopen!(event);
363+
const errEvt = {
364+
error: {},
365+
message: 'some message',
366+
type: 'some type',
367+
target: mockWs,
368+
};
369+
const closeBuff = Buffer.alloc(2);
370+
closeBuff.writeUint8(255, 0);
371+
closeBuff.writeUint8(WebSocketHandler.StdoutStream, 1);
372+
373+
mockWs.onmessage!({
374+
data: closeBuff,
375+
type: 'type',
376+
target: mockWs,
377+
});
378+
await promise;
379+
expect(endCalled).to.be.true;
380+
});
381+
it('should handle closing stdin < v4 protocol', () => {
382+
const ws = {
383+
// send is not defined, so this will throw if we try to send the close message.
384+
close: () => {},
385+
} as WebSocket;
386+
const stdinStream = new ReadableStreamBuffer();
387+
WebSocketHandler.handleStandardInput(ws, stdinStream);
388+
stdinStream.emit('end');
389+
});
390+
it('should handle closing stdin v5 protocol', () => {
391+
let sent: Buffer | null = null;
392+
const ws = {
393+
protocol: 'v5.channel.k8s.io',
394+
send: (data) => {
395+
sent = data;
396+
},
397+
close: () => {},
398+
} as WebSocket;
399+
const stdinStream = new ReadableStreamBuffer();
400+
WebSocketHandler.handleStandardInput(ws, stdinStream);
401+
stdinStream.emit('end');
402+
expect(sent).to.not.be.null;
403+
expect(sent!.readUint8(0)).to.equal(255); // CLOSE signal
404+
expect(sent!.readUInt8(1)).to.equal(0); // Stdin stream is #0
405+
});
406+
});
407+
306408
describe('Restartable Handle Standard Input', () => {
307409
it('should throw on negative retry', () => {
308410
const p = new Promise<WebSocket.WebSocket>(() => {});

0 commit comments

Comments
 (0)