Skip to content

Commit 2f6817d

Browse files
authored
improvement(client-container-loader): tighten Signal expectations (#25522)
Signals specify `type` within message `content` next to inner content that is stringified JSON. Assert that internally (to avoid legacy API change) and leverage. Additionally, clean up system signal typing and type guard to reduce casts.
1 parent c557a7f commit 2f6817d

File tree

5 files changed

+120
-35
lines changed

5 files changed

+120
-35
lines changed

packages/loader/container-loader/src/connectionManager.ts

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import type {
1010
ReadOnlyInfo,
1111
} from "@fluidframework/container-definitions/internal";
1212
import { type ITelemetryBaseProperties, LogLevel } from "@fluidframework/core-interfaces";
13+
import type { JsonString } from "@fluidframework/core-interfaces/internal";
14+
import { JsonStringify } from "@fluidframework/core-interfaces/internal";
1315
import { assert } from "@fluidframework/core-utils/internal";
1416
import type {
1517
ConnectionMode,
@@ -113,6 +115,19 @@ interface IPendingConnection {
113115
connectionMode: ConnectionMode;
114116
}
115117

118+
function assertExpectedSignals(
119+
signals: ISignalMessage[],
120+
): asserts signals is ISignalMessage<{ type: never; content: JsonString<unknown> }>[] {
121+
for (const signal of signals) {
122+
if ("type" in signal) {
123+
throw new Error("Unexpected type in ISignalMessage");
124+
}
125+
if (typeof signal.content !== "string") {
126+
throw new TypeError("Non-string content in ISignalMessage");
127+
}
128+
}
129+
}
130+
116131
/**
117132
* Implementation of IConnectionManager, used by Container class
118133
* Implements constant connectivity to relay service, by reconnecting in case of lost connection or error.
@@ -921,29 +936,29 @@ export class ConnectionManager implements IConnectionManager {
921936
// Synthesize clear & join signals out of initialClients state.
922937
// This allows us to have single way to process signals, and makes it simpler to initialize
923938
// protocol in Container.
924-
const clearSignal: ISignalMessage = {
939+
const clearSignal = {
925940
// API uses null
926941
// eslint-disable-next-line unicorn/no-null
927942
clientId: null, // system message
928-
content: JSON.stringify({
943+
content: JsonStringify({
929944
type: SignalType.Clear,
930945
}),
931946
};
932947

933948
// list of signals to process due to this new connection
934-
let signalsToProcess: ISignalMessage[] = [clearSignal];
935-
936-
const clientJoinSignals: ISignalMessage[] = (connection.initialClients ?? []).map(
937-
(priorClient) => ({
938-
// API uses null
939-
// eslint-disable-next-line unicorn/no-null
940-
clientId: null, // system signal
941-
content: JSON.stringify({
942-
type: SignalType.ClientJoin,
943-
content: priorClient, // ISignalClient
944-
}),
949+
let signalsToProcess: ISignalMessage<{ type: never; content: JsonString<unknown> }>[] = [
950+
clearSignal,
951+
];
952+
953+
const clientJoinSignals = (connection.initialClients ?? []).map((priorClient) => ({
954+
// API uses null
955+
// eslint-disable-next-line unicorn/no-null
956+
clientId: null, // system signal
957+
content: JsonStringify({
958+
type: SignalType.ClientJoin,
959+
content: priorClient, // ISignalClient
945960
}),
946-
);
961+
}));
947962
if (clientJoinSignals.length > 0) {
948963
signalsToProcess = [...signalsToProcess, ...clientJoinSignals];
949964
}
@@ -953,6 +968,7 @@ export class ConnectionManager implements IConnectionManager {
953968
// for "self" and connection.initialClients does not contain "self", so we have to process them after
954969
// "clear" signal above.
955970
if (connection.initialSignals !== undefined && connection.initialSignals.length > 0) {
971+
assertExpectedSignals(connection.initialSignals);
956972
signalsToProcess = [...signalsToProcess, ...connection.initialSignals];
957973
}
958974

@@ -1179,6 +1195,7 @@ export class ConnectionManager implements IConnectionManager {
11791195

11801196
private readonly signalHandler = (signalsArg: ISignalMessage | ISignalMessage[]): void => {
11811197
const signals = Array.isArray(signalsArg) ? signalsArg : [signalsArg];
1198+
assertExpectedSignals(signals);
11821199
this.props.signalHandler(signals);
11831200
};
11841201

packages/loader/container-loader/src/container.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,10 @@ import { NoopHeuristic } from "./noopHeuristic.js";
140140
import { pkgVersion } from "./packageVersion.js";
141141
import type { IQuorumSnapshot } from "./protocol/index.js";
142142
import {
143-
type IProtocolHandler,
143+
type InternalProtocolHandlerBuilder,
144144
ProtocolHandler,
145145
type ProtocolHandlerBuilder,
146+
type ProtocolHandlerInternal,
146147
protocolHandlerShouldProcessSignal,
147148
} from "./protocol.js";
148149
import { initQuorumValuesFromCodeDetails } from "./quorum.js";
@@ -495,7 +496,7 @@ export class Container
495496
private readonly scope: FluidObject;
496497
private readonly subLogger: ITelemetryLoggerExt;
497498
private readonly detachedBlobStorage: MemoryDetachedBlobStorage | undefined;
498-
private readonly protocolHandlerBuilder: ProtocolHandlerBuilder;
499+
private readonly protocolHandlerBuilder: InternalProtocolHandlerBuilder;
499500
private readonly client: IClient;
500501

501502
private readonly mc: MonitoringContext;
@@ -597,8 +598,8 @@ export class Container
597598
}
598599
return this._runtime;
599600
}
600-
private _protocolHandler: IProtocolHandler | undefined;
601-
private get protocolHandler(): IProtocolHandler {
601+
private _protocolHandler: ProtocolHandlerInternal | undefined;
602+
private get protocolHandler(): ProtocolHandlerInternal {
602603
if (this._protocolHandler === undefined) {
603604
throw new Error("Attempted to access protocolHandler before it was defined");
604605
}
@@ -830,7 +831,7 @@ export class Container
830831
attributes: IDocumentAttributes,
831832
quorumSnapshot: IQuorumSnapshot,
832833
sendProposal: (key: string, value: unknown) => number,
833-
): ProtocolHandler =>
834+
): ProtocolHandlerInternal =>
834835
new ProtocolHandler(
835836
attributes,
836837
quorumSnapshot,

packages/loader/container-loader/src/contracts.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
type IConnectionDetails,
1313
} from "@fluidframework/container-definitions/internal";
1414
import type { IErrorBase, ITelemetryBaseProperties } from "@fluidframework/core-interfaces";
15+
import type { JsonString } from "@fluidframework/core-interfaces/internal";
1516
import type { ConnectionMode, IClientDetails } from "@fluidframework/driver-definitions";
1617
import type {
1718
IContainerPackageInfo,
@@ -145,7 +146,9 @@ export interface IConnectionManagerFactoryArgs {
145146
* Called by connection manager for each incoming signal.
146147
* May be called before connectHandler is called (due to initial signals on socket connection)
147148
*/
148-
readonly signalHandler: (signals: ISignalMessage[]) => void;
149+
readonly signalHandler: (
150+
signals: ISignalMessage<{ type: never; content: JsonString<unknown> }>[],
151+
) => void;
149152

150153
/**
151154
* Called when connection manager experiences delay in connecting to relay service.

packages/loader/container-loader/src/deltaManager.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ import type {
1616
ITelemetryBaseEvent,
1717
ITelemetryBaseProperties,
1818
} from "@fluidframework/core-interfaces";
19-
import type { IThrottlingWarning } from "@fluidframework/core-interfaces/internal";
19+
import { JsonParse } from "@fluidframework/core-interfaces/internal";
20+
import type { IThrottlingWarning, JsonString } from "@fluidframework/core-interfaces/internal";
2021
import { assert } from "@fluidframework/core-utils/internal";
2122
import type { ConnectionMode } from "@fluidframework/driver-definitions";
2223
import {
@@ -210,7 +211,9 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
210211
private initSequenceNumber: number = 0;
211212

212213
private readonly _inbound: DeltaQueue<ISequencedDocumentMessage>;
213-
private readonly _inboundSignal: DeltaQueue<ISignalMessage>;
214+
private readonly _inboundSignal: DeltaQueue<
215+
ISignalMessage<{ type: never; content: JsonString<unknown> }>
216+
>;
214217

215218
private _closed = false;
216219
private _disposed = false;
@@ -433,7 +436,9 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
433436
this.close(normalizeError(error));
434437
}
435438
},
436-
signalHandler: (signals: ISignalMessage[]) => {
439+
signalHandler: (
440+
signals: ISignalMessage<{ type: never; content: JsonString<unknown> }>[],
441+
) => {
437442
for (const signal of signals) {
438443
this._inboundSignal.push(signal);
439444
}
@@ -474,14 +479,16 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
474479
});
475480

476481
// Inbound signal queue
477-
this._inboundSignal = new DeltaQueue<ISignalMessage>((message) => {
482+
this._inboundSignal = new DeltaQueue<
483+
ISignalMessage<{ type: never; content: JsonString<unknown> }>
484+
>((message) => {
478485
if (this.handler === undefined) {
479486
throw new Error("Attempted to process an inbound signal without a handler attached");
480487
}
481488

482489
this.handler.processSignal({
483490
...message,
484-
content: JSON.parse(message.content as string),
491+
content: JsonParse(message.content),
485492
});
486493
});
487494

packages/loader/container-loader/src/protocol.ts

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,39 @@ import {
2121
} from "./protocol/index.js";
2222

2323
// ADO: #1986: Start using enum from protocol-base.
24-
export enum SignalType {
25-
ClientJoin = "join", // same value as MessageType.ClientJoin,
26-
ClientLeave = "leave", // same value as MessageType.ClientLeave,
27-
Clear = "clear", // used only by client for synthetic signals
24+
export const SignalType = {
25+
ClientJoin: "join", // same value as MessageType.ClientJoin,
26+
ClientLeave: "leave", // same value as MessageType.ClientLeave,
27+
Clear: "clear", // used only by client for synthetic signals
28+
} as const;
29+
30+
interface SystemSignalContent {
31+
type: (typeof SignalType)[keyof typeof SignalType];
32+
content?: unknown;
33+
}
34+
35+
interface InboundSystemSignal<TSignalContent extends SystemSignalContent>
36+
extends ISignalMessage<{ type: never; content: TSignalContent }> {
37+
// eslint-disable-next-line @rushstack/no-new-null -- `null` is used in JSON protocol to indicate system message
38+
readonly clientId: null;
2839
}
2940

41+
type ClientJoinSignal = InboundSystemSignal<{
42+
type: typeof SignalType.ClientJoin;
43+
content: ISignalClient;
44+
}>;
45+
46+
type ClientLeaveSignal = InboundSystemSignal<{
47+
type: typeof SignalType.ClientLeave;
48+
content: string; // clientId of leaving client
49+
}>;
50+
51+
type ClearClientsSignal = InboundSystemSignal<{
52+
type: typeof SignalType.Clear;
53+
}>;
54+
55+
type AudienceSignal = ClientJoinSignal | ClientLeaveSignal | ClearClientsSignal;
56+
3057
/**
3158
* Function to be used for creating a protocol handler.
3259
* @legacy @beta
@@ -47,7 +74,35 @@ export interface IProtocolHandler extends IBaseProtocolHandler {
4774
processSignal(message: ISignalMessage);
4875
}
4976

50-
export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandler {
77+
/**
78+
* More specific version of {@link IProtocolHandler} with narrower call
79+
* constraints for {@link IProtocolHandler.processSignal}.
80+
*/
81+
export interface ProtocolHandlerInternal extends IProtocolHandler {
82+
/**
83+
* Process the audience related signal.
84+
* @privateRemarks
85+
* Internally, only {@link AudienceSignal} messages need handling.
86+
*/
87+
processSignal(message: AudienceSignal): void;
88+
}
89+
90+
/**
91+
* Function to be used for creating a protocol handler.
92+
*
93+
* @remarks This is the same are {@link ProtocolHandlerBuilder} but
94+
* returns the {@link ProtocolHandlerInternal} which has narrower
95+
* expectations for `processSignal`.
96+
*/
97+
export type InternalProtocolHandlerBuilder = (
98+
attributes: IDocumentAttributes,
99+
snapshot: IQuorumSnapshot,
100+
// TODO: use a real type (breaking change)
101+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
102+
sendProposal: (key: string, value: any) => number,
103+
) => ProtocolHandlerInternal;
104+
105+
export class ProtocolHandler extends ProtocolOpHandler implements ProtocolHandlerInternal {
51106
constructor(
52107
attributes: IDocumentAttributes,
53108
quorumSnapshot: IQuorumSnapshot,
@@ -104,8 +159,8 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
104159
return super.processMessage(message, local);
105160
}
106161

107-
public processSignal(message: ISignalMessage): void {
108-
const innerContent = message.content as { content: unknown; type: string };
162+
public processSignal(message: AudienceSignal): void {
163+
const innerContent = message.content;
109164
switch (innerContent.type) {
110165
case SignalType.Clear: {
111166
const members = this.audience.getMembers();
@@ -117,15 +172,15 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
117172
break;
118173
}
119174
case SignalType.ClientJoin: {
120-
const newClient = innerContent.content as ISignalClient;
175+
const newClient = innerContent.content;
121176
// Ignore write clients - quorum will control such clients.
122177
if (newClient.client.mode === "read") {
123178
this.audience.addMember(newClient.clientId, newClient.client);
124179
}
125180
break;
126181
}
127182
case SignalType.ClientLeave: {
128-
const leftClientId = innerContent.content as string;
183+
const leftClientId = innerContent.content;
129184
// Ignore write clients - quorum will control such clients.
130185
if (this.audience.getMember(leftClientId)?.mode === "read") {
131186
this.audience.removeMember(leftClientId);
@@ -144,7 +199,9 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
144199
* The protocol handler should strictly handle only ClientJoin, ClientLeave
145200
* and Clear signal types.
146201
*/
147-
export function protocolHandlerShouldProcessSignal(message: ISignalMessage): boolean {
202+
export function protocolHandlerShouldProcessSignal(
203+
message: ISignalMessage,
204+
): message is AudienceSignal {
148205
// Signal originates from server
149206
if (message.clientId === null) {
150207
const innerContent = message.content as { content: unknown; type: string };

0 commit comments

Comments
 (0)