Skip to content

Commit 53cb5f2

Browse files
committed
feat(client): Container Extensions operation during CatchingUp
- "connectedToService" is emitted for CatchingUp (ahead of Connected) - Signal only Audience will be sent to provide information sooner and allow early activity. Presence limits quorum use to ordering of fallback Join responses, which are expected to happen later and often when quorum is more complete.
1 parent 5721d4a commit 53cb5f2

File tree

4 files changed

+103
-43
lines changed

4 files changed

+103
-43
lines changed

packages/framework/presence/src/presenceDatastoreManager.ts

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,43 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
186186
this.targetedSignalSupport = this.runtime.supportedFeatures.has("submit_signals_v2");
187187
}
188188

189-
public joinSession(clientId: ClientConnectionId): void {
189+
private getInteractiveMembersExcludingSelf(selfClientId: ClientConnectionId): {
190+
all: Set<ClientConnectionId>;
191+
writers: Set<ClientConnectionId>;
192+
} {
193+
const audience = this.runtime.getAudience();
194+
const members = audience.getMembers();
195+
const all = new Set<ClientConnectionId>();
196+
const writers = new Set<ClientConnectionId>();
197+
// Remove self (if present)
198+
members.delete(selfClientId);
199+
// Gather interactive client IDs
200+
for (const [id, client] of members) {
201+
if (client.details.capabilities.interactive) {
202+
all.add(id);
203+
if (client.mode === "write") {
204+
writers.add(id);
205+
}
206+
}
207+
}
208+
return {
209+
all,
210+
writers,
211+
};
212+
}
213+
214+
public joinSession(selfClientId: ClientConnectionId): void {
215+
const interactiveMembersExcludingSelf =
216+
this.getInteractiveMembersExcludingSelf(selfClientId);
217+
190218
// Broadcast join message to all clients
191-
const updateProviders = [...this.runtime.getQuorum().getMembers().keys()].filter(
192-
(quorumClientId) => quorumClientId !== clientId,
193-
);
219+
// Select primary update providers
220+
// Use write members if any, then fallback to read-only members.
221+
const updateProviders = [
222+
...(interactiveMembersExcludingSelf.writers.size > 0
223+
? interactiveMembersExcludingSelf.writers
224+
: interactiveMembersExcludingSelf.all),
225+
];
194226
// Limit to three providers to prevent flooding the network.
195227
// If none respond, others present will (should) after a delay.
196228
if (updateProviders.length > 3) {
@@ -330,8 +362,8 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
330362
assert(clientConnectionId !== undefined, 0xa59 /* Client connected without clientId */);
331363
const currentClientToSessionValueState =
332364
// When connected, `clientToSessionId` must always have current connection entry.
333-
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
334-
this.datastore["system:presence"].clientToSessionId[clientConnectionId]!;
365+
this.datastore["system:presence"].clientToSessionId[clientConnectionId];
366+
assert(currentClientToSessionValueState !== undefined, "Client connection update missing");
335367

336368
const newMessage = {
337369
sendTimestamp: Date.now(),

packages/runtime/container-runtime-definitions/src/containerExtension.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ export interface ExtensionHost<TRuntimeProperties extends ExtensionRuntimeProper
282282
* @remarks
283283
* Status changes are signaled through :
284284
* - {@link ExtensionHostEvents.disconnected}: Transitioning to Disconnected state
285-
* - {@link ExtensionHostEvents.joined}: Transition to Connected state (either for reading or writing)
285+
* - {@link ExtensionHostEvents.joined}: Transition to CatchingUp or Connected state (either for reading or writing)
286286
* - {@link ExtensionHostEvents.operabilityChanged}: When operability has changed (e.g., write to read)
287287
*/
288288
readonly getJoinedStatus: () => JoinedStatus;
@@ -313,6 +313,13 @@ export interface ExtensionHost<TRuntimeProperties extends ExtensionRuntimeProper
313313
*/
314314
getQuorum: () => IQuorumClients;
315315

316+
/**
317+
* The collection of all clients as enumerated by the service.
318+
*
319+
* @remarks This may include/exclude those found within the quorum.
320+
* It produces results faster than {@link ExtensionHost.getQuorum}, but
321+
* will be inaccurate if any signals are lost.
322+
*/
316323
getAudience: () => IAudience;
317324
}
318325

packages/runtime/container-runtime/src/containerRuntime.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1571,6 +1571,7 @@ export class ContainerRuntime
15711571
deltaManager,
15721572
quorum,
15731573
audience,
1574+
signalAudience,
15741575
pendingLocalState,
15751576
supportedFeatures,
15761577
snapshotWithContents,
@@ -1725,7 +1726,8 @@ export class ContainerRuntime
17251726
// Later updates come through calls to setConnectionState/Status.
17261727
this.canSendOps = connected;
17271728
this.canSendSignals = this.getConnectionState
1728-
? this.getConnectionState() === ConnectionState.Connected
1729+
? this.getConnectionState() === ConnectionState.Connected ||
1730+
this.getConnectionState() === ConnectionState.CatchingUp
17291731
: undefined;
17301732

17311733
this.mc.logger.sendTelemetryEvent({
@@ -2041,6 +2043,8 @@ export class ContainerRuntime
20412043
});
20422044
}
20432045

2046+
this.signalAudience = signalAudience;
2047+
20442048
const closeSummarizerDelayOverride = this.mc.config.getNumber(
20452049
"Fluid.ContainerRuntime.Test.CloseSummarizerDelayOverrideMs",
20462050
);
@@ -2950,7 +2954,7 @@ export class ContainerRuntime
29502954
* Emits service connection events based on connection state changes.
29512955
*
29522956
* @remarks
2953-
* "connectedToService" is emitted when container connection state transitions to 'Connected' regardless of connection mode.
2957+
* "connectedToService" is emitted when container connection state transitions to 'CatchingUp' or 'Connected' regardless of connection mode.
29542958
* "disconnectedFromService" excludes false "disconnected" events that happen when readonly client transitions to 'Connected'.
29552959
*/
29562960
private emitServiceConnectionEvents(
@@ -2962,16 +2966,20 @@ export class ContainerRuntime
29622966
return;
29632967
}
29642968

2965-
const canSendSignals = this.getConnectionState() === ConnectionState.Connected;
2969+
const connectionState = this.getConnectionState();
2970+
const canSendSignals =
2971+
connectionState === ConnectionState.Connected ||
2972+
connectionState === ConnectionState.CatchingUp;
29662973
const canSendSignalsChanged = this.canSendSignals !== canSendSignals;
29672974
this.canSendSignals = canSendSignals;
29682975
if (canSendSignalsChanged) {
2969-
// If canSendSignals changed, we either transitioned from Connected to Disconnected or CatchingUp to Connected
2976+
// If canSendSignals changed, we either transitioned from CatchingUp or
2977+
// Connected to Disconnected or EstablishingConnection to CatchingUp.
29702978
if (canSendSignals) {
2971-
// Emit for CatchingUp to Connected transition
2979+
// Emit for EstablishingConnection to CatchingUp or Connected transition
29722980
this.emit("connectedToService", clientId, canSendOps);
29732981
} else {
2974-
// Emit for Connected to Disconnected transition
2982+
// Emit for CatchingUp or Connected to Disconnected transition
29752983
this.emit("disconnectedFromService");
29762984
}
29772985
} else if (canSendOpsChanged) {
@@ -3734,6 +3742,14 @@ export class ContainerRuntime
37343742
return this._audience;
37353743
}
37363744

3745+
/**
3746+
* When defined, this {@link @fluidframework/container-definitions#IAudience}
3747+
* maintains member list using signals only.
3748+
* Thus "write" members may be known earlier than quorum and avoid noise from
3749+
* un-summarized quorum history.
3750+
*/
3751+
private readonly signalAudience?: IAudience;
3752+
37373753
/**
37383754
* Returns true of container is dirty, i.e. there are some pending local changes that
37393755
* either were not sent out to delta stream or were not yet acknowledged.
@@ -5278,7 +5294,11 @@ export class ContainerRuntime
52785294
const getConnectionState = this.getConnectionState;
52795295
if (getConnectionState) {
52805296
const connectionState = getConnectionState();
5281-
if (connectionState === ConnectionState.Connected) {
5297+
if (
5298+
connectionState === ConnectionState.Connected ||
5299+
connectionState === ConnectionState.CatchingUp
5300+
) {
5301+
// Note: when CatchingUp, canSendOps will always be false.
52825302
return this.canSendOps ? "joinedForWriting" : "joinedForReading";
52835303
}
52845304
} else if (this.canSendOps) {
@@ -5304,9 +5324,10 @@ export class ContainerRuntime
53045324
): T {
53055325
let entry = this.extensions.get(id);
53065326
if (entry === undefined) {
5327+
const audience = this.signalAudience;
53075328
const runtime = {
53085329
getJoinedStatus: this.getJoinedStatus.bind(this),
5309-
getClientId: () => this.clientId,
5330+
getClientId: audience ? () => audience.getSelf()?.clientId : () => this.clientId,
53105331
events: this.lazyEventsForExtensions.value,
53115332
logger: this.baseLogger,
53125333
submitAddressedSignal: (
@@ -5316,7 +5337,7 @@ export class ContainerRuntime
53165337
this.submitExtensionSignal(id, addressChain, message);
53175338
},
53185339
getQuorum: this.getQuorum.bind(this),
5319-
getAudience: this.getAudience.bind(this),
5340+
getAudience: audience ? () => audience : this.getAudience.bind(this),
53205341
supportedFeatures: this.ILayerCompatDetails.supportedFeatures,
53215342
} satisfies ExtensionHost<TRuntimeProperties>;
53225343
entry = new factory(runtime, ...useContext);

packages/runtime/container-runtime/src/test/containerRuntime.extensions.spec.ts

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,8 @@ describe("Runtime", () => {
269269

270270
assert.strictEqual(
271271
extension.connectedToService,
272-
false,
273-
"Extension should be disconnected during CatchingUp state",
272+
true,
273+
"Extension should be connected during CatchingUp state",
274274
);
275275
});
276276

@@ -379,22 +379,26 @@ describe("Runtime", () => {
379379

380380
// Transition to CatchingUp
381381
updateConnectionState(runtime, context, ConnectionState.CatchingUp, "mockClientId");
382+
assert.strictEqual(events.length, 1, "Should have received one joined event");
383+
assert.deepStrictEqual(
384+
events[0],
385+
{ type: "joined", clientId: "mockClientId", canWrite: false },
386+
"First event should be joined for reading",
387+
);
382388
assert.strictEqual(
383389
extension.connectedToService,
384-
false,
385-
"Extension should be disconnected during CatchingUp",
390+
true,
391+
"Extension should be connected during CatchingUp",
386392
);
387-
assert.strictEqual(events.length, 0, "Should have no events during CatchingUp");
388393

389394
// Transition to Connected
390395
updateConnectionState(runtime, context, ConnectionState.Connected, "mockClientId");
391-
assert.strictEqual(events.length, 1, "Should have received one joined events");
396+
assert.strictEqual(events.length, 2, "Should have received two events total");
392397
assert.deepStrictEqual(
393-
events[0],
394-
{ type: "joined", clientId: "mockClientId", canWrite: true },
395-
"First event should be joined for writing",
398+
events[1],
399+
{ type: "operabilityChanged", canWrite: true },
400+
"Second event should be operabilityChanged to can write",
396401
);
397-
398402
assert.strictEqual(
399403
extension.connectedToService,
400404
true,
@@ -403,11 +407,11 @@ describe("Runtime", () => {
403407

404408
// Transition back to Disconnected
405409
updateConnectionState(runtime, context, ConnectionState.Disconnected);
406-
assert.strictEqual(events.length, 2, "Should have received two events total");
410+
assert.strictEqual(events.length, 3, "Should have received three events total");
407411
assert.deepStrictEqual(
408-
events[1],
412+
events[2],
409413
{ type: "disconnected" },
410-
"Second event should be disconnected",
414+
"Third event should be disconnected",
411415
);
412416
assert.strictEqual(
413417
extension.connectedToService,
@@ -462,13 +466,18 @@ describe("Runtime", () => {
462466
);
463467
assert.strictEqual(
464468
readOnlyExtension.connectedToService,
465-
false,
466-
"Extension should be disconnected during CatchingUp",
469+
true,
470+
"Extension should be connected during CatchingUp",
467471
);
468472
assert.strictEqual(
469473
readOnlyEvents.length,
470-
0,
471-
"Should have no events during CatchingUp",
474+
1,
475+
"Should have received one event: joined for reading only",
476+
);
477+
assert.deepStrictEqual(
478+
readOnlyEvents[0],
479+
{ type: "joined", clientId: "mockClientId", canWrite: false },
480+
"Event should be joined for reading",
472481
);
473482

474483
// Transition to Connected
@@ -478,16 +487,7 @@ describe("Runtime", () => {
478487
ConnectionState.Connected,
479488
"mockClientId",
480489
);
481-
assert.strictEqual(
482-
readOnlyEvents.length,
483-
1,
484-
"Should have received one event: joined for reading only",
485-
);
486-
assert.deepStrictEqual(
487-
readOnlyEvents[0],
488-
{ type: "joined", clientId: "mockClientId", canWrite: false },
489-
"Event should be joined for reading",
490-
);
490+
assert.strictEqual(readOnlyEvents.length, 1, "Should have received one event total");
491491
assert.strictEqual(
492492
readOnlyExtension.connectedToService,
493493
true,

0 commit comments

Comments
 (0)