Skip to content

Commit 7704407

Browse files
committed
fix: add cleanup state to compute function that triggers cleanups of underlying message iterator
1 parent b404f5a commit 7704407

File tree

1 file changed

+21
-17
lines changed

1 file changed

+21
-17
lines changed

src/computable/computable.ts

+21-17
Original file line numberDiff line numberDiff line change
@@ -11,30 +11,34 @@ async function* _compute<T extends ComputableFactory<any>[], U extends Normalize
1111
messages: AsyncIterableIterator<U>,
1212
...computables: T
1313
): AsyncIterableIterator<T extends ComputableFactory<infer Z>[] ? (U extends Disconnect ? U | Z | Disconnect : U | Z) : never> {
14-
const factory = new Computables(computables)
14+
try {
15+
const factory = new Computables(computables)
1516

16-
for await (const message of messages) {
17-
// always pass through source message
18-
yield message as any
17+
for await (const message of messages) {
18+
// always pass through source message
19+
yield message as any
1920

20-
if (message.type === 'disconnect') {
21-
// reset all computables for given exchange if we've received disconnect for it
22-
factory.reset(message.exchange)
23-
continue
24-
}
21+
if (message.type === 'disconnect') {
22+
// reset all computables for given exchange if we've received disconnect for it
23+
factory.reset(message.exchange)
24+
continue
25+
}
2526

26-
const normalizedMessage = message as NormalizedData
27-
const id = normalizedMessage.name !== undefined ? `${normalizedMessage.symbol}:${normalizedMessage.name}` : normalizedMessage.symbol
27+
const normalizedMessage = message as NormalizedData
28+
const id = normalizedMessage.name !== undefined ? `${normalizedMessage.symbol}:${normalizedMessage.name}` : normalizedMessage.symbol
2829

29-
const computablesMap = factory.getOrCreate(normalizedMessage.exchange, id)
30-
const computables = computablesMap[normalizedMessage.type]
31-
if (!computables) continue
30+
const computablesMap = factory.getOrCreate(normalizedMessage.exchange, id)
31+
const computables = computablesMap[normalizedMessage.type]
32+
if (!computables) continue
3233

33-
for (const computable of computables) {
34-
for (const computedMessage of computable.compute(normalizedMessage)) {
35-
yield computedMessage
34+
for (const computable of computables) {
35+
for (const computedMessage of computable.compute(normalizedMessage)) {
36+
yield computedMessage
37+
}
3638
}
3739
}
40+
} finally {
41+
messages.return!()
3842
}
3943
}
4044

0 commit comments

Comments
 (0)