Skip to content

Commit 238b54a

Browse files
authored
feat(client): Container Extensions operation during CatchingUp (#25757)
- "connectedToService" is emitted for CatchingUp (ahead of Connected). - Signal-only Audience will be sent to provide information sooner and allow early activity. Presence limits quorum use to ordering of fallback Join responses, which are expected to happen later, often when the quorum is more complete.
1 parent 5e87573 commit 238b54a

File tree

4 files changed

+103
-43
lines changed

4 files changed

+103
-43
lines changed

packages/framework/presence/src/presenceDatastoreManager.ts

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,43 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
186186
this.targetedSignalSupport = this.runtime.supportedFeatures.has("submit_signals_v2");
187187
}
188188

189-
public joinSession(clientId: ClientConnectionId): void {
189+
private getInteractiveMembersExcludingSelf(selfClientId: ClientConnectionId): {
190+
all: Set<ClientConnectionId>;
191+
writers: Set<ClientConnectionId>;
192+
} {
193+
const audience = this.runtime.getAudience();
194+
const members = audience.getMembers();
195+
const all = new Set<ClientConnectionId>();
196+
const writers = new Set<ClientConnectionId>();
197+
// Remove self (if present)
198+
members.delete(selfClientId);
199+
// Gather interactive client IDs
200+
for (const [id, client] of members) {
201+
if (client.details.capabilities.interactive) {
202+
all.add(id);
203+
if (client.mode === "write") {
204+
writers.add(id);
205+
}
206+
}
207+
}
208+
return {
209+
all,
210+
writers,
211+
};
212+
}
213+
214+
public joinSession(selfClientId: ClientConnectionId): void {
215+
const interactiveMembersExcludingSelf =
216+
this.getInteractiveMembersExcludingSelf(selfClientId);
217+
190218
// Broadcast join message to all clients
191-
const updateProviders = [...this.runtime.getQuorum().getMembers().keys()].filter(
192-
(quorumClientId) => quorumClientId !== clientId,
193-
);
219+
// Select primary update providers
220+
// Use write members if any; otherwise fall back to read-only members.
221+
const updateProviders = [
222+
...(interactiveMembersExcludingSelf.writers.size > 0
223+
? interactiveMembersExcludingSelf.writers
224+
: interactiveMembersExcludingSelf.all),
225+
];
194226
// Limit to three providers to prevent flooding the network.
195227
// If none respond, others present will (should) after a delay.
196228
if (updateProviders.length > 3) {
@@ -330,8 +362,8 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
330362
assert(clientConnectionId !== undefined, 0xa59 /* Client connected without clientId */);
331363
const currentClientToSessionValueState =
332364
// When connected, `clientToSessionId` must always have current connection entry.
333-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
334-
this.datastore["system:presence"].clientToSessionId[clientConnectionId]!;
365+
this.datastore["system:presence"].clientToSessionId[clientConnectionId];
366+
assert(currentClientToSessionValueState !== undefined, "Client connection update missing");
335367

336368
const newMessage = {
337369
sendTimestamp: Date.now(),

packages/runtime/container-runtime-definitions/src/containerExtension.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ export interface ExtensionHost<TRuntimeProperties extends ExtensionRuntimeProper
282282
* @remarks
283283
* Status changes are signaled through:
284284
* - {@link ExtensionHostEvents.disconnected}: Transitioning to Disconnected state
285-
* - {@link ExtensionHostEvents.joined}: Transition to Connected state (either for reading or writing)
285+
* - {@link ExtensionHostEvents.joined}: Transition to CatchingUp or Connected state (either for reading or writing)
286286
* - {@link ExtensionHostEvents.operabilityChanged}: When operability has changed (e.g., write to read)
287287
*/
288288
readonly getJoinedStatus: () => JoinedStatus;
@@ -313,6 +313,13 @@ export interface ExtensionHost<TRuntimeProperties extends ExtensionRuntimeProper
313313
*/
314314
getQuorum: () => IQuorumClients;
315315

316+
/**
317+
* The collection of all clients as enumerated by the service.
318+
*
319+
* @remarks This may include/exclude those found within the quorum.
320+
* It produces results faster than {@link ExtensionHost.getQuorum}, but
321+
* will be inaccurate if any signals are lost.
322+
*/
316323
getAudience: () => IAudience;
317324
}
318325

packages/runtime/container-runtime/src/containerRuntime.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1568,6 +1568,7 @@ export class ContainerRuntime
15681568
deltaManager,
15691569
quorum,
15701570
audience,
1571+
signalAudience,
15711572
pendingLocalState,
15721573
supportedFeatures,
15731574
snapshotWithContents,
@@ -1722,7 +1723,8 @@ export class ContainerRuntime
17221723
// Later updates come through calls to setConnectionState/Status.
17231724
this.canSendOps = connected;
17241725
this.canSendSignals = this.getConnectionState
1725-
? this.getConnectionState() === ConnectionState.Connected
1726+
? this.getConnectionState() === ConnectionState.Connected ||
1727+
this.getConnectionState() === ConnectionState.CatchingUp
17261728
: undefined;
17271729

17281730
this.mc.logger.sendTelemetryEvent({
@@ -2038,6 +2040,8 @@ export class ContainerRuntime
20382040
});
20392041
}
20402042

2043+
this.signalAudience = signalAudience;
2044+
20412045
const closeSummarizerDelayOverride = this.mc.config.getNumber(
20422046
"Fluid.ContainerRuntime.Test.CloseSummarizerDelayOverrideMs",
20432047
);
@@ -2947,7 +2951,7 @@ export class ContainerRuntime
29472951
* Emits service connection events based on connection state changes.
29482952
*
29492953
* @remarks
2950-
* "connectedToService" is emitted when container connection state transitions to 'Connected' regardless of connection mode.
2954+
* "connectedToService" is emitted when container connection state transitions to 'CatchingUp' or 'Connected' regardless of connection mode.
29512955
* "disconnectedFromService" excludes false "disconnected" events that happen when readonly client transitions to 'Connected'.
29522956
*/
29532957
private emitServiceConnectionEvents(
@@ -2959,16 +2963,20 @@ export class ContainerRuntime
29592963
return;
29602964
}
29612965

2962-
const canSendSignals = this.getConnectionState() === ConnectionState.Connected;
2966+
const connectionState = this.getConnectionState();
2967+
const canSendSignals =
2968+
connectionState === ConnectionState.Connected ||
2969+
connectionState === ConnectionState.CatchingUp;
29632970
const canSendSignalsChanged = this.canSendSignals !== canSendSignals;
29642971
this.canSendSignals = canSendSignals;
29652972
if (canSendSignalsChanged) {
2966-
// If canSendSignals changed, we either transitioned from Connected to Disconnected or CatchingUp to Connected
2973+
// If canSendSignals changed, we transitioned either from CatchingUp or Connected
2974+
// to Disconnected, or from EstablishingConnection to CatchingUp.
29672975
if (canSendSignals) {
2968-
// Emit for CatchingUp to Connected transition
2976+
// Emit for EstablishingConnection to CatchingUp or Connected transition
29692977
this.emit("connectedToService", clientId, canSendOps);
29702978
} else {
2971-
// Emit for Connected to Disconnected transition
2979+
// Emit for CatchingUp or Connected to Disconnected transition
29722980
this.emit("disconnectedFromService");
29732981
}
29742982
} else if (canSendOpsChanged) {
@@ -3727,6 +3735,14 @@ export class ContainerRuntime
37273735
return this._audience;
37283736
}
37293737

3738+
/**
3739+
* When defined, this {@link @fluidframework/container-definitions#IAudience}
3740+
* maintains member list using signals only.
3741+
* Thus "write" members may be known earlier than quorum and avoid noise from
3742+
* un-summarized quorum history.
3743+
*/
3744+
private readonly signalAudience?: IAudience;
3745+
37303746
/**
37313747
* Returns true if the container is dirty, i.e. there are some pending local changes that
37323748
* either were not sent out to delta stream or were not yet acknowledged.
@@ -5271,7 +5287,11 @@ export class ContainerRuntime
52715287
const getConnectionState = this.getConnectionState;
52725288
if (getConnectionState) {
52735289
const connectionState = getConnectionState();
5274-
if (connectionState === ConnectionState.Connected) {
5290+
if (
5291+
connectionState === ConnectionState.Connected ||
5292+
connectionState === ConnectionState.CatchingUp
5293+
) {
5294+
// Note: when CatchingUp, canSendOps will always be false.
52755295
return this.canSendOps ? "joinedForWriting" : "joinedForReading";
52765296
}
52775297
} else if (this.canSendOps) {
@@ -5297,9 +5317,10 @@ export class ContainerRuntime
52975317
): T {
52985318
let entry = this.extensions.get(id);
52995319
if (entry === undefined) {
5320+
const audience = this.signalAudience;
53005321
const runtime = {
53015322
getJoinedStatus: this.getJoinedStatus.bind(this),
5302-
getClientId: () => this.clientId,
5323+
getClientId: audience ? () => audience.getSelf()?.clientId : () => this.clientId,
53035324
events: this.lazyEventsForExtensions.value,
53045325
logger: this.baseLogger,
53055326
submitAddressedSignal: (
@@ -5309,7 +5330,7 @@ export class ContainerRuntime
53095330
this.submitExtensionSignal(id, addressChain, message);
53105331
},
53115332
getQuorum: this.getQuorum.bind(this),
5312-
getAudience: this.getAudience.bind(this),
5333+
getAudience: audience ? () => audience : this.getAudience.bind(this),
53135334
supportedFeatures: this.ILayerCompatDetails.supportedFeatures,
53145335
} satisfies ExtensionHost<TRuntimeProperties>;
53155336
entry = new factory(runtime, ...useContext);

packages/runtime/container-runtime/src/test/containerRuntime.extensions.spec.ts

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,8 @@ describe("Runtime", () => {
269269

270270
assert.strictEqual(
271271
extension.connectedToService,
272-
false,
273-
"Extension should be disconnected during CatchingUp state",
272+
true,
273+
"Extension should be connected during CatchingUp state",
274274
);
275275
});
276276

@@ -379,22 +379,26 @@ describe("Runtime", () => {
379379

380380
// Transition to CatchingUp
381381
updateConnectionState(runtime, context, ConnectionState.CatchingUp, "mockClientId");
382+
assert.strictEqual(events.length, 1, "Should have received one joined event");
383+
assert.deepStrictEqual(
384+
events[0],
385+
{ type: "joined", clientId: "mockClientId", canWrite: false },
386+
"First event should be joined for reading",
387+
);
382388
assert.strictEqual(
383389
extension.connectedToService,
384-
false,
385-
"Extension should be disconnected during CatchingUp",
390+
true,
391+
"Extension should be connected during CatchingUp",
386392
);
387-
assert.strictEqual(events.length, 0, "Should have no events during CatchingUp");
388393

389394
// Transition to Connected
390395
updateConnectionState(runtime, context, ConnectionState.Connected, "mockClientId");
391-
assert.strictEqual(events.length, 1, "Should have received one joined events");
396+
assert.strictEqual(events.length, 2, "Should have received two events total");
392397
assert.deepStrictEqual(
393-
events[0],
394-
{ type: "joined", clientId: "mockClientId", canWrite: true },
395-
"First event should be joined for writing",
398+
events[1],
399+
{ type: "operabilityChanged", canWrite: true },
400+
"Second event should be operabilityChanged to can write",
396401
);
397-
398402
assert.strictEqual(
399403
extension.connectedToService,
400404
true,
@@ -403,11 +407,11 @@ describe("Runtime", () => {
403407

404408
// Transition back to Disconnected
405409
updateConnectionState(runtime, context, ConnectionState.Disconnected);
406-
assert.strictEqual(events.length, 2, "Should have received two events total");
410+
assert.strictEqual(events.length, 3, "Should have received three events total");
407411
assert.deepStrictEqual(
408-
events[1],
412+
events[2],
409413
{ type: "disconnected" },
410-
"Second event should be disconnected",
414+
"Third event should be disconnected",
411415
);
412416
assert.strictEqual(
413417
extension.connectedToService,
@@ -462,13 +466,18 @@ describe("Runtime", () => {
462466
);
463467
assert.strictEqual(
464468
readOnlyExtension.connectedToService,
465-
false,
466-
"Extension should be disconnected during CatchingUp",
469+
true,
470+
"Extension should be connected during CatchingUp",
467471
);
468472
assert.strictEqual(
469473
readOnlyEvents.length,
470-
0,
471-
"Should have no events during CatchingUp",
474+
1,
475+
"Should have received one event: joined for reading only",
476+
);
477+
assert.deepStrictEqual(
478+
readOnlyEvents[0],
479+
{ type: "joined", clientId: "mockClientId", canWrite: false },
480+
"Event should be joined for reading",
472481
);
473482

474483
// Transition to Connected
@@ -478,16 +487,7 @@ describe("Runtime", () => {
478487
ConnectionState.Connected,
479488
"mockClientId",
480489
);
481-
assert.strictEqual(
482-
readOnlyEvents.length,
483-
1,
484-
"Should have received one event: joined for reading only",
485-
);
486-
assert.deepStrictEqual(
487-
readOnlyEvents[0],
488-
{ type: "joined", clientId: "mockClientId", canWrite: false },
489-
"Event should be joined for reading",
490-
);
490+
assert.strictEqual(readOnlyEvents.length, 1, "Should have received one event total");
491491
assert.strictEqual(
492492
readOnlyExtension.connectedToService,
493493
true,

0 commit comments

Comments
 (0)