Skip to content

Commit cbedc26

Browse files
committed
fix: cleaning up Monitor resources
1 parent ed6f3a3 commit cbedc26

File tree

4 files changed

+70
-48
lines changed

4 files changed

+70
-48
lines changed

src/QUICConnection.ts

+39-39
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ import type {
1313
VerifyCallback,
1414
} from './types';
1515
import type { Connection, ConnectionErrorCode, SendInfo } from './native/types';
16-
import { Lock, LockBox, Monitor, RWLockWriter } from '@matrixai/async-locks';
16+
import type { Monitor } from '@matrixai/async-locks';
17+
import { Lock, LockBox, RWLockWriter } from '@matrixai/async-locks';
1718
import {
1819
ready,
1920
running,
@@ -515,29 +516,34 @@ class QUICConnection extends EventTarget {
515516
this.logger.debug('streams destroyed');
516517
this.stopKeepAliveIntervalTimer();
517518

518-
mon = mon ?? new Monitor<RWLockWriter>(this.lockbox, RWLockWriter);
519519
// Trigger closing connection in the background and await close later.
520-
void mon.withF(this.lockCode, async (mon) => {
521-
// If this is already closed, then `Done` will be thrown
522-
// Otherwise it can send `CONNECTION_CLOSE` frame
523-
// This can be 0x1c close at the QUIC layer or no errors
524-
// Or it can be 0x1d for application close with an error
525-
// Upon receiving a `CONNECTION_CLOSE`, you can send back
526-
// 1 packet containing a `CONNECTION_CLOSE` frame too
527-
// (with `NO_ERROR` code if appropriate)
528-
// It must enter into a draining state, and no other packets can be sent
529-
try {
530-
this.conn.close(applicationError, errorCode, Buffer.from(errorMessage));
531-
// If we get a `Done` exception we don't bother calling send
532-
// The send only gets sent if the `Done` is not the case
533-
await this.send(mon);
534-
} catch (e) {
535-
// Ignore 'Done' if already closed
536-
if (e.message !== 'Done') {
537-
// No other exceptions are expected
538-
never();
520+
void utils.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
521+
await mon.withF(this.lockCode, async (mon) => {
522+
// If this is already closed, then `Done` will be thrown
523+
// Otherwise it can send `CONNECTION_CLOSE` frame
524+
// This can be 0x1c close at the QUIC layer or no errors
525+
// Or it can be 0x1d for application close with an error
526+
// Upon receiving a `CONNECTION_CLOSE`, you can send back
527+
// 1 packet containing a `CONNECTION_CLOSE` frame too
528+
// (with `NO_ERROR` code if appropriate)
529+
// It must enter into a draining state, and no other packets can be sent
530+
try {
531+
this.conn.close(
532+
applicationError,
533+
errorCode,
534+
Buffer.from(errorMessage),
535+
);
536+
// If we get a `Done` exception we don't bother calling send
537+
// The send only gets sent if the `Done` is not the case
538+
await this.send(mon);
539+
} catch (e) {
540+
// Ignore 'Done' if already closed
541+
if (e.message !== 'Done') {
542+
// No other exceptions are expected
543+
never();
544+
}
539545
}
540-
}
546+
});
541547
});
542548

543549
if (this.conn.isClosed()) {
@@ -750,17 +756,14 @@ class QUICConnection extends EventTarget {
750756
* Any errors must be emitted as events.
751757
* @internal
752758
*/
753-
public async send(
754-
mon: Monitor<RWLockWriter> = new Monitor<RWLockWriter>(
755-
this.lockbox,
756-
RWLockWriter,
757-
),
758-
): Promise<void> {
759-
if (!mon.isLocked(this.lockCode)) {
760-
return mon.withF(this.lockCode, async (mon) => {
761-
return this.send(mon);
762-
});
763-
}
759+
public async send(mon?: Monitor<RWLockWriter>): Promise<void> {
760+
await utils.withMonitor(mon, this.lockbox, RWLockWriter, async (mon) => {
761+
if (!mon.isLocked(this.lockCode)) {
762+
return mon.withF(this.lockCode, async (mon) => {
763+
return this.send(mon);
764+
});
765+
}
766+
});
764767

765768
const sendBuffer = new Uint8Array(quiche.MAX_DATAGRAM_SIZE);
766769
let sendLength: number;
@@ -914,17 +917,15 @@ class QUICConnection extends EventTarget {
914917
this.resolveClosedP();
915918
// If we are still running and not stopping then we need to stop
916919
if (this[running] && this[status] !== 'stopping') {
917-
const mon = new Monitor(this.lockbox, RWLockWriter);
918920
// Background stopping, we don't want to block the timer resolving
919-
void this.stop({ force: true }, mon);
921+
void this.stop({ force: true });
920922
}
921923
logger.debug('CLEANING UP TIMER');
922924
return;
923925
}
924926

925-
const mon = new Monitor(this.lockbox, RWLockWriter);
926927
// There may be data to send after timing out
927-
void this.send(mon);
928+
void this.send();
928929

929930
// Note that a `0` timeout is still a valid timeout
930931
const timeout = this.conn.timeout();
@@ -982,9 +983,8 @@ class QUICConnection extends EventTarget {
982983
// Intelligently schedule a PING frame.
983984
// If the connection has already sent ack-eliciting frames
984985
// then this is a noop.
985-
const mon = new Monitor(this.lockbox, RWLockWriter);
986986
this.conn.sendAckEliciting();
987-
await this.send(mon);
987+
await this.send();
988988
this.keepAliveIntervalTimer = new Timer({
989989
delay: ms,
990990
handler: keepAliveHandler,

src/QUICSocket.ts

+14-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import dgram from 'dgram';
66
import Logger from '@matrixai/logger';
77
import { running } from '@matrixai/async-init';
88
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
9-
import { Monitor, RWLockWriter } from '@matrixai/async-locks';
9+
import { RWLockWriter } from '@matrixai/async-locks';
1010
import { status } from '@matrixai/async-init/dist/utils';
1111
import QUICConnectionId from './QUICConnectionId';
1212
import QUICConnectionMap from './QUICConnectionMap';
@@ -107,13 +107,19 @@ class QUICSocket extends EventTarget {
107107
// Acquire the conn lock, this ensures mutual exclusion
108108
// for state changes on the internal connection
109109
try {
110-
const mon = new Monitor<RWLockWriter>(connection.lockbox, RWLockWriter);
111-
await mon.withF(connection.lockCode, async (mon) => {
112-
// Even if we are `stopping`, the `quiche` library says we need to
113-
// continue processing any packets.
114-
await connection.recv(data, remoteInfo_, mon);
115-
await connection.send(mon);
116-
});
110+
await utils.withMonitor(
111+
undefined,
112+
connection.lockbox,
113+
RWLockWriter,
114+
async (mon) => {
115+
await mon.withF(connection.lockCode, async (mon) => {
116+
// Even if we are `stopping`, the `quiche` library says we need to
117+
// continue processing any packets.
118+
await connection.recv(data, remoteInfo_, mon);
119+
await connection.send(mon);
120+
});
121+
},
122+
);
117123
} catch (e) {
118124
// Race condition with destroying socket, just ignore
119125
if (!(e instanceof errors.ErrorQUICSocketNotRunning)) throw e;

src/utils.ts

+16
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import type {
88
ServerCrypto,
99
} from './types';
1010
import type { Connection } from '@/native';
11+
import type { LockBox, RWLockWriter } from '@matrixai/async-locks';
1112
import dns from 'dns';
1213
import { IPv4, IPv6, Validator } from 'ip-num';
14+
import { Monitor } from '@matrixai/async-locks';
1315
import QUICConnectionId from './QUICConnectionId';
1416
import * as errors from './errors';
1517

@@ -428,6 +430,19 @@ function streamStats(
428430
`;
429431
}
430432

433+
async function withMonitor<T>(
434+
mon: Monitor<RWLockWriter> | undefined,
435+
lockBox: LockBox<RWLockWriter>,
436+
lockConstructor: { new (): RWLockWriter },
437+
fun: (mon: Monitor<RWLockWriter>) => Promise<T>,
438+
locksPending?: Map<string, { count: number }>,
439+
): Promise<T> {
440+
const _mon = mon ?? new Monitor(lockBox, lockConstructor, locksPending);
441+
const result = await fun(_mon);
442+
if (mon != null) await _mon.unlockAll();
443+
return result;
444+
}
445+
431446
export {
432447
isIPv4,
433448
isIPv6,
@@ -455,4 +470,5 @@ export {
455470
validateToken,
456471
sleep,
457472
streamStats,
473+
withMonitor,
458474
};

tests/utils.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ async function generateCertificate({
470470
return await x509.X509CertificateGenerator.create(certConfig);
471471
}
472472

473-
// async function createTLSConfigWithChain(
473+
// Async function createTLSConfigWithChain(
474474
// keyPairs: Array<{
475475
// publicKey: JsonWebKey;
476476
// privateKey: JsonWebKey;

0 commit comments

Comments
 (0)