Skip to content

Commit 6af7ca6

Browse files
committed
fix: applying alternative fix for zombie streams by tracking used StreamIds
1 parent a8a0909 commit 6af7ca6

File tree

1 file changed

+81
-39
lines changed

1 file changed

+81
-39
lines changed

src/QUICConnection.ts

Lines changed: 81 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,26 @@ class QUICConnection {
104104
*/
105105
protected streamIdServerUni: StreamId = 0b11 as StreamId;
106106

107+
/**
108+
* Tracks the highest StreamId that has a created QUICStream for clientBidi
109+
*/
110+
protected streamIdUsedClientBidi = -1 as StreamId;
111+
112+
/**
113+
* Tracks the highest StreamId that has a created QUICStream for serverBidi
114+
*/
115+
protected streamIdUsedServerBidi = -1 as StreamId;
116+
117+
/**
118+
* Tracks the highest StreamId that has a created QUICStream for clientUni
119+
*/
120+
protected streamIdUsedClientUni = -1 as StreamId;
121+
122+
/**
123+
* Tracks the highest StreamId that has a created QUICStream for clientUni
124+
*/
125+
protected streamIdUsedServerUni = -1 as StreamId;
126+
107127
/**
108128
* Quiche connection timer. This performs time delayed state transitions.
109129
*/
@@ -967,6 +987,30 @@ class QUICConnection {
967987
}
968988
}
969989

990+
protected isStreamUsed(streamId: StreamId): boolean {
991+
const type = 0b11 & streamId;
992+
switch (type) {
993+
case 0b00:
994+
if (streamId <= this.streamIdUsedClientBidi) return true;
995+
this.streamIdUsedClientBidi = streamId;
996+
return false;
997+
case 0b01:
998+
if (streamId <= this.streamIdUsedServerBidi) return true;
999+
this.streamIdUsedServerBidi = streamId;
1000+
return false;
1001+
case 0b10:
1002+
if (streamId <= this.streamIdUsedClientUni) return true;
1003+
this.streamIdUsedClientUni = streamId;
1004+
return false;
1005+
case 0b11:
1006+
if (streamId <= this.streamIdUsedServerUni) return true;
1007+
this.streamIdUsedServerUni = streamId;
1008+
return false;
1009+
default:
1010+
utils.never('got an unexpected ID type');
1011+
}
1012+
}
1013+
9701014
protected processStreams() {
9711015
for (const streamId of this.conn.readable() as Iterable<StreamId>) {
9721016
let quicStream = this.streamMap.get(streamId);
@@ -985,7 +1029,9 @@ class QUICConnection {
9851029
);
9861030
continue;
9871031
}
988-
1032+
if (this.isStreamUsed(streamId)) {
1033+
utils.never('We should never repeat streamIds when creating streams');
1034+
}
9891035
quicStream = QUICStream.createQUICStream({
9901036
initiated: 'peer',
9911037
streamId,
@@ -1029,46 +1075,39 @@ class QUICConnection {
10291075
);
10301076
continue;
10311077
}
1032-
try {
1033-
this.conn.streamSend(streamId, Buffer.alloc(0), false);
1034-
} catch (e) {
1035-
// If we got `FinalSize` during the writable iterator then we cleaned up an errant stream
1036-
if (e.message === 'FinalSize') continue;
1037-
if (utils.isStreamStopped(e) !== false) {
1038-
// In this case it was a stream that was created but errored out. We want to create a new stream for this one case.
1039-
quicStream = QUICStream.createQUICStream({
1040-
initiated: 'peer',
1041-
streamId,
1042-
config: this.config,
1043-
connection: this,
1044-
codeToReason: this.codeToReason,
1045-
reasonToCode: this.reasonToCode,
1046-
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
1047-
});
1048-
this.streamMap.set(quicStream.streamId, quicStream);
1049-
quicStream.addEventListener(
1050-
events.EventQUICStreamSend.name,
1051-
this.handleEventQUICStreamSend,
1052-
);
1053-
quicStream.addEventListener(
1054-
events.EventQUICStreamDestroyed.name,
1055-
this.handleEventQUICStreamDestroyed,
1056-
{ once: true },
1057-
);
1058-
quicStream.addEventListener(
1059-
EventAll.name,
1060-
this.handleEventQUICStream,
1061-
);
1062-
this.dispatchEvent(
1063-
new events.EventQUICConnectionStream({ detail: quicStream }),
1064-
);
1065-
quicStream.write();
1066-
continue;
1078+
if (this.isStreamUsed(streamId)) {
1079+
try {
1080+
this.conn.streamSend(streamId, new Uint8Array(), false);
1081+
} catch (e) {
1082+
// Both `StreamStopped()` and `FinalSize` errors means that the stream has ended and we cleaned up state
1083+
if (utils.isStreamStopped(e) !== false) continue;
1084+
if (e.message === 'FinalSize') continue;
1085+
throw e;
10671086
}
1068-
utils.never(`Expected to throw "FinalSize", got ${e.message}`);
1087+
utils.never('We never expect a duplicate stream to be readable');
10691088
}
1070-
utils.never(
1071-
'We never expect the stream to be writable if it was created during the writable iterator',
1089+
quicStream = QUICStream.createQUICStream({
1090+
initiated: 'peer',
1091+
streamId,
1092+
config: this.config,
1093+
connection: this,
1094+
codeToReason: this.codeToReason,
1095+
reasonToCode: this.reasonToCode,
1096+
logger: this.logger.getChild(`${QUICStream.name} ${streamId}`),
1097+
});
1098+
this.streamMap.set(quicStream.streamId, quicStream);
1099+
quicStream.addEventListener(
1100+
events.EventQUICStreamSend.name,
1101+
this.handleEventQUICStreamSend,
1102+
);
1103+
quicStream.addEventListener(
1104+
events.EventQUICStreamDestroyed.name,
1105+
this.handleEventQUICStreamDestroyed,
1106+
{ once: true },
1107+
);
1108+
quicStream.addEventListener(EventAll.name, this.handleEventQUICStream);
1109+
this.dispatchEvent(
1110+
new events.EventQUICConnectionStream({ detail: quicStream }),
10721111
);
10731112
}
10741113
quicStream.write();
@@ -1178,6 +1217,9 @@ class QUICConnection {
11781217
} else if (this.type === 'server' && type === 'uni') {
11791218
streamId = this.streamIdServerUni;
11801219
}
1220+
if (this.isStreamUsed(streamId!)) {
1221+
utils.never('We should never repeat streamIds when creating streams');
1222+
}
11811223
const quicStream = QUICStream.createQUICStream({
11821224
initiated: 'local',
11831225
streamId: streamId!,

0 commit comments

Comments
 (0)