Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] WebSocket Close Bugs #11

Merged
merged 9 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/happy-flies-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Fix missing authentication errors for websocket sync stream requests
5 changes: 5 additions & 0 deletions .changeset/smooth-frogs-wait.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-rsocket-router': patch
---

Fix issue where sending data during socket closing would throw an exception.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ npm-error.log
.local-dev
.probes


packages/*/manifest.json

.clinic
6 changes: 3 additions & 3 deletions packages/rsocket-router/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
"test": "vitest"
},
"dependencies": {
"rsocket-core": "1.0.0-alpha.3",
"rsocket-websocket-server": "1.0.0-alpha.3",
"@journeyapps-platform/micro": "^17.0.1",
"rsocket-core": "1.0.0-alpha.3",
"ts-codec": "^1.2.2",
"uuid": "^9.0.1",
"ws": "~8.2.3"
"ws": "^8.17.0"
},
"devDependencies": {
"@types/uuid": "^9.0.4",
"@types/ws": "~8.2.0",
"bson": "^6.6.0",
"rsocket-websocket-client": "1.0.0-alpha.3",
"typescript": "^5.2.2",
"vitest": "^0.34.6"
}
Expand Down
2 changes: 1 addition & 1 deletion packages/rsocket-router/src/router/ReactiveSocketRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import * as micro from '@journeyapps-platform/micro';
import * as http from 'http';
import { Payload, RSocketServer } from 'rsocket-core';
import { WebsocketServerTransport } from 'rsocket-websocket-server';
import * as ws from 'ws';
import { SocketRouterObserver } from './SocketRouterListener.js';
import {
Expand All @@ -17,6 +16,7 @@ import {
ReactiveSocketRouterOptions,
SocketResponder
} from './types.js';
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';

export class ReactiveSocketRouter<C> {
constructor(protected options?: ReactiveSocketRouterOptions<C>) {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Adapted from https://github.com/rsocket/rsocket-js/blob/1.0.x-alpha/packages/rsocket-websocket-client/src/WebsocketClientTransport.ts
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {
Closeable,
Deferred,
Demultiplexer,
DuplexConnection,
Frame,
FrameHandler,
Multiplexer,
Outbound,
ServerTransport
} from 'rsocket-core';
import * as WebSocket from 'ws';
import { WebsocketDuplexConnection } from './WebsocketDuplexConnection.js';
import * as micro from '@journeyapps-platform/micro';

export type SocketFactory = (options: SocketOptions) => WebSocket.WebSocketServer;

export type SocketOptions = {
host?: string;
port?: number;
};

export type ServerOptions = SocketOptions & {
wsCreator?: SocketFactory;
debug?: boolean;
};

const defaultFactory: SocketFactory = (options: SocketOptions) => {
return new WebSocket.WebSocketServer({
host: options.host,
port: options.port
});
};

export class WebsocketServerTransport implements ServerTransport {
private readonly host: string | undefined;
private readonly port: number | undefined;
private readonly factory: SocketFactory;

constructor(options: ServerOptions) {
this.host = options.host;
this.port = options.port;
this.factory = options.wsCreator ?? defaultFactory;
}

async bind(
connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise<void>,
multiplexerDemultiplexerFactory: (
frame: Frame,
outbound: Outbound & Closeable
) => Multiplexer & Demultiplexer & FrameHandler
): Promise<Closeable> {
const websocketServer: WebSocket.WebSocketServer = await this.connectServer();
const serverCloseable = new ServerCloseable(websocketServer);

const connectionListener = (websocket: WebSocket.WebSocket) => {
try {
websocket.binaryType = 'nodebuffer';
const duplex = WebSocket.createWebSocketStream(websocket);
WebsocketDuplexConnection.create(duplex, connectionAcceptor, multiplexerDemultiplexerFactory, websocket);
} catch (ex) {
micro.logger.error(`Could not create duplex connection`, ex);
if (websocket.readyState == websocket.OPEN) {
websocket.close();
}
}
};

const closeListener = (error?: Error) => {
serverCloseable.close(error);
};

websocketServer.addListener('connection', connectionListener);
websocketServer.addListener('close', closeListener);
websocketServer.addListener('error', closeListener);

return serverCloseable;
}

private connectServer(): Promise<WebSocket.WebSocketServer> {
return new Promise((resolve, reject) => {
const websocketServer = this.factory({
host: this.host,
port: this.port
});

const earlyCloseListener = (error?: Error) => {
reject(error);
};

websocketServer.addListener('close', earlyCloseListener);
websocketServer.addListener('error', earlyCloseListener);
websocketServer.addListener('listening', () => resolve(websocketServer));
});
}
}

class ServerCloseable extends Deferred {
constructor(private readonly server: WebSocket.WebSocketServer) {
super();
}

close(error?: Error) {
if (this.done) {
super.close(error);
return;
}

// For this package's use case the server is externally closed

super.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Adapted from https://github.com/rsocket/rsocket-js/blob/1.0.x-alpha/packages/rsocket-websocket-client/src/WebsocketDuplexConnection.ts
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as micro from '@journeyapps-platform/micro';
import {
Closeable,
Deferred,
Demultiplexer,
deserializeFrame,
DuplexConnection,
Frame,
FrameHandler,
Multiplexer,
Outbound,
serializeFrame
} from 'rsocket-core';
import { Duplex } from 'stream';
import WebSocket from 'ws';

export class WebsocketDuplexConnection extends Deferred implements DuplexConnection, Outbound {
readonly multiplexerDemultiplexer: Multiplexer & Demultiplexer & FrameHandler;

constructor(
private websocketDuplex: Duplex,
frame: Frame,
multiplexerDemultiplexerFactory: (
frame: Frame,
outbound: Outbound & Closeable
) => Multiplexer & Demultiplexer & FrameHandler,
private rawSocket: WebSocket.WebSocket
) {
super();

websocketDuplex.on('close', this.handleClosed);
websocketDuplex.on('error', this.handleError);
websocketDuplex.on('data', this.handleMessage);

this.multiplexerDemultiplexer = multiplexerDemultiplexerFactory(frame, this);
}

get availability(): number {
return this.websocketDuplex.destroyed ? 0 : 1;
}

close(error?: Error) {
if (this.done) {
super.close(error);
return;
}

this.websocketDuplex.removeAllListeners();
this.websocketDuplex.end();

super.close(error);
}

send(frame: Frame): void {
if (this.done) {
return;
}

try {
const buffer = serializeFrame(frame);
// Work around for this issue
// https://github.com/websockets/ws/issues/1515
if (this.rawSocket.readyState == this.rawSocket.CLOSING || this.rawSocket.readyState == this.rawSocket.CLOSED) {
this.close(new Error('WebSocket is closing'));
return;
}

this.websocketDuplex.write(buffer);
} catch (ex) {
this.close(new Error(ex.reason || `Could not write to WebSocket duplex connection: ${ex}`));
}
}

private handleClosed = (e: WebSocket.CloseEvent): void => {
this.close(new Error(e.reason || 'WebsocketDuplexConnection: Socket closed unexpectedly.'));
};

private handleError = (e: WebSocket.ErrorEvent): void => {
micro.logger.error(`Error in WebSocket duplex connection: ${e}`);
this.close(e.error);
};

private handleMessage = (buffer: Buffer): void => {
try {
const frame = deserializeFrame(buffer);
this.multiplexerDemultiplexer.handle(frame);
} catch (error) {
this.close(error);
}
};

static create(
socket: Duplex,
connectionAcceptor: (frame: Frame, connection: DuplexConnection) => Promise<void>,
multiplexerDemultiplexerFactory: (
frame: Frame,
outbound: Outbound & Closeable
) => Multiplexer & Demultiplexer & FrameHandler,
rawSocket: WebSocket.WebSocket
): void {
socket.once('data', async (buffer) => {
let frame: Frame | undefined = undefined;
try {
frame = deserializeFrame(buffer);
if (!frame) {
throw new Error(`Unable to deserialize frame`);
}
} catch (ex) {
micro.logger.info(`Received error deserializing initial frame buffer. Skipping connection request.`, ex);
// The initial frame should always be parsable
return socket.end();
}

const connection = new WebsocketDuplexConnection(socket, frame, multiplexerDemultiplexerFactory, rawSocket);
if (connection.done) {
return;
}
try {
socket.pause();
await connectionAcceptor(frame, connection);
socket.resume();
} catch (error) {
micro.logger.info(`Error accepting connection:`, error);
connection.close(error);
}
});
}
}
Loading
Loading