Skip to content

Commit b623f37

Browse files
committed
Switch to ReadableStream (by gpt)
1 parent ea11365 commit b623f37

File tree

1 file changed

+74
-110
lines changed

1 file changed

+74
-110
lines changed

lib/stream.js

+74-110
Original file line numberDiff line numberDiff line change
@@ -1,136 +1,100 @@
1-
// Attempt to use the native ReadbleStream if available, otherwise check for the `readable-stream` package or the built-in Node.js `stream` module.
2-
let Readable;
3-
4-
if (typeof ReadableStream === 'undefined') {
5-
try {
6-
Readable = require("readable-stream").Readable || require("stream").Readable;
7-
} catch (e) {
8-
Readable = null;
9-
}
10-
} else {
11-
Readable = ReadableStream;
12-
}
13-
14-
/**
15-
* A server-sent event.
16-
*/
171
class ServerSentEvent {
18-
/**
19-
* Create a new server-sent event.
20-
*
21-
* @param {string} event The event name.
22-
* @param {string} data The event data.
23-
* @param {string} id The event ID.
24-
* @param {number} retry The retry time.
25-
*/
262
constructor(event, data, id, retry) {
273
this.event = event;
284
this.data = data;
295
this.id = id;
306
this.retry = retry;
317
}
328

33-
/**
34-
* Convert the event to a string.
35-
*/
369
toString() {
37-
if (this.event === "output") {
38-
return this.data;
10+
let result = '';
11+
if (this.id) {
12+
result += `id: ${this.id}\n`;
3913
}
40-
41-
return "";
14+
if (this.event) {
15+
result += `event: ${this.event}\n`;
16+
}
17+
if (this.retry) {
18+
result += `retry: ${this.retry}\n`;
19+
}
20+
if (this.data) {
21+
result += `data: ${this.data}\n`;
22+
}
23+
result += '\n';
24+
return result;
4225
}
4326
}
4427

45-
/**
46-
* A stream of server-sent events.
47-
*/
48-
class Stream extends Readable {
49-
/**
50-
* Create a new stream of server-sent events.
51-
*
52-
* @param {string} url The URL to connect to.
53-
* @param {object} options The fetch options.
54-
*/
28+
class Stream {
5529
constructor(url, options) {
56-
if (!Readable) {
57-
throw new Error(
58-
"Readable streams are not supported. Please use Node.js 18 or later, or install the readable-stream package."
59-
);
60-
}
61-
62-
super();
6330
this.url = url;
6431
this.options = options;
32+
this.readableStream = new ReadableStream({
33+
start: async (controller) => {
34+
const response = await fetch(this.url, {
35+
...this.options,
36+
headers: {
37+
Accept: 'text/event-stream',
38+
},
39+
});
40+
const reader = response.body.getReader();
41+
let decoder = new TextDecoder();
42+
let eventBuffer = '';
43+
44+
const processChunk = (chunk) => {
45+
eventBuffer += decoder.decode(chunk, {stream: true});
46+
let eolIndex;
47+
while ((eolIndex = eventBuffer.indexOf('\n')) >= 0) {
48+
const line = eventBuffer.slice(0, eolIndex).trim();
49+
eventBuffer = eventBuffer.slice(eolIndex + 1);
50+
if (line === '') {
51+
// End of an event
52+
const event = this.parseEvent(eventBuffer);
53+
controller.enqueue(event);
54+
eventBuffer = '';
55+
} else {
56+
// Accumulate data
57+
eventBuffer += line + '\n';
58+
}
59+
}
60+
};
6561

66-
this.event = null;
67-
this.data = [];
68-
this.lastEventId = null;
69-
this.retry = null;
70-
}
62+
const push = async () => {
63+
const {done, value} = await reader.read();
64+
if (done) {
65+
controller.close();
66+
return;
67+
}
68+
processChunk(value);
69+
push();
70+
};
7171

72-
decode(line) {
73-
if (!line) {
74-
if (!this.event && !this.data.length && !this.lastEventId) {
75-
return null;
72+
push();
7673
}
77-
78-
const sse = new ServerSentEvent(
79-
this.event,
80-
this.data.join("\n"),
81-
this.lastEventId
82-
);
83-
84-
this.event = null;
85-
this.data = [];
86-
this.retry = null;
87-
88-
return sse;
89-
}
90-
91-
if (line.startsWith(":")) {
92-
return null;
93-
}
94-
95-
const [field, value] = line.split(": ");
96-
if (field === "event") {
97-
this.event = value;
98-
} else if (field === "data") {
99-
this.data.push(value);
100-
} else if (field === "id") {
101-
this.lastEventId = value;
102-
}
103-
104-
return null;
105-
}
106-
107-
async *[Symbol.asyncIterator]() {
108-
const response = await fetch(this.url, {
109-
...this.options,
110-
headers: {
111-
Accept: "text/event-stream",
112-
},
11374
});
75+
}
11476

115-
for await (const chunk of response.body) {
116-
const decoder = new TextDecoder("utf-8");
117-
const text = decoder.decode(chunk);
118-
const lines = text.split("\n");
119-
for (const line of lines) {
120-
const sse = this.decode(line);
121-
if (sse) {
122-
if (sse.event === "error") {
123-
throw new Error(sse.data);
124-
}
125-
126-
yield sse;
127-
128-
if (sse.event === "done") {
129-
return;
130-
}
131-
}
77+
parseEvent(rawData) {
78+
const lines = rawData.trim().split('\n');
79+
let event = 'message', data = '', id = null, retry = null;
80+
for (const line of lines) {
81+
const [fieldName, value] = line.split(/:(.*)/, 2);
82+
switch (fieldName) {
83+
case 'event':
84+
event = value.trim();
85+
break;
86+
case 'data':
87+
data += value.trim() + '\n';
88+
break;
89+
case 'id':
90+
id = value.trim();
91+
break;
92+
case 'retry':
93+
retry = parseInt(value.trim(), 10);
94+
break;
13295
}
13396
}
97+
return new ServerSentEvent(event, data.trim(), id, retry);
13498
}
13599
}
136100

0 commit comments

Comments
 (0)