Skip to content

Commit 06c211d

Browse files
committed
improvement(client-container-loader): tighten Signal expectations
Signals specify `type` within message `content` next to inner content that is stringified JSON. Assert that internally (to avoid legacy API change) and leverage. Additionally, clean up system signal typing and type guard to reduce casts.
1 parent 53f528a commit 06c211d

File tree

5 files changed

+119
-35
lines changed

5 files changed

+119
-35
lines changed

packages/loader/container-loader/src/connectionManager.ts

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import {
1414
type ITelemetryBaseProperties,
1515
LogLevel,
1616
} from "@fluidframework/core-interfaces";
17+
import type { JsonString } from "@fluidframework/core-interfaces/internal";
18+
import { JsonStringify } from "@fluidframework/core-interfaces/internal";
1719
import { assert } from "@fluidframework/core-utils/internal";
1820
import type {
1921
ConnectionMode,
@@ -197,6 +199,19 @@ interface IPendingConnection {
197199
connectionMode: ConnectionMode;
198200
}
199201

202+
function assertExpectedSignals(
203+
signals: ISignalMessage[],
204+
): asserts signals is ISignalMessage<{ type: never; content: JsonString<unknown> }>[] {
205+
for (const signal of signals) {
206+
if ("type" in signal) {
207+
throw new Error("Unexpected type in ISignalMessage");
208+
}
209+
if (typeof signal.content !== "string") {
210+
throw new TypeError("Non-string content in ISignalMessage");
211+
}
212+
}
213+
}
214+
200215
/**
201216
* Implementation of IConnectionManager, used by Container class
202217
* Implements constant connectivity to relay service, by reconnecting in case of lost connection or error.
@@ -1003,29 +1018,29 @@ export class ConnectionManager implements IConnectionManager {
10031018
// Synthesize clear & join signals out of initialClients state.
10041019
// This allows us to have single way to process signals, and makes it simpler to initialize
10051020
// protocol in Container.
1006-
const clearSignal: ISignalMessage = {
1021+
const clearSignal = {
10071022
// API uses null
10081023
// eslint-disable-next-line unicorn/no-null
10091024
clientId: null, // system message
1010-
content: JSON.stringify({
1025+
content: JsonStringify({
10111026
type: SignalType.Clear,
10121027
}),
10131028
};
10141029

10151030
// list of signals to process due to this new connection
1016-
let signalsToProcess: ISignalMessage[] = [clearSignal];
1017-
1018-
const clientJoinSignals: ISignalMessage[] = (connection.initialClients ?? []).map(
1019-
(priorClient) => ({
1020-
// API uses null
1021-
// eslint-disable-next-line unicorn/no-null
1022-
clientId: null, // system signal
1023-
content: JSON.stringify({
1024-
type: SignalType.ClientJoin,
1025-
content: priorClient, // ISignalClient
1026-
}),
1031+
let signalsToProcess: ISignalMessage<{ type: never; content: JsonString<unknown> }>[] = [
1032+
clearSignal,
1033+
];
1034+
1035+
const clientJoinSignals = (connection.initialClients ?? []).map((priorClient) => ({
1036+
// API uses null
1037+
// eslint-disable-next-line unicorn/no-null
1038+
clientId: null, // system signal
1039+
content: JsonStringify({
1040+
type: SignalType.ClientJoin,
1041+
content: priorClient, // ISignalClient
10271042
}),
1028-
);
1043+
}));
10291044
if (clientJoinSignals.length > 0) {
10301045
signalsToProcess = [...signalsToProcess, ...clientJoinSignals];
10311046
}
@@ -1035,6 +1050,7 @@ export class ConnectionManager implements IConnectionManager {
10351050
// for "self" and connection.initialClients does not contain "self", so we have to process them after
10361051
// "clear" signal above.
10371052
if (connection.initialSignals !== undefined && connection.initialSignals.length > 0) {
1053+
assertExpectedSignals(connection.initialSignals);
10381054
signalsToProcess = [...signalsToProcess, ...connection.initialSignals];
10391055
}
10401056

@@ -1261,6 +1277,7 @@ export class ConnectionManager implements IConnectionManager {
12611277

12621278
private readonly signalHandler = (signalsArg: ISignalMessage | ISignalMessage[]): void => {
12631279
const signals = Array.isArray(signalsArg) ? signalsArg : [signalsArg];
1280+
assertExpectedSignals(signals);
12641281
this.props.signalHandler(signals);
12651282
};
12661283

packages/loader/container-loader/src/container.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,10 @@ import { NoopHeuristic } from "./noopHeuristic.js";
140140
import { pkgVersion } from "./packageVersion.js";
141141
import type { IQuorumSnapshot } from "./protocol/index.js";
142142
import {
143-
type IProtocolHandler,
143+
type InternalProtocolHandlerBuilder,
144144
ProtocolHandler,
145145
type ProtocolHandlerBuilder,
146+
type ProtocolHandlerInternal,
146147
protocolHandlerShouldProcessSignal,
147148
} from "./protocol.js";
148149
import { initQuorumValuesFromCodeDetails } from "./quorum.js";
@@ -495,7 +496,7 @@ export class Container
495496
private readonly scope: FluidObject;
496497
private readonly subLogger: ITelemetryLoggerExt;
497498
private readonly detachedBlobStorage: MemoryDetachedBlobStorage | undefined;
498-
private readonly protocolHandlerBuilder: ProtocolHandlerBuilder;
499+
private readonly protocolHandlerBuilder: InternalProtocolHandlerBuilder;
499500
private readonly client: IClient;
500501

501502
private readonly mc: MonitoringContext;
@@ -597,8 +598,8 @@ export class Container
597598
}
598599
return this._runtime;
599600
}
600-
private _protocolHandler: IProtocolHandler | undefined;
601-
private get protocolHandler(): IProtocolHandler {
601+
private _protocolHandler: ProtocolHandlerInternal | undefined;
602+
private get protocolHandler(): ProtocolHandlerInternal {
602603
if (this._protocolHandler === undefined) {
603604
throw new Error("Attempted to access protocolHandler before it was defined");
604605
}
@@ -830,7 +831,7 @@ export class Container
830831
attributes: IDocumentAttributes,
831832
quorumSnapshot: IQuorumSnapshot,
832833
sendProposal: (key: string, value: unknown) => number,
833-
): ProtocolHandler =>
834+
): ProtocolHandlerInternal =>
834835
new ProtocolHandler(
835836
attributes,
836837
quorumSnapshot,

packages/loader/container-loader/src/contracts.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
type IConnectionDetails,
1313
} from "@fluidframework/container-definitions/internal";
1414
import type { IErrorBase, ITelemetryBaseProperties } from "@fluidframework/core-interfaces";
15+
import type { JsonString } from "@fluidframework/core-interfaces/internal";
1516
import type { ConnectionMode, IClientDetails } from "@fluidframework/driver-definitions";
1617
import type {
1718
IContainerPackageInfo,
@@ -145,7 +146,9 @@ export interface IConnectionManagerFactoryArgs {
145146
* Called by connection manager for each incoming signal.
146147
* May be called before connectHandler is called (due to initial signals on socket connection)
147148
*/
148-
readonly signalHandler: (signals: ISignalMessage[]) => void;
149+
readonly signalHandler: (
150+
signals: ISignalMessage<{ type: never; content: JsonString<unknown> }>[],
151+
) => void;
149152

150153
/**
151154
* Called when connection manager experiences delay in connecting to relay service.

packages/loader/container-loader/src/deltaManager.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ import type {
1616
ITelemetryBaseEvent,
1717
ITelemetryBaseProperties,
1818
} from "@fluidframework/core-interfaces";
19-
import type { IThrottlingWarning } from "@fluidframework/core-interfaces/internal";
19+
import { JsonParse } from "@fluidframework/core-interfaces/internal";
20+
import type { IThrottlingWarning, JsonString } from "@fluidframework/core-interfaces/internal";
2021
import { assert } from "@fluidframework/core-utils/internal";
2122
import type { ConnectionMode } from "@fluidframework/driver-definitions";
2223
import {
@@ -210,7 +211,9 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
210211
private initSequenceNumber: number = 0;
211212

212213
private readonly _inbound: DeltaQueue<ISequencedDocumentMessage>;
213-
private readonly _inboundSignal: DeltaQueue<ISignalMessage>;
214+
private readonly _inboundSignal: DeltaQueue<
215+
ISignalMessage<{ type: never; content: JsonString<unknown> }>
216+
>;
214217

215218
private _closed = false;
216219
private _disposed = false;
@@ -433,7 +436,9 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
433436
this.close(normalizeError(error));
434437
}
435438
},
436-
signalHandler: (signals: ISignalMessage[]) => {
439+
signalHandler: (
440+
signals: ISignalMessage<{ type: never; content: JsonString<unknown> }>[],
441+
) => {
437442
for (const signal of signals) {
438443
this._inboundSignal.push(signal);
439444
}
@@ -474,14 +479,16 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
474479
});
475480

476481
// Inbound signal queue
477-
this._inboundSignal = new DeltaQueue<ISignalMessage>((message) => {
482+
this._inboundSignal = new DeltaQueue<
483+
ISignalMessage<{ type: never; content: JsonString<unknown> }>
484+
>((message) => {
478485
if (this.handler === undefined) {
479486
throw new Error("Attempted to process an inbound signal without a handler attached");
480487
}
481488

482489
this.handler.processSignal({
483490
...message,
484-
content: JSON.parse(message.content as string),
491+
content: JsonParse(message.content),
485492
});
486493
});
487494

packages/loader/container-loader/src/protocol.ts

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,39 @@ import {
2121
} from "./protocol/index.js";
2222

2323
// ADO: #1986: Start using enum from protocol-base.
24-
export enum SignalType {
25-
ClientJoin = "join", // same value as MessageType.ClientJoin,
26-
ClientLeave = "leave", // same value as MessageType.ClientLeave,
27-
Clear = "clear", // used only by client for synthetic signals
24+
export const SignalType = {
25+
ClientJoin: "join", // same value as MessageType.ClientJoin,
26+
ClientLeave: "leave", // same value as MessageType.ClientLeave,
27+
Clear: "clear", // used only by client for synthetic signals
28+
} as const;
29+
30+
interface SystemSignalContent {
31+
type: (typeof SignalType)[keyof typeof SignalType];
32+
content?: unknown;
33+
}
34+
35+
interface InboundSystemSignal<TSignalContent extends SystemSignalContent>
36+
extends ISignalMessage<{ type: never; content: TSignalContent }> {
37+
// eslint-disable-next-line @rushstack/no-new-null -- `null` is used in JSON protocol to indicate system message
38+
readonly clientId: null;
2839
}
2940

41+
type ClientJoinSignal = InboundSystemSignal<{
42+
type: typeof SignalType.ClientJoin;
43+
content: ISignalClient;
44+
}>;
45+
46+
type ClientLeaveSignal = InboundSystemSignal<{
47+
type: typeof SignalType.ClientLeave;
48+
content: string; // clientId of leaving client
49+
}>;
50+
51+
type ClearClientsSignal = InboundSystemSignal<{
52+
type: typeof SignalType.Clear;
53+
}>;
54+
55+
type AudienceSignal = ClientJoinSignal | ClientLeaveSignal | ClearClientsSignal;
56+
3057
/**
3158
* Function to be used for creating a protocol handler.
3259
* @legacy @beta
@@ -47,7 +74,34 @@ export interface IProtocolHandler extends IBaseProtocolHandler {
4774
processSignal(message: ISignalMessage);
4875
}
4976

50-
export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandler {
77+
/**
78+
* More specific version of {@link IProtocolHandler} with narrower call
79+
* constraints for processSignal.
80+
*/
81+
export interface ProtocolHandlerInternal extends IProtocolHandler {
82+
/**
83+
* Process the audience related signal.
84+
* @privateRemarks Internally only AudienceSignal messages need handled
85+
*/
86+
processSignal(message: AudienceSignal): void;
87+
}
88+
89+
/**
90+
* Function to be used for creating a protocol handler.
91+
*
92+
* @remarks This is the same are {@link ProtocolHandlerBuilder} but
93+
* returns the {@link ProtocolHandlerInternal} which has narrower
94+
* expectations for `processSignal`.
95+
*/
96+
export type InternalProtocolHandlerBuilder = (
97+
attributes: IDocumentAttributes,
98+
snapshot: IQuorumSnapshot,
99+
// TODO: use a real type (breaking change)
100+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
101+
sendProposal: (key: string, value: any) => number,
102+
) => ProtocolHandlerInternal;
103+
104+
export class ProtocolHandler extends ProtocolOpHandler implements ProtocolHandlerInternal {
51105
constructor(
52106
attributes: IDocumentAttributes,
53107
quorumSnapshot: IQuorumSnapshot,
@@ -104,8 +158,8 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
104158
return super.processMessage(message, local);
105159
}
106160

107-
public processSignal(message: ISignalMessage): void {
108-
const innerContent = message.content as { content: unknown; type: string };
161+
public processSignal(message: AudienceSignal): void {
162+
const innerContent = message.content;
109163
switch (innerContent.type) {
110164
case SignalType.Clear: {
111165
const members = this.audience.getMembers();
@@ -117,15 +171,15 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
117171
break;
118172
}
119173
case SignalType.ClientJoin: {
120-
const newClient = innerContent.content as ISignalClient;
174+
const newClient = innerContent.content;
121175
// Ignore write clients - quorum will control such clients.
122176
if (newClient.client.mode === "read") {
123177
this.audience.addMember(newClient.clientId, newClient.client);
124178
}
125179
break;
126180
}
127181
case SignalType.ClientLeave: {
128-
const leftClientId = innerContent.content as string;
182+
const leftClientId = innerContent.content;
129183
// Ignore write clients - quorum will control such clients.
130184
if (this.audience.getMember(leftClientId)?.mode === "read") {
131185
this.audience.removeMember(leftClientId);
@@ -144,7 +198,9 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
144198
* The protocol handler should strictly handle only ClientJoin, ClientLeave
145199
* and Clear signal types.
146200
*/
147-
export function protocolHandlerShouldProcessSignal(message: ISignalMessage): boolean {
201+
export function protocolHandlerShouldProcessSignal(
202+
message: ISignalMessage,
203+
): message is AudienceSignal {
148204
// Signal originates from server
149205
if (message.clientId === null) {
150206
const innerContent = message.content as { content: unknown; type: string };

0 commit comments

Comments
 (0)