Skip to content

Commit 1418c50

Browse files
committed
Add tests; fix abort issue.
1 parent 85467bf commit 1418c50

File tree

3 files changed

+222
-8
lines changed

3 files changed

+222
-8
lines changed

packages/service-core/src/streams/Demultiplexer.ts

+10-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,16 @@ export interface DemultiplexerValue<T> {
1414
}
1515

1616
export interface DemultiplexerSource<T> {
17+
/**
18+
* The async iterator providing a stream of values.
19+
*/
1720
iterator: AsyncIterable<DemultiplexerValue<T>>;
21+
22+
/**
23+
* Fetches the first value for a given key.
24+
*
25+
* This is used to get an initial value for each subscription.
26+
*/
1827
getFirstValue(key: string): Promise<T>;
1928
}
2029

@@ -144,7 +153,7 @@ export class Demultiplexer<T> {
144153
try {
145154
const firstValue = await source.getFirstValue(key);
146155
yield firstValue;
147-
yield* wrapWithAbort(sink, signal);
156+
yield* sink.withSignal(signal);
148157
} finally {
149158
this.removeSink(key, sink);
150159
}

packages/service-core/test/src/broadcast_iterable.test.ts

+7-7
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import { take } from 'ix/asynciterable/operators/take.js';
55
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
66
import { toArray } from 'ix/asynciterable/toarray.js';
77
import * as timers from 'timers/promises';
8-
import { describe, expect, test } from 'vitest';
8+
import { describe, expect, it } from 'vitest';
99

1010
describe('BroadcastIterable', () => {
11-
test('should iterate', async () => {
11+
it('should iterate', async () => {
1212
const range = AsyncIterableX.from([1, 2, 3]);
1313
const broadcast = new BroadcastIterable(() => range);
1414

@@ -17,7 +17,7 @@ describe('BroadcastIterable', () => {
1717
expect(broadcast.active).toBe(false);
1818
});
1919

20-
test('should skip values if sink is slow', async () => {
20+
it('should skip values if sink is slow', async () => {
2121
const range = AsyncIterableX.from([1, 2, 3]);
2222
const broadcast = new BroadcastIterable(() => range);
2323

@@ -30,7 +30,7 @@ describe('BroadcastIterable', () => {
3030
expect(broadcast.active).toBe(false);
3131
});
3232

33-
test('should abort', async () => {
33+
it('should abort', async () => {
3434
const range = AsyncIterableX.from([1, 2, 3]);
3535
let recordedSignal: AbortSignal | undefined;
3636
const broadcast = new BroadcastIterable((signal) => {
@@ -46,7 +46,7 @@ describe('BroadcastIterable', () => {
4646
expect(recordedSignal!.aborted).toEqual(true);
4747
});
4848

49-
test('should handle indefinite sources', async () => {
49+
it('should handle indefinite sources', async () => {
5050
const source: IterableSource<number> = (signal) => {
5151
return wrapWithAbort(interval(1), signal);
5252
};
@@ -65,7 +65,7 @@ describe('BroadcastIterable', () => {
6565
expect(broadcast.active).toBe(false);
6666
});
6767

68-
test('should handle multiple subscribers', async () => {
68+
it('should handle multiple subscribers', async () => {
6969
let sourceIndex = 0;
7070
const source = async function* (signal: AbortSignal) {
7171
// Test value out by 1000 means it may have used the wrong iteration of the source
@@ -111,7 +111,7 @@ describe('BroadcastIterable', () => {
111111
expect(results3[4]).toBeLessThan(2145);
112112
});
113113

114-
test('should handle errors on multiple subscribers', async () => {
114+
it('should handle errors on multiple subscribers', async () => {
115115
let sourceIndex = 0;
116116
const source = async function* (signal: AbortSignal) {
117117
// Test value out by 1000 means it may have used the wrong iteration of the source
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
// Vitest Unit Tests
2+
import { Demultiplexer, DemultiplexerSource, DemultiplexerSourceFactory, DemultiplexerValue } from '@/index.js';
3+
import { delayEach } from 'ix/asynciterable/operators/delayeach.js';
4+
import { take } from 'ix/asynciterable/operators/take.js';
5+
import { toArray } from 'ix/asynciterable/toarray.js';
6+
import * as timers from 'node:timers/promises';
7+
import { describe, expect, it } from 'vitest';
8+
9+
describe('Demultiplexer', () => {
10+
it('should start subscription lazily and provide first value', async () => {
11+
const mockSource: DemultiplexerSourceFactory<string> = (signal: AbortSignal) => {
12+
const iterator = (async function* (): AsyncIterable<DemultiplexerValue<string>> {})();
13+
return {
14+
iterator,
15+
getFirstValue: async (key: string) => `first-${key}`
16+
};
17+
};
18+
19+
const demux = new Demultiplexer(mockSource);
20+
const signal = new AbortController().signal;
21+
22+
const iter = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
23+
const result = await iter.next();
24+
expect(result.value).toBe('first-user1');
25+
});
26+
27+
it('should handle multiple subscribers to the same key', async () => {
28+
const iter = (async function* () {
29+
yield { key: 'user1', value: 'value1' };
30+
yield { key: 'user1', value: 'value2' };
31+
})();
32+
const source: DemultiplexerSource<string> = {
33+
iterator: iter,
34+
getFirstValue: async (key: string) => `first-${key}`
35+
};
36+
37+
const demux = new Demultiplexer(() => source);
38+
const signal = new AbortController().signal;
39+
40+
const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
41+
const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
42+
43+
// Due to only keeping the last value, some values are skipped
44+
expect(await iter1.next()).toEqual({ value: 'first-user1', done: false });
45+
expect(await iter1.next()).toEqual({ value: 'value1', done: false });
46+
expect(await iter1.next()).toEqual({ value: undefined, done: true });
47+
48+
expect(await iter2.next()).toEqual({ value: 'first-user1', done: false });
49+
expect(await iter2.next()).toEqual({ value: undefined, done: true });
50+
});
51+
52+
it('should handle multiple subscribers to the same key (2)', async () => {
53+
const p1 = Promise.withResolvers<void>();
54+
const p2 = Promise.withResolvers<void>();
55+
const p3 = Promise.withResolvers<void>();
56+
57+
const iter = (async function* () {
58+
await p1.promise;
59+
yield { key: 'user1', value: 'value1' };
60+
await p2.promise;
61+
yield { key: 'user1', value: 'value2' };
62+
await p3.promise;
63+
})();
64+
65+
const source: DemultiplexerSource<string> = {
66+
iterator: iter,
67+
getFirstValue: async (key: string) => `first-${key}`
68+
};
69+
70+
const demux = new Demultiplexer(() => source);
71+
const signal = new AbortController().signal;
72+
73+
const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
74+
const iter2 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
75+
76+
// Due to only keeping the last value, some values are skilled
77+
expect(await iter1.next()).toEqual({ value: 'first-user1', done: false });
78+
expect(await iter2.next()).toEqual({ value: 'first-user1', done: false });
79+
p1.resolve();
80+
81+
expect(await iter1.next()).toEqual({ value: 'value1', done: false });
82+
expect(await iter2.next()).toEqual({ value: 'value1', done: false });
83+
p2.resolve();
84+
85+
expect(await iter1.next()).toEqual({ value: 'value2', done: false });
86+
p3.resolve();
87+
88+
expect(await iter1.next()).toEqual({ value: undefined, done: true });
89+
expect(await iter2.next()).toEqual({ value: undefined, done: true });
90+
});
91+
92+
it('should handle multiple subscribers to different keys', async () => {
93+
const p1 = Promise.withResolvers<void>();
94+
const p2 = Promise.withResolvers<void>();
95+
const p3 = Promise.withResolvers<void>();
96+
97+
const iter = (async function* () {
98+
await p1.promise;
99+
yield { key: 'user1', value: 'value1' };
100+
await p2.promise;
101+
yield { key: 'user2', value: 'value2' };
102+
await p3.promise;
103+
})();
104+
105+
const source: DemultiplexerSource<string> = {
106+
iterator: iter,
107+
getFirstValue: async (key: string) => `first-${key}`
108+
};
109+
110+
const demux = new Demultiplexer(() => source);
111+
const signal = new AbortController().signal;
112+
113+
const iter1 = demux.subscribe('user1', signal)[Symbol.asyncIterator]();
114+
const iter2 = demux.subscribe('user2', signal)[Symbol.asyncIterator]();
115+
116+
// Due to only keeping the last value, some values are skilled
117+
expect(await iter1.next()).toEqual({ value: 'first-user1', done: false });
118+
expect(await iter2.next()).toEqual({ value: 'first-user2', done: false });
119+
p1.resolve();
120+
121+
expect(await iter1.next()).toEqual({ value: 'value1', done: false });
122+
p2.resolve();
123+
124+
expect(await iter2.next()).toEqual({ value: 'value2', done: false });
125+
p3.resolve();
126+
127+
expect(await iter1.next()).toEqual({ value: undefined, done: true });
128+
expect(await iter2.next()).toEqual({ value: undefined, done: true });
129+
});
130+
131+
it('should abort', async () => {
132+
const iter = (async function* () {
133+
yield { key: 'user1', value: 'value1' };
134+
yield { key: 'user1', value: 'value2' };
135+
})();
136+
137+
const source: DemultiplexerSource<string> = {
138+
iterator: iter,
139+
getFirstValue: async (key: string) => `first-${key}`
140+
};
141+
142+
const demux = new Demultiplexer(() => source);
143+
const controller = new AbortController();
144+
145+
const iter1 = demux.subscribe('user1', controller.signal)[Symbol.asyncIterator]();
146+
147+
expect(await iter1.next()).toEqual({ value: 'first-user1', done: false });
148+
controller.abort();
149+
150+
await expect(iter1.next()).rejects.toThrow('The operation has been aborted');
151+
});
152+
153+
it('should handle errors on multiple subscribers', async () => {
154+
let sourceIndex = 0;
155+
const sourceFn = async function* (signal: AbortSignal): AsyncIterable<DemultiplexerValue<number>> {
156+
// Test value out by 1000 means it may have used the wrong iteration of the source
157+
const base = (sourceIndex += 1000);
158+
const abortedPromise = new Promise((resolve) => {
159+
signal.addEventListener('abort', resolve, { once: true });
160+
});
161+
for (let i = 0; !signal.aborted; i++) {
162+
if (base + i == 1005) {
163+
throw new Error('simulated failure');
164+
}
165+
yield { key: 'u1', value: base + i };
166+
await Promise.race([abortedPromise, timers.setTimeout(1)]);
167+
}
168+
// Test value out by 100 means this wasn't reached
169+
sourceIndex += 100;
170+
};
171+
172+
const sourceFactory: DemultiplexerSourceFactory<number> = (signal) => {
173+
const source: DemultiplexerSource<number> = {
174+
iterator: sourceFn(signal),
175+
getFirstValue: async (key: string) => -1
176+
};
177+
return source;
178+
};
179+
const demux = new Demultiplexer(sourceFactory);
180+
181+
const controller = new AbortController();
182+
183+
const delayed1 = delayEach(9)(demux.subscribe('u1', controller.signal));
184+
const delayed2 = delayEach(10)(demux.subscribe('u1', controller.signal));
185+
expect(demux.active).toBe(false);
186+
const results1Promise = toArray(take(5)(delayed1)) as Promise<number[]>;
187+
const results2Promise = toArray(take(5)(delayed2)) as Promise<number[]>;
188+
189+
const [r1, r2] = await Promise.allSettled([results1Promise, results2Promise]);
190+
191+
expect(r1).toEqual({ status: 'rejected', reason: new Error('simulated failure') });
192+
expect(r2).toEqual({ status: 'rejected', reason: new Error('simulated failure') });
193+
194+
expect(demux.active).toBe(false);
195+
196+
// This starts a new source
197+
const delayed3 = delayEach(10)(demux.subscribe('u1', controller.signal));
198+
const results3 = await toArray(take(6)(delayed3));
199+
expect(results3.length).toEqual(6);
200+
expect(results3[0]).toEqual(-1); // Initial value
201+
// There should be approximately 10ms between each value, but we allow for some slack
202+
expect(results3[5]).toBeGreaterThan(2005);
203+
expect(results3[5]).toBeLessThan(2200);
204+
});
205+
});

0 commit comments

Comments
 (0)