diff --git a/.changeset/tricky-parents-mate.md b/.changeset/tricky-parents-mate.md new file mode 100644 index 00000000..220c9129 --- /dev/null +++ b/.changeset/tricky-parents-mate.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-rsocket-router': patch +--- + +Fix WebSocket is not open: readyState 2 (CLOSING) diff --git a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts index b9e5f088..a5c3d951 100644 --- a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts +++ b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts @@ -62,7 +62,11 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect return; } - this.websocketDuplex.removeAllListeners(); + // Important: do not remove the error handler here. + // That causes uncaught error handlers in some edge cases. + this.websocketDuplex.removeAllListeners('close'); + this.websocketDuplex.removeAllListeners('data'); + this.websocketDuplex.end(); super.close(error); @@ -88,13 +92,15 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect } } - private handleClosed = (e: WebSocket.CloseEvent): void => { - this.close(new Error(e.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.')); + private handleClosed = (e?: WebSocket.CloseEvent): void => { + this.close(new Error(e?.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.')); }; private handleError = (e: WebSocket.ErrorEvent): void => { logger.error(`Error in WebSocket duplex connection: ${e}`); - this.close(e.error); + if (!this.done) { + this.close(e.error); + } }; private handleMessage = (buffer: Buffer): void => { diff --git a/packages/rsocket-router/tests/src/socket.test.ts b/packages/rsocket-router/tests/src/socket.test.ts index c9214788..fc6b7797 100644 --- a/packages/rsocket-router/tests/src/socket.test.ts +++ b/packages/rsocket-router/tests/src/socket.test.ts @@ -7,15 +7,21 @@ import { WebsocketServerTransport } from '../../src/router/transport/WebSocketSe import { WebsocketDuplexConnection } from '../../src/router/transport/WebsocketDuplexConnection.js'; import { Duplex } from 'stream'; -const WS_PORT = process.env.WS_PORT ? parseInt(process.env.WS_PORT) : 4532; -const WS_ADDRESS = `ws://localhost:${WS_PORT}`; +let nextPort = 5433; describe('Sockets', () => { let server: WebSocket.WebSocketServer; let closeServer: () => void; + let WS_PORT = 0; + let WS_ADDRESS = ''; + beforeEach(() => { let closed = false; + + WS_PORT = process.env.WS_PORT ? parseInt(process.env.WS_PORT) : nextPort++; + WS_ADDRESS = `ws://localhost:${WS_PORT}`; + server = new WebSocket.WebSocketServer({ port: WS_PORT }); @@ -198,4 +204,92 @@ describe('Sockets', () => { await Promise.all(promises); await vi.waitFor(() => expect(serverCancelSpy.mock.calls.length).equals(testCount), { timeout: 2000 }); }); + + /** + * Similar to the above test, but checks for the case where + * the server closes the connection due to a keepalive timeout. + */ + it('should handle closed server connections correctly', async () => { + const transport = new WebsocketServerTransport({ + wsCreator: () => server + }); + + const onCancelWrapper = (callback: () => void) => callback(); + const serverCancelSpy = vi.fn(onCancelWrapper); + + // Create a simple server which will spam a lot of data to any connection + const rSocketServer = new RSocketServer({ + transport, + acceptor: { + accept: async () => { + return { + requestStream: (payload, initialN, responder) => { + let stop = false; + + setImmediate(async () => { + while (!stop) { + // To trigger the issue, we need to send multiple individual large messages. + // This builds up a buffer that will be sent after closing the connection. + for (let i = 0; i < 5; i++) { + responder.onNext({ data: Buffer.from('some payload'.repeat(100_000)) }, false); + } + await new Promise((r) => setTimeout(r, 1)); + } + }); + return { + request: () => {}, + onExtension: () => {}, + cancel: () => { + serverCancelSpy(() => { + stop = true; + }); + } + }; + } + }; + } + } + }); + rSocketServer.bind(); + + // Try and connect 10 times. Without the fix, + // more than 50% of these should fail. + // The socket will be closed by the server + const testCount = 10; + const promises = new Array(testCount).fill(null).map(async () => { + const testSocket = new WebSocket.WebSocket(WS_ADDRESS); + + const connector = new RSocketConnector({ + transport: new WebsocketClientTransport({ + url: WS_ADDRESS, + wsCreator: (url) => testSocket as any + }), + + setup: { + dataMimeType: 'application/bson', + metadataMimeType: 'application/bson', + + keepAlive: 20000, + // This should be long enough to trigger after the initial setup + lifetime: 20, + + payload: { + data: null + } + } + }); + + const connection = await connector.connect(); + + connection.requestStream({ data: null }, 1, { + onNext() {}, + onComplete: () => {}, + onExtension: () => {}, + onError: () => {} + }); + }); + + await Promise.all(promises); + await vi.waitFor(() => expect(serverCancelSpy.mock.calls.length).equals(testCount), { timeout: 2000 }); + }); });