Skip to content

Commit c0e1555

Browse files
committed
feat(client): internal Signal only based Audience
that will be sent to Container Extensions to provide information sooner and allow early activity. Presence limits quorum use to ordering fallback Join responses which are expected to happen later and often when quorum is more complete.
1 parent 0513686 commit c0e1555

File tree

7 files changed

+130
-39
lines changed

7 files changed

+130
-39
lines changed

packages/common/container-definitions/api-report/container-definitions.legacy.beta.api.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ export interface IContainerContext {
157157
// (undocumented)
158158
readonly quorum: IQuorumClients;
159159
readonly scope: FluidObject;
160+
// @system
161+
readonly signalAudience?: IAudience;
160162
readonly snapshotWithContents?: ISnapshot;
161163
// (undocumented)
162164
readonly storage: IContainerStorageService;

packages/common/container-definitions/src/runtime.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,16 @@ export interface IContainerContext {
302302
readonly deltaManager: IDeltaManager<ISequencedDocumentMessage, IDocumentMessage>;
303303
readonly quorum: IQuorumClients;
304304
readonly audience: IAudience;
305+
/**
306+
* Signal-based audience provides a view of the audience that only relies
307+
* on system signals which will be updated more quickly than
308+
* {@link IContainerContext#audience} that relies on ops for write clients.
309+
* Being signal-based the write members are inherently less reliable than
310+
* {@link IContainerContext#audience}.
311+
*
312+
* @system
313+
*/
314+
readonly signalAudience?: IAudience;
305315
readonly loader: ILoader;
306316
// The logger implementation, which would support tagged events, should be provided by the loader.
307317
readonly taggedLogger: ITelemetryBaseLogger;

packages/framework/presence/src/presenceDatastoreManager.ts

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -254,24 +254,29 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
254254

255255
private getAudienceInformation(selfClientId: ClientConnectionId): {
256256
audience: IAudience;
257-
otherMembers: Map<ClientConnectionId, IClient>;
258-
othersWorthIgnoring: ClientConnectionId[];
257+
interactiveMembersExcludingSelf: {
258+
all: Set<ClientConnectionId>;
259+
writers: Set<ClientConnectionId>;
260+
};
259261
} {
260262
const audience = this.runtime.getAudience();
261-
const otherMembers = audience.getMembers();
263+
const members = audience.getMembers();
264+
const all = new Set<ClientConnectionId>();
265+
const writers = new Set<ClientConnectionId>();
262266
// Remove self (if present)
263-
otherMembers.delete(selfClientId);
264-
// It is worth ignoring non-interactive clients
265-
const othersWorthIgnoring: ClientConnectionId[] = [];
266-
for (const [id, client] of otherMembers) {
267-
if (!client.details.capabilities.interactive) {
268-
othersWorthIgnoring.push(id);
267+
members.delete(selfClientId);
268+
// Gather interactive client IDs
269+
for (const [id, client] of members) {
270+
if (client.details.capabilities.interactive) {
271+
all.add(id);
272+
if (client.mode === "write") {
273+
writers.add(id);
274+
}
269275
}
270276
}
271277
return {
272278
audience,
273-
otherMembers,
274-
othersWorthIgnoring,
279+
interactiveMembersExcludingSelf: { all, writers },
275280
};
276281
}
277282

@@ -287,27 +292,22 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
287292
// Lack of others might mean that this client is very freshly joined
288293
// and has not received any Join Ops or Signals (type="join") from
289294
// the service yet.
290-
const { audience, otherMembers, othersWorthIgnoring } =
295+
const { audience, interactiveMembersExcludingSelf } =
291296
this.getAudienceInformation(selfClientId);
292297
// When no others (and not forced), delay join until there are.
293-
if (otherMembers.size === othersWorthIgnoring.length && !forceEvenWhenAlone) {
298+
if (interactiveMembersExcludingSelf.all.size === 0 && !forceEvenWhenAlone) {
294299
// No one else known to be present.
295300
this.deferJoinUntilAudienceMember(selfClientId, audience);
296301
return;
297302
}
298303

299304
// Broadcast join message to all clients
300305
// Select primary update providers
301-
const quorumMembers = this.runtime.getQuorum().getMembers();
302-
// Remove self and non-interactive members from possibilities
303-
othersWorthIgnoring.push(selfClientId);
304-
for (const clientIdToDelete of othersWorthIgnoring) {
305-
quorumMembers.delete(clientIdToDelete);
306-
otherMembers.delete(clientIdToDelete);
307-
}
308-
// Use quorum members if available, then fallback to audience.
306+
// Use write members if any, then fallback to read-only members.
309307
const updateProviders = [
310-
...(quorumMembers.size > 0 ? quorumMembers : otherMembers).keys(),
308+
...(interactiveMembersExcludingSelf.writers.size > 0
309+
? interactiveMembersExcludingSelf.writers
310+
: interactiveMembersExcludingSelf.all),
311311
];
312312
// Limit to three providers to prevent flooding the network.
313313
// If none respond, others present will (should) after a delay.
@@ -904,8 +904,8 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
904904
// Check if requestor count meets or exceeds count of other audience
905905
// members indicating that we effectively have a complete snapshot
906906
// (once the current message being processed is processed).
907-
const { otherMembers, othersWorthIgnoring } = this.getAudienceInformation(selfClientId);
908-
if (this.broadcastRequests.size >= otherMembers.size - othersWorthIgnoring.length) {
907+
const { interactiveMembersExcludingSelf } = this.getAudienceInformation(selfClientId);
908+
if (this.broadcastRequests.size >= interactiveMembersExcludingSelf.all.size) {
909909
// Note that no action is taken here specifically.
910910
// We want action to be queued so that it takes place after
911911
// current message is completely processed. All of the actions

packages/loader/container-loader/src/container.ts

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ import {
145145
type ProtocolHandlerBuilder,
146146
type ProtocolHandlerInternal,
147147
protocolHandlerShouldProcessSignal,
148+
wrapProtocolHandlerBuilder,
148149
} from "./protocol.js";
149150
import { initQuorumValuesFromCodeDetails } from "./quorum.js";
150151
import {
@@ -497,6 +498,7 @@ export class Container
497498
private readonly subLogger: ITelemetryLoggerExt;
498499
private readonly detachedBlobStorage: MemoryDetachedBlobStorage | undefined;
499500
private readonly protocolHandlerBuilder: InternalProtocolHandlerBuilder;
501+
private readonly signalAudience = new Audience();
500502
private readonly client: IClient;
501503

502504
private readonly mc: MonitoringContext;
@@ -825,20 +827,22 @@ export class Container
825827
// Tracking alternative ways to handle this in AB#4129.
826828
this.options = { ...options };
827829
this.scope = scope;
828-
this.protocolHandlerBuilder =
830+
this.protocolHandlerBuilder = wrapProtocolHandlerBuilder(
829831
protocolHandlerBuilder ??
830-
((
831-
attributes: IDocumentAttributes,
832-
quorumSnapshot: IQuorumSnapshot,
833-
sendProposal: (key: string, value: unknown) => number,
834-
): ProtocolHandlerInternal =>
835-
new ProtocolHandler(
836-
attributes,
837-
quorumSnapshot,
838-
sendProposal,
839-
new Audience(),
840-
(clientId: string) => this.clientsWhoShouldHaveLeft.has(clientId),
841-
));
832+
((
833+
attributes: IDocumentAttributes,
834+
quorumSnapshot: IQuorumSnapshot,
835+
sendProposal: (key: string, value: unknown) => number,
836+
): ProtocolHandlerInternal =>
837+
new ProtocolHandler(
838+
attributes,
839+
quorumSnapshot,
840+
sendProposal,
841+
new Audience(),
842+
(clientId: string) => this.clientsWhoShouldHaveLeft.has(clientId),
843+
)),
844+
this.signalAudience,
845+
);
842846

843847
// Note that we capture the createProps here so we can replicate the creation call when we want to clone.
844848
this.clone = async (
@@ -2222,6 +2226,13 @@ export class Container
22222226
const clientId = this.connectionStateHandler.clientId;
22232227
assert(clientId !== undefined, 0x96e /* there has to be clientId */);
22242228
this.protocolHandler.audience.setCurrentClientId(clientId);
2229+
this.signalAudience.setCurrentClientId(clientId);
2230+
} else if (this.connectionState === ConnectionState.CatchingUp) {
2231+
// Signal-based Audience does not wait for ops. So provide clientId
2232+
// as soon as possible.
2233+
const clientId = this.connectionStateHandler.pendingClientId;
2234+
assert(clientId !== undefined, "catching up without clientId");
2235+
this.signalAudience.setCurrentClientId(clientId);
22252236
}
22262237

22272238
// We communicate only transitions to Connected & Disconnected states, skipping all other states.

packages/loader/container-loader/src/protocol.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,3 +212,52 @@ export function protocolHandlerShouldProcessSignal(
212212
}
213213
return false;
214214
}
215+
216+
export function wrapProtocolHandlerBuilder(
217+
builder: ProtocolHandlerBuilder,
218+
signalAudience: IAudienceOwner,
219+
): InternalProtocolHandlerBuilder {
220+
return (
221+
attributes: IDocumentAttributes,
222+
snapshot: IQuorumSnapshot,
223+
sendProposal: (key: string, value: unknown) => number,
224+
): ProtocolHandlerInternal => {
225+
const baseHandler = builder(attributes, snapshot, sendProposal);
226+
return {
227+
quorum: baseHandler.quorum,
228+
attributes: baseHandler.attributes,
229+
setConnectionState: baseHandler.setConnectionState.bind(baseHandler),
230+
snapshot: baseHandler.snapshot.bind(baseHandler),
231+
close: baseHandler.close.bind(baseHandler),
232+
processMessage: baseHandler.processMessage.bind(baseHandler),
233+
getProtocolState: baseHandler.getProtocolState.bind(baseHandler),
234+
audience: baseHandler.audience,
235+
processSignal: (message: AudienceSignal) => {
236+
const innerContent = message.content;
237+
switch (innerContent.type) {
238+
case SignalType.Clear: {
239+
const members = signalAudience.getMembers();
240+
for (const clientId of members.keys()) {
241+
signalAudience.removeMember(clientId);
242+
}
243+
break;
244+
}
245+
case SignalType.ClientJoin: {
246+
const newClient = innerContent.content;
247+
signalAudience.addMember(newClient.clientId, newClient.client);
248+
break;
249+
}
250+
case SignalType.ClientLeave: {
251+
const leftClientId = innerContent.content;
252+
signalAudience.removeMember(leftClientId);
253+
break;
254+
}
255+
default: {
256+
break;
257+
}
258+
}
259+
baseHandler.processSignal(message);
260+
},
261+
};
262+
};
263+
}

packages/runtime/container-runtime-definitions/src/containerExtension.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,13 @@ export interface ExtensionHost<TRuntimeProperties extends ExtensionRuntimeProper
274274
*/
275275
getQuorum: () => IQuorumClients;
276276

277+
/**
278+
* The collection of all clients as enumerated by the service.
279+
*
280+
* @remarks This may be a include/exclude those found within the quorum.
281+
* It produces results faster than {@link ExtensionHost.getQuorum}, but
282+
* will be inaccurate if any signals are lost.
283+
*/
277284
getAudience: () => IAudience;
278285
}
279286

packages/runtime/container-runtime/src/containerRuntime.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1533,6 +1533,7 @@ export class ContainerRuntime
15331533
deltaManager,
15341534
quorum,
15351535
audience,
1536+
signalAudience,
15361537
pendingLocalState,
15371538
supportedFeatures,
15381539
snapshotWithContents,
@@ -2002,6 +2003,8 @@ export class ContainerRuntime
20022003
});
20032004
}
20042005

2006+
this.signalAudience = signalAudience;
2007+
20052008
const closeSummarizerDelayOverride = this.mc.config.getNumber(
20062009
"Fluid.ContainerRuntime.Test.CloseSummarizerDelayOverrideMs",
20072010
);
@@ -3638,6 +3641,14 @@ export class ContainerRuntime
36383641
return this._audience;
36393642
}
36403643

3644+
/**
3645+
* When defined, this {@link @fluidframework/container-definitions#IAudience}
3646+
* maintains member list using signals only.
3647+
* Thus "write" members may be known earlier than quorum and avoid noise from
3648+
* un-summarized quorum history.
3649+
*/
3650+
private readonly signalAudience?: IAudience;
3651+
36413652
/**
36423653
* Returns true of container is dirty, i.e. there are some pending local changes that
36433654
* either were not sent out to delta stream or were not yet acknowledged.
@@ -5158,9 +5169,10 @@ export class ContainerRuntime
51585169
): T {
51595170
let entry = this.extensions.get(id);
51605171
if (entry === undefined) {
5172+
const audience = this.signalAudience;
51615173
const runtime = {
51625174
getJoinedStatus: this.getJoinedStatus.bind(this),
5163-
getClientId: () => this.clientId,
5175+
getClientId: audience ? () => audience.getSelf()?.clientId : () => this.clientId,
51645176
events: this.lazyEventsForExtensions.value,
51655177
logger: this.baseLogger,
51665178
submitAddressedSignal: (
@@ -5170,7 +5182,7 @@ export class ContainerRuntime
51705182
this.submitExtensionSignal(id, addressChain, message);
51715183
},
51725184
getQuorum: this.getQuorum.bind(this),
5173-
getAudience: this.getAudience.bind(this),
5185+
getAudience: audience ? () => audience : this.getAudience.bind(this),
51745186
supportedFeatures: this.ILayerCompatDetails.supportedFeatures,
51755187
} satisfies ExtensionHost<TRuntimeProperties>;
51765188
entry = new factory(runtime, ...useContext);

0 commit comments

Comments
 (0)