Skip to content

Commit 3f994ae

Browse files
[Chore] Neaten Routes (#49)
* add utilities for route generation * cleanup imports
1 parent bfc0dc5 commit 3f994ae

File tree

11 files changed

+189
-113
lines changed

11 files changed

+189
-113
lines changed

.changeset/fair-planes-flow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-core': minor
3+
---
4+
5+
Added utility functions for registering routes

packages/rsocket-router/src/router/ReactiveSocketRouter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
* to expose reactive websocket stream in an interface similar to
44
* other Journey micro routers.
55
*/
6+
import { errors, logger } from '@powersync/lib-services-framework';
67
import * as http from 'http';
78
import { Payload, RSocketServer } from 'rsocket-core';
89
import * as ws from 'ws';
910
import { SocketRouterObserver } from './SocketRouterListener.js';
11+
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';
1012
import {
1113
CommonParams,
1214
IReactiveStream,
@@ -15,8 +17,6 @@ import {
1517
ReactiveSocketRouterOptions,
1618
SocketResponder
1719
} from './types.js';
18-
import { WebsocketServerTransport } from './transport/WebSocketServerTransport.js';
19-
import { errors, logger } from '@powersync/lib-services-framework';
2020

2121
export class ReactiveSocketRouter<C> {
2222
constructor(protected options?: ReactiveSocketRouterOptions<C>) {}
@@ -56,6 +56,7 @@ export class ReactiveSocketRouter<C> {
5656
acceptor: {
5757
accept: async (payload) => {
5858
const { max_concurrent_connections } = this.options ?? {};
59+
logger.info(`Currently have ${wss.clients.size} active WebSocket connection(s)`);
5960
// wss.clients.size includes this connection, so we check for greater than
6061
// TODO: Share connection limit between this and http stream connections
6162
if (max_concurrent_connections && wss.clients.size > max_concurrent_connections) {
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import type fastify from 'fastify';
2+
import { registerFastifyRoutes } from './route-register.js';
3+
4+
import * as system from '../system/system-index.js';
5+
6+
import { ADMIN_ROUTES } from './endpoints/admin.js';
7+
import { CHECKPOINT_ROUTES } from './endpoints/checkpointing.js';
8+
import { DEV_ROUTES } from './endpoints/dev.js';
9+
import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js';
10+
import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js';
11+
import { createRequestQueueHook, CreateRequestQueueParams } from './hooks.js';
12+
import { RouteDefinition } from './router.js';
13+
14+
/**
15+
* A list of route definitions to be registered as endpoints.
16+
* Supplied concurrency limits will be applied to the grouped routes.
17+
*/
18+
export type RouteRegistrationOptions = {
19+
routes: RouteDefinition[];
20+
queueOptions: CreateRequestQueueParams;
21+
};
22+
23+
/**
24+
* HTTP routes separated by API and Sync stream categories.
25+
* This allows for separate concurrency limits.
26+
*/
27+
export type RouteDefinitions = {
28+
api?: Partial<RouteRegistrationOptions>;
29+
syncStream?: Partial<RouteRegistrationOptions>;
30+
};
31+
32+
export type FastifyServerConfig = {
33+
system: system.CorePowerSyncSystem;
34+
routes?: RouteDefinitions;
35+
};
36+
37+
export const DEFAULT_ROUTE_OPTIONS = {
38+
api: {
39+
routes: [...ADMIN_ROUTES, ...CHECKPOINT_ROUTES, ...DEV_ROUTES, ...SYNC_RULES_ROUTES],
40+
queueOptions: {
41+
concurrency: 10,
42+
max_queue_depth: 20
43+
}
44+
},
45+
syncStream: {
46+
routes: [...SYNC_STREAM_ROUTES],
47+
queueOptions: {
48+
concurrency: 200,
49+
max_queue_depth: 0
50+
}
51+
}
52+
};
53+
54+
/**
55+
* Registers default routes on a Fastify server. Consumers can optionally configure
56+
* concurrency queue limits or override routes.
57+
*/
58+
export function configureFastifyServer(server: fastify.FastifyInstance, options: FastifyServerConfig) {
59+
const { system, routes = DEFAULT_ROUTE_OPTIONS } = options;
60+
/**
61+
* Fastify creates an encapsulated context for each `.register` call.
62+
* Creating a separate context here to separate the concurrency limits for Admin APIs
63+
* and Sync Streaming routes.
64+
* https://github.com/fastify/fastify/blob/main/docs/Reference/Encapsulation.md
65+
*/
66+
server.register(async function (childContext) {
67+
registerFastifyRoutes(
68+
childContext,
69+
async () => {
70+
return {
71+
user_id: undefined,
72+
system: system
73+
};
74+
},
75+
routes.api?.routes ?? DEFAULT_ROUTE_OPTIONS.api.routes
76+
);
77+
// Limit the active concurrent requests
78+
childContext.addHook(
79+
'onRequest',
80+
createRequestQueueHook(routes.api?.queueOptions ?? DEFAULT_ROUTE_OPTIONS.api.queueOptions)
81+
);
82+
});
83+
84+
// Create a separate context for concurrency queueing
85+
server.register(async function (childContext) {
86+
registerFastifyRoutes(
87+
childContext,
88+
async () => {
89+
return {
90+
user_id: undefined,
91+
system: system
92+
};
93+
},
94+
routes.syncStream?.routes ?? DEFAULT_ROUTE_OPTIONS.syncStream.routes
95+
);
96+
// Limit the active concurrent requests
97+
childContext.addHook(
98+
'onRequest',
99+
createRequestQueueHook(routes.syncStream?.queueOptions ?? DEFAULT_ROUTE_OPTIONS.syncStream.queueOptions)
100+
);
101+
});
102+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { deserialize } from 'bson';
2+
import * as http from 'http';
3+
4+
import { errors, logger } from '@powersync/lib-services-framework';
5+
import { ReactiveSocketRouter, RSocketRequestMeta } from '@powersync/service-rsocket-router';
6+
7+
import { CorePowerSyncSystem } from '../system/CorePowerSyncSystem.js';
8+
import { generateContext, getTokenFromHeader } from './auth.js';
9+
import { syncStreamReactive } from './endpoints/socket-route.js';
10+
import { RSocketContextMeta, SocketRouteGenerator } from './router-socket.js';
11+
import { Context } from './router.js';
12+
13+
export type RSockerRouterConfig = {
14+
system: CorePowerSyncSystem;
15+
server: http.Server;
16+
routeGenerators?: SocketRouteGenerator[];
17+
};
18+
19+
export const DEFAULT_SOCKET_ROUTES = [syncStreamReactive];
20+
21+
export function configureRSocket(router: ReactiveSocketRouter<Context>, options: RSockerRouterConfig) {
22+
const { routeGenerators = DEFAULT_SOCKET_ROUTES, server, system } = options;
23+
24+
router.applyWebSocketEndpoints(server, {
25+
contextProvider: async (data: Buffer) => {
26+
const { token } = RSocketContextMeta.decode(deserialize(data) as any);
27+
28+
if (!token) {
29+
throw new errors.AuthorizationError('No token provided');
30+
}
31+
32+
try {
33+
const extracted_token = getTokenFromHeader(token);
34+
if (extracted_token != null) {
35+
const { context, errors: token_errors } = await generateContext(system, extracted_token);
36+
if (context?.token_payload == null) {
37+
throw new errors.AuthorizationError(token_errors ?? 'Authentication required');
38+
}
39+
return {
40+
token,
41+
...context,
42+
token_errors: token_errors,
43+
system
44+
};
45+
} else {
46+
throw new errors.AuthorizationError('No token provided');
47+
}
48+
} catch (ex) {
49+
logger.error(ex);
50+
throw ex;
51+
}
52+
},
53+
endpoints: routeGenerators.map((generator) => generator(router)),
54+
metaDecoder: async (meta: Buffer) => {
55+
return RSocketRequestMeta.decode(deserialize(meta) as any);
56+
},
57+
payloadDecoder: async (rawData?: Buffer) => rawData && deserialize(rawData)
58+
});
59+
}

packages/service-core/src/routes/endpoints/socket-route.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,13 @@ import { RequestParameters } from '@powersync/service-sync-rules';
33
import { serialize } from 'bson';
44

55
import { Metrics } from '../../metrics/Metrics.js';
6-
import { streamResponse } from '../../sync/sync.js';
6+
import * as sync from '../../sync/sync-index.js';
77
import * as util from '../../util/util-index.js';
88
import { SocketRouteGenerator } from '../router-socket.js';
99
import { SyncRoutes } from './sync-stream.js';
10-
import { RequestTracker } from '../../sync/RequestTracker.js';
1110

1211
export const syncStreamReactive: SocketRouteGenerator = (router) =>
1312
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
14-
authorize: ({ context }) => {
15-
return {
16-
authorized: !!context.token_payload,
17-
errors: ['Authentication required'].concat(context.token_errors ?? [])
18-
};
19-
},
2013
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
2114
handler: async ({ context, params, responder, observer, initialN }) => {
2215
const { system } = context;
@@ -67,9 +60,9 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
6760
});
6861

6962
Metrics.getInstance().concurrent_connections.add(1);
70-
const tracker = new RequestTracker();
63+
const tracker = new sync.RequestTracker();
7164
try {
72-
for await (const data of streamResponse({
65+
for await (const data of sync.streamResponse({
7366
storage,
7467
params: {
7568
...params,

packages/service-core/src/routes/route-register.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import fastify from 'fastify';
1+
import type fastify from 'fastify';
22

3-
import { errors, router, HTTPMethod, logger } from '@powersync/lib-services-framework';
3+
import { errors, HTTPMethod, logger, router } from '@powersync/lib-services-framework';
44
import { Context, ContextProvider, RequestEndpoint, RequestEndpointHandlerPayload } from './router.js';
55

66
export type FastifyEndpoint<I, O, C> = RequestEndpoint<I, O, C> & {
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
import { IReactiveStream, ReactiveSocketRouter } from '@powersync/service-rsocket-router';
12
import * as t from 'ts-codec';
2-
import { ReactiveSocketRouter, IReactiveStream } from '@powersync/service-rsocket-router';
33

44
import { Context } from './router.js';
55

6-
export const RSocketContextMeta = t.object({
7-
token: t.string
8-
});
9-
106
/**
117
* Creates a socket route handler given a router instance
128
*/
139
export type SocketRouteGenerator = (router: ReactiveSocketRouter<Context>) => IReactiveStream;
10+
11+
export const RSocketContextMeta = t.object({
12+
token: t.string
13+
});

packages/service-core/src/routes/router.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ export type RequestEndpointHandlerPayload<
3636
request: Request;
3737
};
3838

39+
export type RouteDefinition<I = any, O = any> = RequestEndpoint<I, O>;
40+
3941
/**
4042
* Helper function for making generics work well when defining routes
4143
*/

packages/service-core/src/routes/routes-index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
export * as auth from './auth.js';
2+
export * from './configure-fastify.js';
3+
export * from './configure-rsocket.js';
24
export * as endpoints from './endpoints/route-endpoints-index.js';
35
export * as hooks from './hooks.js';
46
export * from './route-register.js';
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export * from './BroadcastIterable.js';
22
export * from './LastValueSink.js';
33
export * from './merge.js';
4+
export * from './RequestTracker.js';
45
export * from './safeRace.js';
56
export * from './sync.js';
67
export * from './util.js';

0 commit comments

Comments
 (0)