From 57ac274a16c04904d7008e114cc219993d4f8ccd Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Thu, 6 Jun 2024 13:36:39 +0200 Subject: [PATCH 1/8] fix websocket closing issue and add authentication errors --- .changeset/happy-flies-tease.md | 5 +++++ .changeset/smooth-frogs-wait.md | 5 +++++ .../src/router/ReactiveSocketRouter.ts | 10 +++++++++ packages/service-core/src/routes/router.ts | 1 + .../service-core/src/routes/socket-route.ts | 2 +- service/src/runners/server.ts | 3 ++- service/tsconfig.json | 21 ++++++++++++++++++- 7 files changed, 44 insertions(+), 3 deletions(-) create mode 100644 .changeset/happy-flies-tease.md create mode 100644 .changeset/smooth-frogs-wait.md diff --git a/.changeset/happy-flies-tease.md b/.changeset/happy-flies-tease.md new file mode 100644 index 00000000..7df2be62 --- /dev/null +++ b/.changeset/happy-flies-tease.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': patch +--- + +Fix missing authentication errors for websocket sync stream requests diff --git a/.changeset/smooth-frogs-wait.md b/.changeset/smooth-frogs-wait.md new file mode 100644 index 00000000..3e96d809 --- /dev/null +++ b/.changeset/smooth-frogs-wait.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-rsocket-router': patch +--- + +Fix issue where sending data during socket closing would throw an exception. diff --git a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts index 6c71223c..4457be18 100644 --- a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts +++ b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts @@ -43,6 +43,16 @@ export class ReactiveSocketRouter { server.on('upgrade', (request, socket, head) => { wss.handleUpgrade(request, socket as any, head, (ws) => { + const originalSend = ws.send.bind(ws); + ws.send = (...args) => { + // Work around for this issue + // https://github.com/websockets/ws/issues/1515 + if (ws.readyState == ws.CLOSING || ws.readyState == ws.CLOSED) { + return; + } + // @ts-expect-error the overloaded function causes type issues which should be fine in this case + return originalSend(...args); + }; wss.emit('connection', ws, request); }); }); diff --git a/packages/service-core/src/routes/router.ts b/packages/service-core/src/routes/router.ts index ba508955..452f12d8 100644 --- a/packages/service-core/src/routes/router.ts +++ b/packages/service-core/src/routes/router.ts @@ -8,6 +8,7 @@ export type Context = { system: CorePowerSyncSystem; token_payload?: auth.JwtPayload; + token_errors?: string[]; }; /** diff --git a/packages/service-core/src/routes/socket-route.ts b/packages/service-core/src/routes/socket-route.ts index d607a42c..7890fa1f 100644 --- a/packages/service-core/src/routes/socket-route.ts +++ b/packages/service-core/src/routes/socket-route.ts @@ -13,7 +13,7 @@ export const sync_stream_reactive: SocketRouteGenerator = (router) => authorize: ({ context }) => { return { authorized: !!context.token_payload, - errors: ['Authentication required'] + errors: ['Authentication required'].concat(context.token_errors ?? []) }; }, validator: micro.schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }), diff --git a/service/src/runners/server.ts b/service/src/runners/server.ts index 3197cd6d..2b4b196d 100644 --- a/service/src/runners/server.ts +++ b/service/src/runners/server.ts @@ -60,10 +60,11 @@ export async function startServer(runnerConfig: utils.RunnerConfig) { try { const extracted_token = routes.auth.getTokenFromHeader(token); if (extracted_token != null) { - const { context } = await routes.auth.generateContext(system, extracted_token); + const { context, errors } = await routes.auth.generateContext(system, extracted_token); return { token, ...context, + token_errors: errors, system }; } diff --git a/service/tsconfig.json b/service/tsconfig.json index ac2464a2..c3388d0a 100644 --- a/service/tsconfig.json +++ b/service/tsconfig.json @@ -8,5 +8,24 @@ "sourceMap": true }, "include": ["src"], - "references": [] + "references": [ + { + "path": "../packages/jpgwire" + }, + { + "path": "../packages/jsonbig" + }, + { + "path": "../packages/rsocket-router" + }, + { + "path": "../packages/service-core" + }, + { + "path": "../packages/sync-rules" + }, + { + "path": "../packages/types" + } + ] } From 97325ea0381feeb77b892f305c7c2c584708095d Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 10 Jun 2024 16:41:29 +0200 Subject: [PATCH 2/8] fix websocket bugs --- packages/rsocket-router/package.json | 5 +- .../src/router/ReactiveSocketRouter.ts | 12 +- .../transport/WebSocketServerTransport.ts | 130 ++++++++++++ .../transport/WebsocketDuplexConnection.ts | 164 +++++++++++++++ .../rsocket-router/tests/src/requests.test.ts | 179 +++++++++-------- .../rsocket-router/tests/src/socket.test.ts | 188 ++++++++++++++++++ pnpm-lock.yaml | 7 +- 7 files changed, 582 insertions(+), 103 deletions(-) create mode 100644 packages/rsocket-router/src/router/transport/WebSocketServerTransport.ts create mode 100644 packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts create mode 100644 packages/rsocket-router/tests/src/socket.test.ts diff --git a/packages/rsocket-router/package.json b/packages/rsocket-router/package.json index ec37f12f..53d1d2ba 100644 --- a/packages/rsocket-router/package.json +++ b/packages/rsocket-router/package.json @@ -18,17 +18,18 @@ "test": "vitest" }, "dependencies": { + "@journeyapps-platform/micro": "^17.0.1", "rsocket-core": "1.0.0-alpha.3", "rsocket-websocket-server": "1.0.0-alpha.3", - "@journeyapps-platform/micro": "^17.0.1", "ts-codec": "^1.2.2", "uuid": "^9.0.1", - "ws": "~8.2.3" + "ws": "^8.17.0" }, "devDependencies": { "@types/uuid": "^9.0.4", "@types/ws": "~8.2.0", "bson": "^6.6.0", + "rsocket-websocket-client": "1.0.0-alpha.3", "typescript": "^5.2.2", "vitest": "^0.34.6" } diff --git a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts index 4457be18..5d1696e5 100644 --- a/packages/rsocket-router/src/router/ReactiveSocketRouter.ts +++ b/packages/rsocket-router/src/router/ReactiveSocketRouter.ts @@ -6,7 +6,6 @@ import * as micro from '@journeyapps-platform/micro'; import * as http from 'http'; import { Payload, RSocketServer } from 'rsocket-core'; -import { WebsocketServerTransport } from 'rsocket-websocket-server'; import * as ws from 'ws'; import { SocketRouterObserver } from './SocketRouterListener.js'; import { @@ -17,6 +16,7 @@ import { ReactiveSocketRouterOptions, SocketResponder } from './types.js'; +import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js'; export class ReactiveSocketRouter { constructor(protected options?: ReactiveSocketRouterOptions) {} @@ -43,16 +43,6 @@ export class ReactiveSocketRouter { server.on('upgrade', (request, socket, head) => { wss.handleUpgrade(request, socket as any, head, (ws) => { - const originalSend = ws.send.bind(ws); - ws.send = (...args) => { - // Work around for this issue - // https://github.com/websockets/ws/issues/1515 - if (ws.readyState == ws.CLOSING || ws.readyState == ws.CLOSED) { - return; - } - // @ts-expect-error the overloaded function causes type issues which should be fine in this case - return originalSend(...args); - }; wss.emit('connection', ws, request); }); }); diff --git a/packages/rsocket-router/src/router/transport/WebSocketServerTransport.ts b/packages/rsocket-router/src/router/transport/WebSocketServerTransport.ts new file mode 100644 index 00000000..40e6338c --- /dev/null +++ b/packages/rsocket-router/src/router/transport/WebSocketServerTransport.ts @@ -0,0 +1,130 @@ +/* + * Adapted from https://github.com/rsocket/rsocket-js/blob/1.0.x-alpha/packages/rsocket-websocket-client/src/WebsocketClientTransport.ts + * Copyright 2021-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + Closeable, + Deferred, + Demultiplexer, + DuplexConnection, + Frame, + FrameHandler, + Multiplexer, + Outbound, + ServerTransport +} from 'rsocket-core'; +import * as WebSocket from 'ws'; +import { WebsocketDuplexConnection } from './WebsocketDuplexConnection.js'; +import * as micro from '@journeyapps-platform/micro'; + +export type SocketFactory = (options: SocketOptions) => WebSocket.WebSocketServer; + +export type SocketOptions = { + host?: string; + port?: number; +}; + +export type ServerOptions = SocketOptions & { + wsCreator?: SocketFactory; + debug?: boolean; +}; + +const defaultFactory: SocketFactory = (options: SocketOptions) => { + return new WebSocket.WebSocketServer({ + host: options.host, + port: options.port + }); +}; + +export class WebsocketServerTransport implements ServerTransport { + private readonly host: string | undefined; + private readonly port: number | undefined; + private readonly factory: SocketFactory; + + constructor(options: ServerOptions) { + this.host = options.host; + this.port = options.port; + this.factory = options.wsCreator ?? defaultFactory; + } + + async bind( + connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise, + multiplexerDemultiplexerFactory: ( + frame: Frame, + outbound: Outbound & Closeable + ) => Multiplexer & Demultiplexer & FrameHandler + ): Promise { + const websocketServer: WebSocket.WebSocketServer = await this.connectServer(); + const serverCloseable = new ServerCloseable(websocketServer); + + const connectionListener = (websocket: WebSocket.WebSocket) => { + try { + websocket.binaryType = 'nodebuffer'; + const duplex = WebSocket.createWebSocketStream(websocket); + WebsocketDuplexConnection.create(duplex, connectionAcceptor, multiplexerDemultiplexerFactory, websocket); + } catch (ex) { + micro.logger.error(`Could not create duplex connection`, ex); + if (websocket.readyState == websocket.OPEN) { + websocket.close(); + } + } + }; + + const closeListener = (error?: Error) => { + serverCloseable.close(error); + }; + + websocketServer.addListener('connection', connectionListener); + websocketServer.addListener('close', closeListener); + websocketServer.addListener('error', closeListener); + + return serverCloseable; + } + + private connectServer(): Promise { + return new Promise((resolve, reject) => { + const websocketServer = this.factory({ + host: this.host, + port: this.port + }); + + const earlyCloseListener = (error?: Error) => { + reject(error); + }; + + websocketServer.addListener('close', earlyCloseListener); + websocketServer.addListener('error', earlyCloseListener); + websocketServer.addListener('listening', () => resolve(websocketServer)); + }); + } +} + +class ServerCloseable extends Deferred { + constructor(private readonly server: WebSocket.WebSocketServer) { + super(); + } + + close(error?: Error) { + if (this.done) { + super.close(error); + return; + } + + // For this package's use case the server is externally closed + + super.close(); + } +} diff --git a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts new file mode 100644 index 00000000..f99e5b23 --- /dev/null +++ b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts @@ -0,0 +1,164 @@ +/* + * Adapted from https://github.com/rsocket/rsocket-js/blob/1.0.x-alpha/packages/rsocket-websocket-client/src/WebsocketDuplexConnection.ts + * Copyright 2021-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as micro from '@journeyapps-platform/micro'; +import { + Closeable, + Deferred, + Demultiplexer, + deserializeFrame, + DuplexConnection, + Frame, + FrameHandler, + Multiplexer, + Outbound, + serializeFrame +} from 'rsocket-core'; +import { Duplex } from 'stream'; +import WebSocket from 'ws'; + +export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound { + readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler; + + constructor( + private websocketDuplex: Duplex, + frame: Frame, + multiplexerDemultiplexerFactory: ( + frame: Frame, + outbound: Outbound & Closeable + ) => Multiplexer & Demultiplexer & FrameHandler, + private rawSocket: WebSocket.WebSocket + ) { + super(); + + websocketDuplex.on('close', this.handleClosed); + websocketDuplex.on('error', this.handleError); + websocketDuplex.on('data', this.handleMessage); + + this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this); + } + + get availability(): number { + return this.websocketDuplex.destroyed ? 0 : 1; + } + + close(error?: Error) { + if (this.done) { + super.close(error); + return; + } + + this.websocketDuplex.removeAllListeners(); + this.websocketDuplex.end(); + + super.close(error); + } + + send(frame: Frame): void { + if (this.done) { + return; + } + + try { + const buffer = serializeFrame(frame); + // Work around for this issue + // https://github.com/websockets/ws/issues/1515 + if (this.rawSocket.readyState == this.rawSocket.CLOSING || this.rawSocket.readyState == this.rawSocket.CLOSED) { + this.close(new Error('WebSocket is closing')); + return; + } + + this.websocketDuplex.write(buffer, (error: Error | null | undefined) => { + /** + * This callback will fire during the first write that the raw socket changes to the closing state. + * If any subsequent write calls are made, it will not fire. This will be caught above. + * */ + this.close(new Error(error?.message || `Could not write to WebSocket duplex connection: ${error}`)); + return true; + }); + } catch (ex) { + this.close(new Error(ex.reason || `Could not write to WebSocket duplex connection: ${ex}`)); + } + } + + private handleClosed = (e: WebSocket.CloseEvent): void => { + this.close(new Error(e.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.')); + }; + + private handleError = (e: WebSocket.ErrorEvent): void => { + micro.logger.error(`Error in WebSocket duplex connection: ${e}`); + this.close(e.error); + }; + + private handleMessage = (buffer: Buffer): void => { + try { + const frame = deserializeFrame(buffer); + this.multiplexerDemultiplexer.handle(frame); + } catch (error) { + this.close(error); + } + }; + + static create( + socket: Duplex, + connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise, + multiplexerDemultiplexerFactory: ( + frame: Frame, + outbound: Outbound & Closeable + ) => Multiplexer & Demultiplexer & FrameHandler, + rawSocket: WebSocket.WebSocket + ): void { + /** + * Closes the Duplex socket stream and raw socket + */ + const closeSocket = () => { + rawSocket.close(); + socket.end(); + }; + socket.once('data', async (buffer) => { + if (!buffer || !Buffer.isBuffer(buffer)) { + micro.logger.info(`Received invalid initial frame buffer. Skipping connection request.`); + return closeSocket(); + } + + let frame: Frame | undefined = undefined; + try { + frame = deserializeFrame(buffer); + if (!frame) { + throw new Error(`Unable to deserialize frame`); + } + } catch (ex) { + micro.logger.info(`Received error deserializing initial frame buffer. Skipping connection request.`, ex); + // The initial frame should always be parsable + return closeSocket(); + } + + const connection = new WebsocketDuplexConnection(socket, frame, multiplexerDemultiplexerFactory, rawSocket); + if (connection.done) { + return; + } + try { + socket.pause(); + await connectionAcceptor(frame, connection); + socket.resume(); + } catch (error) { + micro.logger.info(`Error accepting connection:`, error); + connection.close(error); + } + }); + } +} diff --git a/packages/rsocket-router/tests/src/requests.test.ts b/packages/rsocket-router/tests/src/requests.test.ts index af103f11..4594a9ae 100644 --- a/packages/rsocket-router/tests/src/requests.test.ts +++ b/packages/rsocket-router/tests/src/requests.test.ts @@ -1,4 +1,5 @@ -import { expect, it, vi } from 'vitest'; +import '@journeyapps-platform/micro/register'; +import { describe, expect, it, vi } from 'vitest'; import { createMockObserver, createMockResponder } from './utils/mock-responder.js'; import { handleReactiveStream } from '../../src/router/ReactiveSocketRouter.js'; import { deserialize, serialize } from 'bson'; @@ -32,101 +33,103 @@ async function handleRoute(path: string, endpoints: ReactiveEndpoint[], responde ); } -it('should get successful response from route', async () => { - const responder = createMockResponder(); - const spy = vi.spyOn(responder, 'onNext'); - - const path = '/test-route'; - - await handleRoute( - path, - [ - { - path, - type: RS_ENDPOINT_TYPE.STREAM, - handler: async (p) => { - // Send data to client - p.responder.onNext({ data: Buffer.from(serialize({})) }, true); +describe('Requests', () => { + it('should get successful response from route', async () => { + const responder = createMockResponder(); + const spy = vi.spyOn(responder, 'onNext'); + + const path = '/test-route'; + + await handleRoute( + path, + [ + { + path, + type: RS_ENDPOINT_TYPE.STREAM, + handler: async (p) => { + // Send data to client + p.responder.onNext({ data: Buffer.from(serialize({})) }, true); + } } - } - ], - responder - ); - - // The onNext() method should have been called to send data to client - expect(spy).toHaveBeenCalledTimes(1); -}); - -it('should get validation error response from route', async () => { - const responder = createMockResponder(); - const spy = vi.spyOn(responder, 'onError'); - - const path = '/test-route'; - - const validationError = 'Test validation error'; - - await handleRoute( - path, - [ - { - path, - type: RS_ENDPOINT_TYPE.STREAM, - handler: async () => {}, - // This will always return an invalid error - validator: { - validate: () => { + ], + responder + ); + + // The onNext() method should have been called to send data to client + expect(spy).toHaveBeenCalledTimes(1); + }); + + it('should get validation error response from route', async () => { + const responder = createMockResponder(); + const spy = vi.spyOn(responder, 'onError'); + + const path = '/test-route'; + + const validationError = 'Test validation error'; + + await handleRoute( + path, + [ + { + path, + type: RS_ENDPOINT_TYPE.STREAM, + handler: async () => {}, + // This will always return an invalid error + validator: { + validate: () => { + return { + valid: false, + errors: [validationError] + }; + } + } + } + ], + responder + ); + + // Should be a validation error + expect(JSON.stringify(spy.mock.calls[0])).includes(validationError); + }); + + it('should get authorization error response from route', async () => { + const responder = createMockResponder(); + const spy = vi.spyOn(responder, 'onError'); + + const path = '/test-route'; + + await handleRoute( + path, + [ + { + path, + type: RS_ENDPOINT_TYPE.STREAM, + handler: async () => {}, + // This will always return unauthorized + authorize: async () => { return { - valid: false, - errors: [validationError] + authorized: false }; } } - } - ], - responder - ); - - // Should be a validation error - expect(JSON.stringify(spy.mock.calls[0])).includes(validationError); -}); - -it('should get authorization error response from route', async () => { - const responder = createMockResponder(); - const spy = vi.spyOn(responder, 'onError'); - - const path = '/test-route'; - - await handleRoute( - path, - [ - { - path, - type: RS_ENDPOINT_TYPE.STREAM, - handler: async () => {}, - // This will always return unauthorized - authorize: async () => { - return { - authorized: false - }; - } - } - ], - responder - ); + ], + responder + ); - // Should be a validation error - expect(JSON.stringify(spy.mock.calls[0])).includes('AUTHORIZATION'); -}); + // Should be a validation error + expect(JSON.stringify(spy.mock.calls[0])).includes('AUTHORIZATION'); + }); -it('should get invalid route error', async () => { - const responder = createMockResponder(); - const spy = vi.spyOn(responder, 'onError'); + it('should get invalid route error', async () => { + const responder = createMockResponder(); + const spy = vi.spyOn(responder, 'onError'); - const path = '/test-route'; + const path = '/test-route'; - // Providing no endpoints means there won't be any matching route - await handleRoute(path, [], responder); + // Providing no endpoints means there won't be any matching route + await handleRoute(path, [], responder); - // Should be a validation error - expect(JSON.stringify(spy.mock.calls[0])).includes('No route'); + // Should be a validation error + expect(JSON.stringify(spy.mock.calls[0])).includes('No route'); + }); }); diff --git a/packages/rsocket-router/tests/src/socket.test.ts b/packages/rsocket-router/tests/src/socket.test.ts new file mode 100644 index 00000000..2df03c83 --- /dev/null +++ b/packages/rsocket-router/tests/src/socket.test.ts @@ -0,0 +1,188 @@ +import '@journeyapps-platform/micro/register'; +import * as WebSocket from 'ws'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { RSocketConnector, RSocketServer } from 'rsocket-core'; +import { WebsocketClientTransport } from 'rsocket-websocket-client'; + +import { WebsocketServerTransport } from '../../src/router/transport/WebSocketServerTransport.js'; +import { WebsocketDuplexConnection } from '../../src/router/transport/WebsocketDuplexConnection.js'; +import { Duplex } from 'stream'; + +const WS_PORT = process.env.WS_PORT ? parseInt(process.env.WS_PORT) : 4532; +const WS_ADDRESS = `ws://localhost:${WS_PORT}`; + +describe('Sockets', () => { + let server: WebSocket.WebSocketServer; + let closeServer: () => void; + + beforeEach(() => { + let closed = false; + server = new WebSocket.WebSocketServer({ + port: WS_PORT + }); + + /** + * The server doesn't have a closed status. + * Some tests involve closing while others can be closed + * after each test. This method should prevent double closing. + */ + closeServer = () => { + if (closed) { + return; + } + server.close(); + closed = true; + }; + }); + + afterEach(() => { + closeServer(); + }); + + it('should only not close a server that is managed externally', async () => { + const transport = new WebsocketServerTransport({ + wsCreator: () => server + }); + + const rSocketServer = new RSocketServer({ + transport, + acceptor: { + accept: async () => { + return {}; + } + } + }); + + const closer = await rSocketServer.bind(); + + const closeableSpy = vi.spyOn(closer, 'close'); + + // Register a listener for when the RSocketServer has been closed + const isClosedPromise = new Promise((resolve) => { + closer.onClose(() => resolve()); + }); + + // This will be triggered externally when the HTTP(s) server closes + // linked to the internal WS server. + closeServer(); + await isClosedPromise; + expect(closeableSpy).toBeCalledTimes(1); + }); + + /** + * Anyone can connect to the WebSocket port and send any data. Frame decoding should handle + * invalid WebSocket data events. + */ + it('should handle incorrect initial frames', async () => { + const transport = new WebsocketServerTransport({ + wsCreator: () => server + }); + + const rSocketServer = new RSocketServer({ + transport, + acceptor: { + accept: async () => { + return {}; + } + } + }); + + await rSocketServer.bind(); + + const duplexSpy = vi.spyOn(WebsocketDuplexConnection, 'create'); + + // Connect a client WebSocket to the server + const client = new WebSocket.WebSocket(WS_ADDRESS); + await new Promise((resolve) => { + client.once('open', () => resolve()); + }); + + /** + * The connection should be closed if the client sends random data instead + * of a valid frame + */ + client.send('random text'); + + // it should try to create a duplex socket, but fail + await vi.waitFor(() => expect(duplexSpy.mock.calls.length).equals(1), { timeout: 3000 }); + + // It should perform cleanup. Sockets should be closed + const duplex: Duplex = duplexSpy.mock.calls[0][0]; + const rawSocket: WebSocket.WebSocket = duplexSpy.mock.calls[0][3]; + await vi.waitFor(() => expect(duplex.closed).equals(true), { timeout: 3000 }); + await vi.waitFor(() => expect(rawSocket.readyState).equals(rawSocket.CLOSED), { timeout: 3000 }); + }); + + /** + * The server should handle cases where the client closes the WebSocket connection + * at any point in the handshaking process. This test will create 100 connections which + * have their socket closed as soon as the connection has started. The standard RSocket + * WebSocket transport and Duplex connection will throw unhandled exceptions in this case. + * This package's custom implementations should handle exceptions correctly. + */ + it('should handle closed client connections correctly', async () => { + const transport = new WebsocketServerTransport({ + wsCreator: () => server + }); + + // Create a simple server which will spam a lot of data to any connection + const rSocketServer = new RSocketServer({ + transport, + acceptor: { + accept: async () => { + return { + requestStream: (payload, initialN, responder) => { + let stop = false; + + setImmediate(async () => { + while (!stop) { + responder.onNext({ data: Buffer.from('some payload') }, false); + await new Promise((r) => setTimeout(r, 10)); + } + }); + return { + request: () => {}, + onExtension: () => {}, + cancel: () => { + stop = true; + } + }; + } + }; + } + } + }); + rSocketServer.bind(); + + // Try and connect 100 times, closing the socket as soon as it is available + for (let i = 0; i < 100; i++) { + const testSocket = new WebSocket.WebSocket(WS_ADDRESS); + + const connector = new RSocketConnector({ + transport: new WebsocketClientTransport({ + url: WS_ADDRESS, + wsCreator: (url) => testSocket + }), + + setup: { + dataMimeType: 'application/bson', + metadataMimeType: 'application/bson', + payload: { + data: null + } + } + }); + + const connection = await connector.connect(); + connection.requestStream({ data: null }, 1, { + onNext(payload, isComplete) {}, + onComplete: () => {}, + onExtension: () => {}, + onError: () => {} + }); + + // The socket closing here should not throw any unhandled errors + testSocket.close(); + } + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9bc72e61..d68e70fb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -105,8 +105,8 @@ importers: specifier: ^9.0.1 version: 9.0.1 ws: - specifier: ~8.2.3 - version: 8.2.3 + specifier: ^8.17.0 + version: 8.17.0 devDependencies: '@types/uuid': specifier: ^9.0.4 @@ -117,6 +117,9 @@ importers: bson: specifier: ^6.6.0 version: 6.7.0 + rsocket-websocket-client: + specifier: 1.0.0-alpha.3 + version: 1.0.0-alpha.3 typescript: specifier: ^5.2.2 version: 5.4.5 From fcdebdba7ff806ed55d69449d10140b52ab74e44 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 10 Jun 2024 16:42:28 +0200 Subject: [PATCH 3/8] service probes --- .gitignore | 1 - service/.probes/.gitkeep | 0 2 files changed, 1 deletion(-) create mode 100644 service/.probes/.gitkeep diff --git a/.gitignore b/.gitignore index cf582c52..d7357216 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,6 @@ dist npm-error.log .pnpm-debug.log .local-dev -.probes packages/*/manifest.json diff --git a/service/.probes/.gitkeep b/service/.probes/.gitkeep new file mode 100644 index 00000000..e69de29b From ee900d8783c0f4ff2532546daf1c724bb5fad7ee Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 10 Jun 2024 16:42:52 +0200 Subject: [PATCH 4/8] probes ignore fix --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index d7357216..63c24cd3 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,8 @@ dist npm-error.log .pnpm-debug.log .local-dev +.probes + packages/*/manifest.json From 5c3b5b0c513ec7854162c2a82cdf33cb89dfbd5b Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 10 Jun 2024 17:09:14 +0200 Subject: [PATCH 5/8] afterWrite error handle --- .../router/transport/WebsocketDuplexConnection.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts index f99e5b23..4ed5a7aa 100644 --- a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts +++ b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts @@ -82,13 +82,14 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect return; } - this.websocketDuplex.write(buffer, (error: Error | null | undefined) => { - /** - * This callback will fire during the first write that the raw socket changes to the closing state. - * If any subsequent write calls are made, it will not fire. This will be caught above. - * */ - this.close(new Error(error?.message || `Could not write to WebSocket duplex connection: ${error}`)); - return true; + this.websocketDuplex.write(buffer, undefined, (error: Error | null | undefined) => { + if (error) { + /** + * This callback will fire during the first write that the raw socket changes to the closing state. + * If any subsequent write calls are made, it will not fire. This will be caught above. + * */ + this.close(new Error(error?.message || `Could not write to WebSocket duplex connection: ${error}`)); + } }); } catch (ex) { this.close(new Error(ex.reason || `Could not write to WebSocket duplex connection: ${ex}`)); From 0fdf2f636736c0460267466b7a2024d761702984 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 10 Jun 2024 17:33:07 +0200 Subject: [PATCH 6/8] remove websocket package dependency --- packages/rsocket-router/package.json | 1 - pnpm-lock.yaml | 14 -------------- 2 files changed, 15 deletions(-) diff --git a/packages/rsocket-router/package.json b/packages/rsocket-router/package.json index 53d1d2ba..74d3f294 100644 --- a/packages/rsocket-router/package.json +++ b/packages/rsocket-router/package.json @@ -20,7 +20,6 @@ "dependencies": { "@journeyapps-platform/micro": "^17.0.1", "rsocket-core": "1.0.0-alpha.3", - "rsocket-websocket-server": "1.0.0-alpha.3", "ts-codec": "^1.2.2", "uuid": "^9.0.1", "ws": "^8.17.0" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d68e70fb..226b12ee 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -95,9 +95,6 @@ importers: rsocket-core: specifier: 1.0.0-alpha.3 version: 1.0.0-alpha.3 - rsocket-websocket-server: - specifier: 1.0.0-alpha.3 - version: 1.0.0-alpha.3 ts-codec: specifier: ^1.2.2 version: 1.2.2 @@ -3824,9 +3821,6 @@ packages: rsocket-websocket-client@1.0.0-alpha.3: resolution: {integrity: sha512-CwTwTNMGa8BKvrWde/kM3q8IHuzO8RCIfzuj25BsVe9y8eehDQHt4fXk0g1i/wpsxTm+RY6DxE6Vr5snozKVOg==} - rsocket-websocket-server@1.0.0-alpha.3: - resolution: {integrity: sha512-x1Z4lZTEO+Fjt3xDfEDLPrP76Lmw2AW50UZynHEsB9PX1uDhTG5QDzPATiGbNoT7ihJVtb7YQftRIGUG8cC1aQ==} - run-async@2.4.1: resolution: {integrity: sha512-tvVnVv01b8c1RrA6Ep7JkStj85Guv/YrMcwqYQnwjsAS2cTmmPGBBjAjpCW7RrSodNSoE2/qg9O4bceNvUuDgQ==} engines: {node: '>=0.12.0'} @@ -8909,14 +8903,6 @@ snapshots: dependencies: rsocket-core: 1.0.0-alpha.3 - rsocket-websocket-server@1.0.0-alpha.3: - dependencies: - rsocket-core: 1.0.0-alpha.3 - ws: 8.2.3 - transitivePeerDependencies: - - bufferutil - - utf-8-validate - run-async@2.4.1: {} run-async@3.0.0: {} From 10bba5c3b8a4af9cbefa6739dace4f20b71aa5be Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Tue, 11 Jun 2024 10:55:06 +0200 Subject: [PATCH 7/8] update logic: should be able to check socket status before write only. Closing a Duplex stream will close raw socket --- .../transport/WebsocketDuplexConnection.ts | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts index 4ed5a7aa..665789e1 100644 --- a/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts +++ b/packages/rsocket-router/src/router/transport/WebsocketDuplexConnection.ts @@ -82,15 +82,7 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect return; } - this.websocketDuplex.write(buffer, undefined, (error: Error | null | undefined) => { - if (error) { - /** - * This callback will fire during the first write that the raw socket changes to the closing state. - * If any subsequent write calls are made, it will not fire. This will be caught above. - * */ - this.close(new Error(error?.message || `Could not write to WebSocket duplex connection: ${error}`)); - } - }); + this.websocketDuplex.write(buffer); } catch (ex) { this.close(new Error(ex.reason || `Could not write to WebSocket duplex connection: ${ex}`)); } @@ -123,19 +115,7 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect ) => Multiplexer & Demultiplexer & FrameHandler, rawSocket: WebSocket.WebSocket ): void { - /** - * Closes the Duplex socket stream and raw socket - */ - const closeSocket = () => { - rawSocket.close(); - socket.end(); - }; socket.once('data', async (buffer) => { - if (!buffer || !Buffer.isBuffer(buffer)) { - micro.logger.info(`Received invalid initial frame buffer. Skipping connection request.`); - return closeSocket(); - } - let frame: Frame | undefined = undefined; try { frame = deserializeFrame(buffer); @@ -145,7 +125,7 @@ export class WebsocketDuplexConnection extends Deferred implements DuplexConnect } catch (ex) { micro.logger.info(`Received error deserializing initial frame buffer. Skipping connection request.`, ex); // The initial frame should always be parsable - return closeSocket(); + return socket.end(); } const connection = new WebsocketDuplexConnection(socket, frame, multiplexerDemultiplexerFactory, rawSocket); From 7bce743987df001842c405d62c84b4cdee816258 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Tue, 11 Jun 2024 16:08:12 +0200 Subject: [PATCH 8/8] test commit --- packages/rsocket-router/tests/src/socket.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/rsocket-router/tests/src/socket.test.ts b/packages/rsocket-router/tests/src/socket.test.ts index 2df03c83..689f5d49 100644 --- a/packages/rsocket-router/tests/src/socket.test.ts +++ b/packages/rsocket-router/tests/src/socket.test.ts @@ -175,7 +175,7 @@ describe('Sockets', () => { const connection = await connector.connect(); connection.requestStream({ data: null }, 1, { - onNext(payload, isComplete) {}, + onNext() {}, onComplete: () => {}, onExtension: () => {}, onError: () => {}