Skip to content

Commit a786501

Browse files
committed
lib: diagnostics_channel use AsyncLocalStorage for suppression context
Refs: #63623 Refs: #63651 Signed-off-by: Divyanshu Sharma <divyanshu88999@gmail.com>
1 parent 7b20b8a commit a786501

1 file changed

Lines changed: 101 additions & 19 deletions

File tree

lib/diagnostics_channel.js

Lines changed: 101 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
const {
44
ArrayPrototypeAt,
5-
ArrayPrototypeIndexOf,
65
ArrayPrototypePush,
76
ArrayPrototypePushApply,
87
ArrayPrototypeSlice,
@@ -15,6 +14,7 @@ const {
1514
ReflectApply,
1615
SafeFinalizationRegistry,
1716
SafeMap,
17+
SafeSet,
1818
SymbolDispose,
1919
SymbolHasInstance,
2020
} = primordials;
@@ -36,6 +36,32 @@ const { subscribers: subscriberCounts } = dc_binding;
3636
const { WeakReference } = require('internal/util');
3737
const { isPromise } = require('internal/util/types');
3838

39+
// Internal only: tracks a Set of active suppression keys for the current async
40+
// context. Uses a simple stack-based approach to avoid bootstrap issues with
41+
// async_hooks. This is a simplified implementation that works for typical usage.
42+
let suppressionStorage = null;
43+
44+
function getSuppressionsStorage() {
45+
if (suppressionStorage === null) {
46+
try {
47+
const { AsyncLocalStorage } = require('async_hooks');
48+
suppressionStorage = new AsyncLocalStorage();
49+
} catch {
50+
// If AsyncLocalStorage fails to initialize (rare), use a fallback
51+
// that won't provide async context isolation but at least works
52+
suppressionStorage = false; // Marker for "tried and failed"
53+
}
54+
}
55+
return suppressionStorage || undefined;
56+
}
57+
58+
function withSuppressionsContext(set, fn, thisArg, args) {
59+
const storage = getSuppressionsStorage();
60+
if (storage) {
61+
return storage.run(set, () => ReflectApply(fn, thisArg, args));
62+
}
63+
return ReflectApply(fn, thisArg, args);
64+
}
3965
// Can't delete when weakref count reaches 0 as it could increment again.
4066
// Only GC can be used as a valid time to clean up the channels map.
4167
class WeakRefMap extends SafeMap {
@@ -93,9 +119,17 @@ class RunStoresScope {
93119

94120
// Enter stores using withScope
95121
if (activeChannel._stores) {
122+
const storage = getSuppressionsStorage();
123+
const activeKeys = storage ? storage.getStore() : undefined;
96124
for (const entry of activeChannel._stores.entries()) {
97125
const store = entry[0];
98-
const transform = entry[1];
126+
const { transform, suppressedBy = null } = entry[1];
127+
128+
// Skip this bound store if it opted into suppression and its key
129+
// is active in the current async context.
130+
if (suppressedBy !== null && activeKeys?.has(suppressedBy)) {
131+
continue;
132+
}
99133

100134
let newContext = data;
101135
if (transform) {
@@ -127,16 +161,32 @@ class RunStoresScope {
127161

128162
// TODO(qard): should there be a C++ channel interface?
129163
class ActiveChannel {
130-
subscribe(subscription) {
164+
subscribe(subscription, options = {}) {
131165
validateFunction(subscription, 'subscription');
166+
const suppressedBy = options && options.suppressedBy !== undefined ? options.suppressedBy : null;
167+
if (suppressedBy !== null) {
168+
const t = typeof suppressedBy;
169+
if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') {
170+
throw new ERR_INVALID_ARG_TYPE('suppressedBy', ['object', 'symbol', 'function'], suppressedBy);
171+
}
172+
}
173+
174+
const handler = subscription;
132175
this._subscribers = ArrayPrototypeSlice(this._subscribers);
133-
ArrayPrototypePush(this._subscribers, subscription);
176+
ArrayPrototypePush(this._subscribers, { handler, suppressedBy });
134177
channels.incRef(this.name);
135178
if (this._index !== undefined) subscriberCounts[this._index]++;
136179
}
137180

138181
unsubscribe(subscription) {
139-
const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
182+
// Find subscriber entry by handler identity.
183+
let index = -1;
184+
for (let i = 0; i < (this._subscribers?.length || 0); i++) {
185+
if (this._subscribers[i].handler === subscription) {
186+
index = i;
187+
break;
188+
}
189+
}
140190
if (index === -1) return false;
141191

142192
const before = ArrayPrototypeSlice(this._subscribers, 0, index);
@@ -151,13 +201,21 @@ class ActiveChannel {
151201
return true;
152202
}
153203

154-
bindStore(store, transform) {
204+
bindStore(store, transform, options = {}) {
205+
const suppressedBy = options && options.suppressedBy !== undefined ? options.suppressedBy : null;
206+
if (suppressedBy !== null) {
207+
const t = typeof suppressedBy;
208+
if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') {
209+
throw new ERR_INVALID_ARG_TYPE('suppressedBy', ['object', 'symbol', 'function'], suppressedBy);
210+
}
211+
}
212+
155213
const replacing = this._stores.has(store);
156214
if (!replacing) {
157215
channels.incRef(this.name);
158216
if (this._index !== undefined) subscriberCounts[this._index]++;
159217
}
160-
this._stores.set(store, transform);
218+
this._stores.set(store, { transform, suppressedBy });
161219
}
162220

163221
unbindStore(store) {
@@ -180,10 +238,15 @@ class ActiveChannel {
180238

181239
publish(data) {
182240
const subscribers = this._subscribers;
241+
const storage = getSuppressionsStorage();
242+
const activeKeys = storage ? storage.getStore() : undefined;
183243
for (let i = 0; i < (subscribers?.length || 0); i++) {
184244
try {
185-
const onMessage = subscribers[i];
186-
onMessage(data, this.name);
245+
const { handler, suppressedBy = null } = subscribers[i];
246+
if (suppressedBy !== null && activeKeys?.has(suppressedBy)) {
247+
continue;
248+
}
249+
handler(data, this.name);
187250
} catch (err) {
188251
process.nextTick(() => {
189252
triggerUncaughtException(err, false);
@@ -221,18 +284,18 @@ class Channel {
221284
prototype === ActiveChannel.prototype;
222285
}
223286

224-
subscribe(subscription) {
287+
subscribe(subscription, options) {
225288
markActive(this);
226-
this.subscribe(subscription);
289+
this.subscribe(subscription, options);
227290
}
228291

229292
unsubscribe() {
230293
return false;
231294
}
232295

233-
bindStore(store, transform) {
296+
bindStore(store, transform, options) {
234297
markActive(this);
235-
this.bindStore(store, transform);
298+
this.bindStore(store, transform, options);
236299
}
237300

238301
unbindStore() {
@@ -366,12 +429,12 @@ class BoundedChannel {
366429
this.end?.hasSubscribers;
367430
}
368431

369-
subscribe(handlers) {
432+
subscribe(handlers, options) {
370433
for (let i = 0; i < boundedEvents.length; ++i) {
371434
const name = boundedEvents[i];
372435
if (!handlers[name]) continue;
373436

374-
this[name]?.subscribe(handlers[name]);
437+
this[name]?.subscribe(handlers[name], options);
375438
}
376439
}
377440

@@ -458,26 +521,26 @@ class TracingChannel {
458521
this.error?.hasSubscribers;
459522
}
460523

461-
subscribe(handlers) {
524+
subscribe(handlers, options) {
462525
// Subscribe to call window (start/end)
463526
if (handlers.start || handlers.end) {
464527
this.#callWindow.subscribe({
465528
start: handlers.start,
466529
end: handlers.end,
467-
});
530+
}, options);
468531
}
469532

470533
// Subscribe to continuation window (asyncStart/asyncEnd)
471534
if (handlers.asyncStart || handlers.asyncEnd) {
472535
this.#continuationWindow.subscribe({
473536
start: handlers.asyncStart,
474537
end: handlers.asyncEnd,
475-
});
538+
}, options);
476539
}
477540

478541
// Subscribe to error channel
479542
if (handlers.error) {
480-
this.error.subscribe(handlers.error);
543+
this.error.subscribe(handlers.error, options);
481544
}
482545
}
483546

@@ -633,10 +696,29 @@ function tracingChannel(nameOrChannels) {
633696

634697
dc_binding.linkNativeChannel((name) => channel(name));
635698

699+
function suppressed(key, fn, thisArg, ...args) {
700+
validateFunction(fn, 'fn');
701+
702+
if (key === null || key === undefined) {
703+
throw new ERR_INVALID_ARG_TYPE('key', ['object', 'symbol', 'function'], key);
704+
}
705+
const t = typeof key;
706+
if (t === 'string' || t === 'number' || t === 'bigint' || t === 'boolean') {
707+
throw new ERR_INVALID_ARG_TYPE('key', ['object', 'symbol', 'function'], key);
708+
}
709+
710+
const storage = getSuppressionsStorage();
711+
const currentSet = storage ? storage.getStore() : undefined;
712+
const next = currentSet ? new SafeSet(currentSet) : new SafeSet();
713+
next.add(key);
714+
return withSuppressionsContext(next, fn, thisArg, args);
715+
}
716+
636717
module.exports = {
637718
channel,
638719
hasSubscribers,
639720
subscribe,
721+
suppressed,
640722
tracingChannel,
641723
unsubscribe,
642724
boundedChannel,

0 commit comments

Comments
 (0)