-
Notifications
You must be signed in to change notification settings - Fork 219
/
Copy pathstream.js
108 lines (93 loc) · 2.44 KB
/
stream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Attempt to use readable-stream if available, attempt to use the built-in stream module.
const ApiError = require("./error");
const {
EventSourceParserStream,
} = require("../vendor/eventsource-parser/stream");
/**
* A server-sent event.
*/
class ServerSentEvent {
/**
* Create a new server-sent event.
*
* @param {string} event The event name.
* @param {string} data The event data.
* @param {string} id The event ID.
* @param {number} retry The retry time.
*/
constructor(event, data, id, retry) {
this.event = event;
this.data = data;
this.id = id;
this.retry = retry;
}
/**
* Convert the event to a string.
*/
toString() {
if (this.event === "output") {
return this.data;
}
return "";
}
}
/**
* Create a new stream of server-sent events.
*
* @param {object} config
* @param {string} config.url The URL to connect to.
* @param {typeof fetch} [config.fetch] The URL to connect to.
* @param {object} [config.options] The EventSource options.
* @returns {ReadableStream<ServerSentEvent> & AsyncIterable<ServerSentEvent>}
*/
function createReadableStream({ url, fetch, options = {} }) {
return new ReadableStream({
async start(controller) {
const init = {
...options,
headers: {
...options.headers,
Accept: "text/event-stream",
},
};
const response = await fetch(url, init);
if (!response.ok) {
const text = await response.text();
const request = new Request(url, init);
controller.error(
new ApiError(
`Request to ${url} failed with status ${response.status}: ${text}`,
request,
response
)
);
}
const stream = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream());
const reader = stream.getReader();
while (true) {
const { done, value: event } = await reader.read();
if (done) {
break;
}
if (event.event === "error") {
controller.error(new Error(event.data));
break;
}
controller.enqueue(
new ServerSentEvent(event.event, event.data, event.id)
);
if (event.event === "done") {
break;
}
}
reader.releaseLock();
controller.close();
},
});
}
module.exports = {
createReadableStream,
ServerSentEvent,
};