Skip to content

Commit 85467bf

Browse files
committed
Use LastValueSink for Demultiplexer.
1 parent 2a51024 commit 85467bf

File tree

4 files changed

+14
-15
lines changed

4 files changed

+14
-15
lines changed

Diff for: modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -815,12 +815,12 @@ export class PostgresSyncRulesStorage
815815
const sink = new LastValueSink<string>(undefined);
816816

817817
const disposeListener = this.db.registerListener({
818-
notification: (notification) => sink.next(notification.payload)
818+
notification: (notification) => sink.write(notification.payload)
819819
});
820820

821821
signal.addEventListener('aborted', async () => {
822822
disposeListener();
823-
sink.complete();
823+
sink.end();
824824
});
825825

826826
yield this.makeActiveCheckpoint(doc);

Diff for: packages/service-core/src/streams/BroadcastIterable.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ export class BroadcastIterable<T> implements AsyncIterable<T> {
5252
}
5353
this.last = doc;
5454
for (let sink of sinks) {
55-
sink.next(doc);
55+
sink.write(doc);
5656
}
5757
}
5858

5959
// End of stream
6060
for (let sink of sinks) {
61-
sink.complete();
61+
sink.end();
6262
}
6363
} catch (e) {
6464
// Just in case the error is not from the source

Diff for: packages/service-core/src/streams/Demultiplexer.ts

+8-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { AbortError } from 'ix/aborterror.js';
2-
import { AsyncSink } from 'ix/asynciterable/asynciterablex.js';
32
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
3+
import { LastValueSink } from './LastValueSink.js';
44

55
export interface DemultiplexerValue<T> {
66
/**
@@ -28,17 +28,16 @@ export type DemultiplexerSourceFactory<T> = (signal: AbortSignal) => Demultiplex
2828
* 1. We only start subscribing when there is a downstream subscriber.
2929
* 2. When all downstream subscriptions have ended, we end the source subscription.
3030
*
31-
* The Demultiplexer does not handle backpressure. If subscribers are slow, a queue may build up
32-
* for each.
31+
* For each subscriber, if backpressure builds up, we only keep the _last_ value.
3332
*/
3433
export class Demultiplexer<T> {
35-
private subscribers: Map<string, Set<AsyncSink<T>>> | undefined = undefined;
34+
private subscribers: Map<string, Set<LastValueSink<T>>> | undefined = undefined;
3635
private abortController: AbortController | undefined = undefined;
3736
private currentSource: DemultiplexerSource<T> | undefined = undefined;
3837

3938
constructor(private source: DemultiplexerSourceFactory<T>) {}
4039

41-
private start(filter: string, sink: AsyncSink<T>) {
40+
private start(filter: string, sink: LastValueSink<T>) {
4241
const abortController = new AbortController();
4342
const listeners = new Map();
4443
listeners.set(filter, new Set([sink]));
@@ -55,7 +54,7 @@ export class Demultiplexer<T> {
5554
private async loop(
5655
source: DemultiplexerSource<T>,
5756
abortController: AbortController,
58-
sinks: Map<string, Set<AsyncSink<T>>>
57+
sinks: Map<string, Set<LastValueSink<T>>>
5958
) {
6059
try {
6160
for await (let doc of source.iterator) {
@@ -98,7 +97,7 @@ export class Demultiplexer<T> {
9897
}
9998
}
10099

101-
private removeSink(key: string, sink: AsyncSink<T>) {
100+
private removeSink(key: string, sink: LastValueSink<T>) {
102101
const existing = this.subscribers?.get(key);
103102
if (existing == null) {
104103
return;
@@ -118,7 +117,7 @@ export class Demultiplexer<T> {
118117
}
119118
}
120119

121-
private addSink(key: string, sink: AsyncSink<T>) {
120+
private addSink(key: string, sink: LastValueSink<T>) {
122121
if (this.currentSource == null) {
123122
return this.start(key, sink);
124123
} else {
@@ -139,7 +138,7 @@ export class Demultiplexer<T> {
139138
* @param signal
140139
*/
141140
async *subscribe(key: string, signal: AbortSignal): AsyncIterable<T> {
142-
const sink = new AsyncSink<T>();
141+
const sink = new LastValueSink<T>(undefined);
143142
// Important that we register the sink before calling getFirstValue().
144143
const source = this.addSink(key, sink);
145144
try {

Diff for: packages/service-core/src/streams/LastValueSink.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ export class LastValueSink<T> implements AsyncIterable<T> {
1717
}
1818
}
1919

20-
next(value: T) {
20+
write(value: T) {
2121
this.push({
2222
value,
2323
done: false,
2424
error: undefined
2525
});
2626
}
2727

28-
complete() {
28+
end() {
2929
this.push({
3030
value: undefined,
3131
done: true,

0 commit comments

Comments
 (0)