Skip to content

Commit 38392b3

Browse files
authored
Merge pull request #501 from streamich/ws-improvements
WebSocket improvements
2 parents cc314a5 + c209dc2 commit 38392b3

File tree

2 files changed

+356
-38
lines changed

2 files changed

+356
-38
lines changed

src/reactive-rpc/server/ws/server/WsServerConnection.ts

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,104 @@
1-
import * as net from 'net';
21
import * as crypto from 'crypto';
2+
import * as stream from 'stream';
33
import {WsCloseFrame, WsFrameDecoder, WsFrameHeader, WsFrameOpcode, WsPingFrame, WsPongFrame} from '../codec';
44
import {utf8Size} from '../../../../util/strings/utf8';
5-
import {FanOut} from 'thingies/es2020/fanout';
5+
import {listToUint8} from '../../../../util/buffers/concat';
66
import type {WsFrameEncoder} from '../codec/WsFrameEncoder';
77

8+
export type WsServerConnectionSocket = stream.Duplex;
9+
810
export class WsServerConnection {
911
public closed: boolean = false;
1012
public maxIncomingMessage: number = 2 * 1024 * 1024;
1113
public maxBackpressure: number = 2 * 1024 * 1024;
1214

13-
/**
14-
* If this is not null, then the connection is receiving a stream: a sequence
15-
* of fragment frames.
16-
*/
17-
protected stream: FanOut<Uint8Array> | null = null;
18-
1915
public readonly defaultOnPing = (data: Uint8Array | null): void => {
2016
this.sendPong(data);
2117
};
2218

19+
private _fragments: Uint8Array[] = [];
20+
private _fragmentsSize: number = 0;
21+
public readonly defaultOnFragment = (isLast: boolean, data: Uint8Array, isUtf8: boolean): void => {
22+
const fragments = this._fragments;
23+
this._fragmentsSize += data.length;
24+
if (this._fragmentsSize > this.maxIncomingMessage) {
25+
this.onClose(1009, 'TOO_LARGE');
26+
return;
27+
}
28+
fragments.push(data);
29+
if (!isLast) return;
30+
this._fragments = [];
31+
this._fragmentsSize = 0;
32+
const message = listToUint8(fragments);
33+
this.onmessage(message, isUtf8);
34+
};
35+
2336
public onmessage: (data: Uint8Array, isUtf8: boolean) => void = () => {};
37+
public onfragment: (isLast: boolean, data: Uint8Array, isUtf8: boolean) => void = this.defaultOnFragment;
2438
public onping: (data: Uint8Array | null) => void = this.defaultOnPing;
2539
public onpong: (data: Uint8Array | null) => void = () => {};
2640
public onclose: (code: number, reason: string) => void = () => {};
2741

28-
constructor(protected readonly encoder: WsFrameEncoder, public readonly socket: net.Socket) {
42+
constructor(protected readonly encoder: WsFrameEncoder, public readonly socket: WsServerConnectionSocket) {
2943
const decoder = new WsFrameDecoder();
30-
let currentFrame: WsFrameHeader | null = null;
44+
let currentFrameHeader: WsFrameHeader | null = null;
45+
let fragmentStartFrameHeader: WsFrameHeader | null = null;
3146
const handleData = (data: Uint8Array): void => {
3247
try {
3348
decoder.push(data);
34-
if (currentFrame) {
35-
const length = currentFrame.length;
36-
if (length <= decoder.reader.size()) {
37-
const buf = new Uint8Array(length);
38-
decoder.copyFrameData(currentFrame, buf, 0);
39-
const isText = currentFrame.opcode === WsFrameOpcode.TEXT;
40-
currentFrame = null;
41-
this.onmessage(buf, isText);
49+
main: while (true) {
50+
if (currentFrameHeader instanceof WsFrameHeader) {
51+
const length = currentFrameHeader.length;
52+
if (length > this.maxIncomingMessage) {
53+
this.onClose(1009, 'TOO_LARGE');
54+
return;
55+
}
56+
if (length <= decoder.reader.size()) {
57+
const buf = new Uint8Array(length);
58+
decoder.copyFrameData(currentFrameHeader, buf, 0);
59+
if (fragmentStartFrameHeader instanceof WsFrameHeader) {
60+
const isText = fragmentStartFrameHeader.opcode === WsFrameOpcode.TEXT;
61+
const isLast = currentFrameHeader.fin === 1;
62+
currentFrameHeader = null;
63+
if (isLast) fragmentStartFrameHeader = null;
64+
this.onfragment(isLast, buf, isText);
65+
} else {
66+
const isText = currentFrameHeader.opcode === WsFrameOpcode.TEXT;
67+
currentFrameHeader = null;
68+
this.onmessage(buf, isText);
69+
}
70+
} else break;
4271
}
43-
}
44-
while (true) {
4572
const frame = decoder.readFrameHeader();
4673
if (!frame) break;
47-
else if (frame instanceof WsPingFrame) this.onping(frame.data);
48-
else if (frame instanceof WsPongFrame) this.onpong(frame.data);
49-
else if (frame instanceof WsCloseFrame) this.onClose(frame.code, frame.reason);
50-
else if (frame instanceof WsFrameHeader) {
51-
if (this.stream) {
74+
if (frame instanceof WsPingFrame) {
75+
this.onping(frame.data);
76+
continue main;
77+
}
78+
if (frame instanceof WsPongFrame) {
79+
this.onpong(frame.data);
80+
continue main;
81+
}
82+
if (frame instanceof WsCloseFrame) {
83+
decoder.readCloseFrameData(frame);
84+
this.onClose(frame.code, frame.reason);
85+
continue main;
86+
}
87+
if (frame instanceof WsFrameHeader) {
88+
if (fragmentStartFrameHeader) {
5289
if (frame.opcode !== WsFrameOpcode.CONTINUE) {
5390
this.onClose(1002, 'DATA');
5491
return;
5592
}
56-
throw new Error('streaming not implemented');
93+
currentFrameHeader = frame;
5794
}
58-
const length = frame.length;
59-
if (length > this.maxIncomingMessage) {
60-
this.onClose(1009, 'TOO_LARGE');
61-
return;
62-
}
63-
if (length <= decoder.reader.size()) {
64-
const buf = new Uint8Array(length);
65-
decoder.copyFrameData(frame, buf, 0);
66-
const isText = frame.opcode === WsFrameOpcode.TEXT;
67-
this.onmessage(buf, isText);
68-
} else {
69-
currentFrame = frame;
95+
if (frame.fin === 0) {
96+
fragmentStartFrameHeader = frame;
97+
currentFrameHeader = frame;
98+
continue main;
7099
}
100+
currentFrameHeader = frame;
101+
continue main;
71102
}
72103
}
73104
} catch (error) {

0 commit comments

Comments
 (0)