Skip to content

Commit 9df37a0

Browse files
committed
feat(reactive-rpc): 🎸 build full messages out of fragments by default
1 parent 6e2470f commit 9df37a0

File tree

2 files changed

+112
-2
lines changed

2 files changed

+112
-2
lines changed

src/reactive-rpc/server/ws/server/WsServerConnection.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as crypto from 'crypto';
22
import * as stream from 'stream';
33
import {WsCloseFrame, WsFrameDecoder, WsFrameHeader, WsFrameOpcode, WsPingFrame, WsPongFrame} from '../codec';
44
import {utf8Size} from '../../../../util/strings/utf8';
5+
import {listToUint8} from '../../../../util/buffers/concat';
56
import type {WsFrameEncoder} from '../codec/WsFrameEncoder';
67

78
export type WsServerConnectionSocket = stream.Duplex;
@@ -15,8 +16,25 @@ export class WsServerConnection {
1516
this.sendPong(data);
1617
};
1718

19+
private _fragments: Uint8Array[] = [];
20+
private _fragmentsSize: number = 0;
21+
public readonly defaultOnFragment = (isLast: boolean, data: Uint8Array, isUtf8: boolean): void => {
22+
const fragments = this._fragments;
23+
this._fragmentsSize += data.length;
24+
if (this._fragmentsSize > this.maxIncomingMessage) {
25+
this.onClose(1009, 'TOO_LARGE');
26+
return;
27+
}
28+
fragments.push(data);
29+
if (!isLast) return;
30+
this._fragments = [];
31+
this._fragmentsSize = 0;
32+
const message = listToUint8(fragments);
33+
this.onmessage(message, isUtf8);
34+
};
35+
1836
public onmessage: (data: Uint8Array, isUtf8: boolean) => void = () => {};
19-
public onfragment: (isLast: boolean, data: Uint8Array, isUtf8: boolean) => void = () => {};
37+
public onfragment: (isLast: boolean, data: Uint8Array, isUtf8: boolean) => void = this.defaultOnFragment;
2038
public onping: (data: Uint8Array | null) => void = this.defaultOnPing;
2139
public onpong: (data: Uint8Array | null) => void = () => {};
2240
public onclose: (code: number, reason: string) => void = () => {};

src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,37 @@ describe('.onmessage', () => {
115115
expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]);
116116
expect(messages[1]).toEqual([new Uint8Array([0x04, 0x05, 0x06]), false]);
117117
});
118+
119+
test('errors when incoming message is too large', async () => {
120+
const {socket, encoder, connection} = setup();
121+
connection.maxIncomingMessage = 3;
122+
const messages: [data: Uint8Array, isUtf8: boolean][] = [];
123+
connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => {
124+
messages.push([data, isUtf8]);
125+
};
126+
const closes: [code: number, reason: string][] = [];
127+
connection.onclose = (code: number, reason: string): void => {
128+
closes.push([code, reason]);
129+
};
130+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
131+
const frame1 = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0);
132+
encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3);
133+
const payload1 = encoder.writer.flush();
134+
const frame2 = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 4, 0);
135+
encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06, 0x07]), 4);
136+
const payload2 = encoder.writer.flush();
137+
socket.write(pingFrame);
138+
socket.write(listToUint8([
139+
frame1,
140+
payload1,
141+
frame2,
142+
payload2,
143+
]));
144+
await until(() => messages.length === 1);
145+
await until(() => closes.length === 1);
146+
expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]);
147+
expect(closes[0]).toEqual([1009, 'TOO_LARGE']);
148+
});
118149

119150
test('text frame', async () => {
120151
const {socket, encoder, connection} = setup();
@@ -175,7 +206,7 @@ describe('.onmessage', () => {
175206
});
176207
});
177208

178-
describe('.fragment', () => {
209+
describe('.onfragment', () => {
179210
test('parses out message fragments', async () => {
180211
const {socket, encoder, connection} = setup();
181212
const fragments: [isLast: boolean, data: Uint8Array, isUtf8: boolean][] = [];
@@ -206,4 +237,65 @@ describe('.fragment', () => {
206237
[true, new Uint8Array([0x07, 0x08, 0x09]), false],
207238
]);
208239
});
240+
241+
describe('when .onfragment is not defined', () => {
242+
test('emits an .onmessage with fully assembled message', async () => {
243+
const {socket, encoder, connection} = setup();
244+
const messages: [data: Uint8Array, isUtf8: boolean][] = [];
245+
connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => {
246+
messages.push([data, isUtf8]);
247+
};
248+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
249+
const buf1 = encoder.encodeHdr(0, WsFrameOpcode.BINARY, 3, 0);
250+
encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3);
251+
const buf2 = encoder.writer.flush();
252+
const buf3 = encoder.encodeHdr(0, WsFrameOpcode.CONTINUE, 3, 0);
253+
encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3);
254+
const buf4 = encoder.writer.flush();
255+
const buf5 = encoder.encodeHdr(1, WsFrameOpcode.CONTINUE, 3, 0);
256+
encoder.writer.buf(Buffer.from([0x07, 0x08, 0x09]), 3);
257+
const buf6 = encoder.writer.flush();
258+
socket.write(pingFrame);
259+
socket.write(buf1);
260+
socket.write(buf2);
261+
socket.write(buf3);
262+
socket.write(buf4);
263+
socket.write(buf5);
264+
socket.write(buf6);
265+
await until(() => messages.length === 1);
266+
expect(messages).toEqual([
267+
[new Uint8Array([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,0x08, 0x09]), false],
268+
]);
269+
});
270+
271+
test('errors out when incoming message is too large', async () => {
272+
const {socket, encoder, connection} = setup();
273+
connection.maxIncomingMessage = 8;
274+
const closes: [code: number, reason: string][] = [];
275+
connection.onclose = (code: number, reason: string): void => {
276+
closes.push([code, reason]);
277+
};
278+
const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03]));
279+
const buf1 = encoder.encodeHdr(0, WsFrameOpcode.BINARY, 3, 0);
280+
encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3);
281+
const buf2 = encoder.writer.flush();
282+
const buf3 = encoder.encodeHdr(0, WsFrameOpcode.CONTINUE, 3, 0);
283+
encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3);
284+
const buf4 = encoder.writer.flush();
285+
const buf5 = encoder.encodeHdr(1, WsFrameOpcode.CONTINUE, 3, 0);
286+
encoder.writer.buf(Buffer.from([0x07, 0x08, 0x09]), 3);
287+
const buf6 = encoder.writer.flush();
288+
socket.write(pingFrame);
289+
socket.write(buf1);
290+
socket.write(buf2);
291+
socket.write(buf3);
292+
socket.write(buf4);
293+
socket.write(buf5);
294+
socket.write(buf6);
295+
await until(() => closes.length === 1);
296+
expect(closes).toEqual([
297+
[1009, 'TOO_LARGE'],
298+
]);
299+
});
300+
});
209301
});

0 commit comments

Comments
 (0)