Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions packages/loader/container-loader/src/connectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import type {
ReadOnlyInfo,
} from "@fluidframework/container-definitions/internal";
import { type ITelemetryBaseProperties, LogLevel } from "@fluidframework/core-interfaces";
import type { JsonString } from "@fluidframework/core-interfaces/internal";
import { JsonStringify } from "@fluidframework/core-interfaces/internal";
import { assert } from "@fluidframework/core-utils/internal";
import type {
ConnectionMode,
Expand Down Expand Up @@ -113,6 +115,19 @@ interface IPendingConnection {
connectionMode: ConnectionMode;
}

function assertExpectedSignals(
signals: ISignalMessage[],
): asserts signals is ISignalMessage<{ type: never; content: JsonString<unknown> }>[] {
for (const signal of signals) {
if ("type" in signal) {
throw new Error("Unexpected type in ISignalMessage");
}
if (typeof signal.content !== "string") {
throw new TypeError("Non-string content in ISignalMessage");
}
}
}
Comment on lines +118 to +129
Copy link

Copilot AI Oct 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use string literals for assert error messages instead of generic descriptions. Replace 'Unexpected type in ISignalMessage' and 'Non-string content in ISignalMessage' with more descriptive string literals.

Copilot generated this review using guidance from repository custom instructions.

/**
* Implementation of IConnectionManager, used by Container class
* Implements constant connectivity to relay service, by reconnecting in case of lost connection or error.
Expand Down Expand Up @@ -921,29 +936,29 @@ export class ConnectionManager implements IConnectionManager {
// Synthesize clear & join signals out of initialClients state.
// This allows us to have single way to process signals, and makes it simpler to initialize
// protocol in Container.
const clearSignal: ISignalMessage = {
const clearSignal = {
// API uses null
// eslint-disable-next-line unicorn/no-null
clientId: null, // system message
content: JSON.stringify({
content: JsonStringify({
type: SignalType.Clear,
}),
};

// list of signals to process due to this new connection
let signalsToProcess: ISignalMessage[] = [clearSignal];

const clientJoinSignals: ISignalMessage[] = (connection.initialClients ?? []).map(
(priorClient) => ({
// API uses null
// eslint-disable-next-line unicorn/no-null
clientId: null, // system signal
content: JSON.stringify({
type: SignalType.ClientJoin,
content: priorClient, // ISignalClient
}),
let signalsToProcess: ISignalMessage<{ type: never; content: JsonString<unknown> }>[] = [
clearSignal,
];

const clientJoinSignals = (connection.initialClients ?? []).map((priorClient) => ({
// API uses null
// eslint-disable-next-line unicorn/no-null
clientId: null, // system signal
content: JsonStringify({
type: SignalType.ClientJoin,
content: priorClient, // ISignalClient
}),
);
}));
if (clientJoinSignals.length > 0) {
signalsToProcess = [...signalsToProcess, ...clientJoinSignals];
}
Expand All @@ -953,6 +968,7 @@ export class ConnectionManager implements IConnectionManager {
// for "self" and connection.initialClients does not contain "self", so we have to process them after
// "clear" signal above.
if (connection.initialSignals !== undefined && connection.initialSignals.length > 0) {
assertExpectedSignals(connection.initialSignals);
signalsToProcess = [...signalsToProcess, ...connection.initialSignals];
}

Expand Down Expand Up @@ -1179,6 +1195,7 @@ export class ConnectionManager implements IConnectionManager {

private readonly signalHandler = (signalsArg: ISignalMessage | ISignalMessage[]): void => {
const signals = Array.isArray(signalsArg) ? signalsArg : [signalsArg];
assertExpectedSignals(signals);
this.props.signalHandler(signals);
};

Expand Down
11 changes: 6 additions & 5 deletions packages/loader/container-loader/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,10 @@ import { NoopHeuristic } from "./noopHeuristic.js";
import { pkgVersion } from "./packageVersion.js";
import type { IQuorumSnapshot } from "./protocol/index.js";
import {
type IProtocolHandler,
type InternalProtocolHandlerBuilder,
ProtocolHandler,
type ProtocolHandlerBuilder,
type ProtocolHandlerInternal,
protocolHandlerShouldProcessSignal,
} from "./protocol.js";
import { initQuorumValuesFromCodeDetails } from "./quorum.js";
Expand Down Expand Up @@ -495,7 +496,7 @@ export class Container
private readonly scope: FluidObject;
private readonly subLogger: ITelemetryLoggerExt;
private readonly detachedBlobStorage: MemoryDetachedBlobStorage | undefined;
private readonly protocolHandlerBuilder: ProtocolHandlerBuilder;
private readonly protocolHandlerBuilder: InternalProtocolHandlerBuilder;
private readonly client: IClient;

private readonly mc: MonitoringContext;
Expand Down Expand Up @@ -597,8 +598,8 @@ export class Container
}
return this._runtime;
}
private _protocolHandler: IProtocolHandler | undefined;
private get protocolHandler(): IProtocolHandler {
private _protocolHandler: ProtocolHandlerInternal | undefined;
private get protocolHandler(): ProtocolHandlerInternal {
if (this._protocolHandler === undefined) {
throw new Error("Attempted to access protocolHandler before it was defined");
}
Expand Down Expand Up @@ -830,7 +831,7 @@ export class Container
attributes: IDocumentAttributes,
quorumSnapshot: IQuorumSnapshot,
sendProposal: (key: string, value: unknown) => number,
): ProtocolHandler =>
): ProtocolHandlerInternal =>
new ProtocolHandler(
attributes,
quorumSnapshot,
Expand Down
5 changes: 4 additions & 1 deletion packages/loader/container-loader/src/contracts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
type IConnectionDetails,
} from "@fluidframework/container-definitions/internal";
import type { IErrorBase, ITelemetryBaseProperties } from "@fluidframework/core-interfaces";
import type { JsonString } from "@fluidframework/core-interfaces/internal";
import type { ConnectionMode, IClientDetails } from "@fluidframework/driver-definitions";
import type {
IContainerPackageInfo,
Expand Down Expand Up @@ -145,7 +146,9 @@ export interface IConnectionManagerFactoryArgs {
* Called by connection manager for each incoming signal.
* May be called before connectHandler is called (due to initial signals on socket connection)
*/
readonly signalHandler: (signals: ISignalMessage[]) => void;
readonly signalHandler: (
signals: ISignalMessage<{ type: never; content: JsonString<unknown> }>[],
) => void;

/**
* Called when connection manager experiences delay in connecting to relay service.
Expand Down
17 changes: 12 additions & 5 deletions packages/loader/container-loader/src/deltaManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import type {
ITelemetryBaseEvent,
ITelemetryBaseProperties,
} from "@fluidframework/core-interfaces";
import type { IThrottlingWarning } from "@fluidframework/core-interfaces/internal";
import { JsonParse } from "@fluidframework/core-interfaces/internal";
import type { IThrottlingWarning, JsonString } from "@fluidframework/core-interfaces/internal";
import { assert } from "@fluidframework/core-utils/internal";
import type { ConnectionMode } from "@fluidframework/driver-definitions";
import {
Expand Down Expand Up @@ -210,7 +211,9 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
private initSequenceNumber: number = 0;

private readonly _inbound: DeltaQueue<ISequencedDocumentMessage>;
private readonly _inboundSignal: DeltaQueue<ISignalMessage>;
private readonly _inboundSignal: DeltaQueue<
ISignalMessage<{ type: never; content: JsonString<unknown> }>
>;

private _closed = false;
private _disposed = false;
Expand Down Expand Up @@ -433,7 +436,9 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
this.close(normalizeError(error));
}
},
signalHandler: (signals: ISignalMessage[]) => {
signalHandler: (
signals: ISignalMessage<{ type: never; content: JsonString<unknown> }>[],
) => {
for (const signal of signals) {
this._inboundSignal.push(signal);
}
Expand Down Expand Up @@ -474,14 +479,16 @@ export class DeltaManager<TConnectionManager extends IConnectionManager>
});

// Inbound signal queue
this._inboundSignal = new DeltaQueue<ISignalMessage>((message) => {
this._inboundSignal = new DeltaQueue<
ISignalMessage<{ type: never; content: JsonString<unknown> }>
>((message) => {
if (this.handler === undefined) {
throw new Error("Attempted to process an inbound signal without a handler attached");
}

this.handler.processSignal({
...message,
content: JSON.parse(message.content as string),
content: JsonParse(message.content),
});
});

Expand Down
77 changes: 67 additions & 10 deletions packages/loader/container-loader/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,39 @@ import {
} from "./protocol/index.js";

// ADO: #1986: Start using enum from protocol-base.
export enum SignalType {
ClientJoin = "join", // same value as MessageType.ClientJoin,
ClientLeave = "leave", // same value as MessageType.ClientLeave,
Clear = "clear", // used only by client for synthetic signals
export const SignalType = {
ClientJoin: "join", // same value as MessageType.ClientJoin,
ClientLeave: "leave", // same value as MessageType.ClientLeave,
Clear: "clear", // used only by client for synthetic signals
} as const;

interface SystemSignalContent {
type: (typeof SignalType)[keyof typeof SignalType];
content?: unknown;
}

interface InboundSystemSignal<TSignalContent extends SystemSignalContent>
extends ISignalMessage<{ type: never; content: TSignalContent }> {
// eslint-disable-next-line @rushstack/no-new-null -- `null` is used in JSON protocol to indicate system message
readonly clientId: null;
}

type ClientJoinSignal = InboundSystemSignal<{
type: typeof SignalType.ClientJoin;
content: ISignalClient;
}>;

type ClientLeaveSignal = InboundSystemSignal<{
type: typeof SignalType.ClientLeave;
content: string; // clientId of leaving client
}>;

type ClearClientsSignal = InboundSystemSignal<{
type: typeof SignalType.Clear;
}>;

type AudienceSignal = ClientJoinSignal | ClientLeaveSignal | ClearClientsSignal;

/**
* Function to be used for creating a protocol handler.
* @legacy @beta
Expand All @@ -47,7 +74,35 @@ export interface IProtocolHandler extends IBaseProtocolHandler {
processSignal(message: ISignalMessage);
}

export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandler {
/**
* More specific version of {@link IProtocolHandler} with narrower call
* constraints for {@link IProtocolHandler.processSignal}.
*/
export interface ProtocolHandlerInternal extends IProtocolHandler {
/**
* Process the audience related signal.
* @privateRemarks
* Internally, only {@link AudienceSignal} messages need handling.
*/
processSignal(message: AudienceSignal): void;
}

/**
* Function to be used for creating a protocol handler.
*
* @remarks This is the same are {@link ProtocolHandlerBuilder} but
* returns the {@link ProtocolHandlerInternal} which has narrower
* expectations for `processSignal`.
*/
export type InternalProtocolHandlerBuilder = (
attributes: IDocumentAttributes,
snapshot: IQuorumSnapshot,
// TODO: use a real type (breaking change)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sendProposal: (key: string, value: any) => number,
) => ProtocolHandlerInternal;

export class ProtocolHandler extends ProtocolOpHandler implements ProtocolHandlerInternal {
constructor(
attributes: IDocumentAttributes,
quorumSnapshot: IQuorumSnapshot,
Expand Down Expand Up @@ -104,8 +159,8 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
return super.processMessage(message, local);
}

public processSignal(message: ISignalMessage): void {
const innerContent = message.content as { content: unknown; type: string };
public processSignal(message: AudienceSignal): void {
const innerContent = message.content;
switch (innerContent.type) {
case SignalType.Clear: {
const members = this.audience.getMembers();
Expand All @@ -117,15 +172,15 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
break;
}
case SignalType.ClientJoin: {
const newClient = innerContent.content as ISignalClient;
const newClient = innerContent.content;
// Ignore write clients - quorum will control such clients.
if (newClient.client.mode === "read") {
this.audience.addMember(newClient.clientId, newClient.client);
}
break;
}
case SignalType.ClientLeave: {
const leftClientId = innerContent.content as string;
const leftClientId = innerContent.content;
// Ignore write clients - quorum will control such clients.
if (this.audience.getMember(leftClientId)?.mode === "read") {
this.audience.removeMember(leftClientId);
Expand All @@ -144,7 +199,9 @@ export class ProtocolHandler extends ProtocolOpHandler implements IProtocolHandl
* The protocol handler should strictly handle only ClientJoin, ClientLeave
* and Clear signal types.
*/
export function protocolHandlerShouldProcessSignal(message: ISignalMessage): boolean {
export function protocolHandlerShouldProcessSignal(
message: ISignalMessage,
): message is AudienceSignal {
// Signal originates from server
if (message.clientId === null) {
const innerContent = message.content as { content: unknown; type: string };
Expand Down
Loading