Skip to content

Commit 0e1c23b

Browse files
authored
Merge pull request #36 from powersync-ja/fix-concurrency-limit
Fix concurrency limit for websockets
2 parents 1066f86 + 909f71a commit 0e1c23b

File tree

3 files changed

+26
-12
lines changed

3 files changed

+26
-12
lines changed

.changeset/silly-impalas-walk.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/service-rsocket-router': patch
3+
'@powersync/lib-services-framework': patch
4+
'powersync-open-service': patch
5+
---
6+
7+
Fix concurrent connection limiting for websockets

libs/lib-services/src/errors/framework-errors.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,16 @@ export class JourneyError extends Error {
3030
return input instanceof JourneyError || input?.is_journey_error == true;
3131
}
3232

33+
private static errorMessage(data: ErrorData) {
34+
let message = `[${data.code}] ${data.description}`;
35+
if (data.details) {
36+
message += `\n ${data.details}`;
37+
}
38+
return message;
39+
}
40+
3341
constructor(data: ErrorData) {
34-
super(`[${data.code}] ${data.description}\n ${data.details}`);
42+
super(JourneyError.errorMessage(data));
3543

3644
this.errorData = data;
3745
if (data.stack) {

packages/rsocket-router/src/router/ReactiveSocketRouter.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,7 @@ import { WebsocketServerTransport } from './transport/WebSocketServerTransport.j
1919
import { errors, logger } from '@powersync/lib-services-framework';
2020

2121
export class ReactiveSocketRouter<C> {
22-
protected activeConnections: number;
23-
24-
constructor(protected options?: ReactiveSocketRouterOptions<C>) {
25-
this.activeConnections = 0;
26-
}
22+
constructor(protected options?: ReactiveSocketRouterOptions<C>) {}
2723

2824
reactiveStream<I, O>(path: string, stream: IReactiveStreamInput<I, O, C>): IReactiveStream<I, O, C> {
2925
return {
@@ -60,11 +56,16 @@ export class ReactiveSocketRouter<C> {
6056
acceptor: {
6157
accept: async (payload) => {
6258
const { max_concurrent_connections } = this.options ?? {};
63-
if (max_concurrent_connections && this.activeConnections >= max_concurrent_connections) {
64-
throw new errors.JourneyError({
65-
code: '429',
59+
// wss.clients.size includes this connection, so we check for greater than
60+
// TODO: Share connection limit between this and http stream connections
61+
if (max_concurrent_connections && wss.clients.size > max_concurrent_connections) {
62+
const err = new errors.JourneyError({
63+
status: 429,
64+
code: 'SERVER_BUSY',
6665
description: `Maximum active concurrent connections limit has been reached`
6766
});
67+
logger.warn(err);
68+
throw err;
6869
}
6970

7071
// Throwing an exception in this context will be returned to the client side request
@@ -80,16 +81,14 @@ export class ReactiveSocketRouter<C> {
8081
requestStream: (payload, initialN, responder) => {
8182
const observer = new SocketRouterObserver();
8283

84+
// TODO: Consider limiting the number of active streams per connection to prevent abuse
8385
handleReactiveStream(context, { payload, initialN, responder }, observer, params).catch((ex) => {
8486
logger.error(ex);
8587
responder.onError(ex);
8688
responder.onComplete();
8789
});
88-
89-
this.activeConnections++;
9090
return {
9191
cancel: () => {
92-
this.activeConnections--;
9392
observer.triggerCancel();
9493
},
9594
onExtension: () => observer.triggerExtension(),

0 commit comments

Comments
 (0)