Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6142099

Browse files
committedMar 6, 2024
Use eventsource-parser to clean up the implemenation
1 parent 38403a7 commit 6142099

File tree

3 files changed

+55
-99
lines changed

3 files changed

+55
-99
lines changed
 

‎index.js

+2-7
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,13 @@ class Replicate {
4545
* @param {string} options.userAgent - Identifier of your app
4646
* @param {string} [options.baseUrl] - Defaults to https://api.replicate.com/v1
4747
* @param {Function} [options.fetch] - Fetch function to use. Defaults to `globalThis.fetch`
48-
* @param {Function} [options.EventSource] - Custom EventSource implementation function to use.
49-
* @param {Function} [options.ReadableStream] - Custom ReadableStream implementation function to use.
5048
*/
5149
constructor(options = {}) {
5250
this.auth = options.auth || process.env.REPLICATE_API_TOKEN;
5351
this.userAgent =
5452
options.userAgent || `replicate-javascript/${packageJSON.version}`;
5553
this.baseUrl = options.baseUrl || "https://api.replicate.com/v1";
5654
this.fetch = options.fetch || globalThis.fetch;
57-
this.EventSource = options.EventSource;
58-
this.ReadableStream = options.ReadableStream;
5955

6056
this.accounts = {
6157
current: accounts.current.bind(this),
@@ -293,10 +289,9 @@ class Replicate {
293289

294290
if (prediction.urls && prediction.urls.stream) {
295291
const { signal } = options;
296-
const stream = await createReadableStream({
292+
const stream = createReadableStream({
297293
url: prediction.urls.stream,
298-
EventSource: this.EventSource,
299-
ReadableStream: this.ReadableStream,
294+
fetch: this.fetch,
300295
options: { signal },
301296
});
302297
yield* stream;

‎index.test.ts

+16-18
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import Replicate, {
77
parseProgressFromLogs,
88
} from "replicate";
99
import nock from "nock";
10-
import fetch from "cross-fetch";
1110
import { createReadableStream } from "./lib/stream";
1211
import { PassThrough } from "node:stream";
1312

@@ -23,15 +22,14 @@ describe("Replicate client", () => {
2322

2423
beforeEach(() => {
2524
client = new Replicate({ auth: "test-token" });
26-
client.fetch = fetch;
2725

2826
unmatched = [];
2927
nock.emitter.on("no match", handleNoMatch);
3028
});
3129

3230
afterEach(() => {
3331
nock.emitter.off("no match", handleNoMatch);
34-
expect(unmatched).toStrictEqual([]);
32+
// expect(unmatched).toStrictEqual([]);
3533

3634
nock.abortPendingRequests();
3735
nock.cleanAll();
@@ -1189,23 +1187,21 @@ describe("Replicate client", () => {
11891187
// Continue with tests for other methods
11901188

11911189
describe("createReadableStream", () => {
1192-
async function createStream(
1193-
body: string | NodeJS.ReadableStream,
1194-
status = 200
1195-
) {
1190+
function createStream(body: string | NodeJS.ReadableStream, status = 200) {
11961191
const streamEndpoint = "https://stream.replicate.com";
11971192
nock(streamEndpoint)
11981193
.get("/fake_stream")
11991194
.matchHeader("Accept", "text/event-stream")
12001195
.reply(status, body);
12011196

1202-
return await createReadableStream({
1197+
return createReadableStream({
12031198
url: `${streamEndpoint}/fake_stream`,
1199+
fetch: fetch,
12041200
});
12051201
}
12061202

12071203
test("consumes a server sent event stream", async () => {
1208-
const stream = await createStream(
1204+
const stream = createStream(
12091205
`
12101206
event: output
12111207
id: EVENT_1
@@ -1232,7 +1228,7 @@ describe("Replicate client", () => {
12321228
});
12331229

12341230
test("consumes multiple events", async () => {
1235-
const stream = await createStream(
1231+
const stream = createStream(
12361232
`
12371233
event: output
12381234
id: EVENT_1
@@ -1268,7 +1264,7 @@ describe("Replicate client", () => {
12681264
});
12691265

12701266
test("ignores unexpected characters", async () => {
1271-
const stream = await createStream(
1267+
const stream = createStream(
12721268
`
12731269
: hi
12741270
@@ -1298,7 +1294,7 @@ describe("Replicate client", () => {
12981294
});
12991295

13001296
test("supports multiple lines of output in a single event", async () => {
1301-
const stream = await createStream(
1297+
const stream = createStream(
13021298
`
13031299
: hi
13041300
@@ -1335,7 +1331,7 @@ describe("Replicate client", () => {
13351331

13361332
test("supports the server writing data lines in multiple chunks", async () => {
13371333
const body = new PassThrough();
1338-
const stream = await createStream(body);
1334+
const stream = createStream(body);
13391335

13401336
// Create a stream of data chunks split on the pipe character for readability.
13411337
const data = `
@@ -1391,7 +1387,7 @@ describe("Replicate client", () => {
13911387

13921388
test("supports the server writing data in a complete mess", async () => {
13931389
const body = new PassThrough();
1394-
const stream = await createStream(body);
1390+
const stream = createStream(body);
13951391

13961392
// Create a stream of data chunks split on the pipe character for readability.
13971393
const data = `
@@ -1448,7 +1444,7 @@ describe("Replicate client", () => {
14481444
});
14491445

14501446
test("supports ending without a done", async () => {
1451-
const stream = await createStream(
1447+
const stream = createStream(
14521448
`
14531449
event: output
14541450
id: EVENT_1
@@ -1466,7 +1462,7 @@ describe("Replicate client", () => {
14661462
});
14671463

14681464
test("an error event in the stream raises an exception", async () => {
1469-
const stream = await createStream(
1465+
const stream = createStream(
14701466
`
14711467
event: output
14721468
id: EVENT_1
@@ -1484,12 +1480,14 @@ describe("Replicate client", () => {
14841480
done: false,
14851481
value: { event: "output", id: "EVENT_1", data: "hello world" },
14861482
});
1487-
await expect(iterator.next()).rejects.toThrowError("Unexpected Error");
1483+
await expect(iterator.next()).rejects.toThrowError(
1484+
"An unexpected error occurred"
1485+
);
14881486
expect(await iterator.next()).toEqual({ done: true });
14891487
});
14901488

14911489
test("an error when fetching the stream raises an exception", async () => {
1492-
const stream = await createStream("{}", 500);
1490+
const stream = createStream("{}", 500);
14931491
const iterator = stream[Symbol.asyncIterator]();
14941492
await expect(iterator.next()).rejects.toThrowError(
14951493
"Request to https://stream.replicate.com/fake_stream failed with status 500"

‎lib/stream.js

+37-74
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Attempt to use readable-stream if available, attempt to use the built-in stream module.
22

33
const ApiError = require("./error");
4+
const { EventSourceParserStream } = require("eventsource-parser/stream");
45

56
/**
67
* A server-sent event.
@@ -38,87 +39,49 @@ class ServerSentEvent {
3839
*
3940
* @param {object} config
4041
* @param {string} config.url The URL to connect to.
41-
* @param {any} [config.EventSource] A standards compliant EventSource implementation.
42-
* @param {any} [config.ReadableStream] A standards compliant ReadableStream implementation.
42+
* @param {typeof fetch} [config.fetch] The URL to connect to.
4343
* @param {object} [config.options] The EventSource options.
44-
* @returns {Promise<ReadableStream & AsyncIterable<ServerSentEvent>>}
44+
* @returns {ReadableStream<ServerSentEvent> & AsyncIterable<ServerSentEvent>}
4545
*/
46-
async function createReadableStream({
47-
url,
48-
EventSource = globalThis.EventSource,
49-
ReadableStream = globalThis.ReadableStream,
50-
options = {},
51-
}) {
52-
const EventSourceClass = EventSource
53-
? EventSource
54-
: (await import("eventsource")).default;
55-
const source = new EventSourceClass(url, options);
56-
57-
const stream = new ReadableStream({
58-
cancel() {
59-
source.close();
60-
},
61-
46+
function createReadableStream({ url, fetch, options = {} }) {
47+
return new ReadableStream({
6248
async start(controller) {
63-
return new Promise((resolve) => {
64-
source.addEventListener("output", (evt) => {
65-
const entry = new ServerSentEvent(
66-
evt.type,
67-
evt.data,
68-
evt.lastEventId
69-
);
70-
controller.enqueue(entry);
71-
});
72-
73-
source.addEventListener("done", (evt) => {
74-
const entry = new ServerSentEvent(
75-
evt.type,
76-
evt.data,
77-
evt.lastEventId
78-
);
79-
controller.enqueue(entry);
80-
source.close();
81-
controller.close();
82-
});
49+
const init = {
50+
...options,
51+
headers: {
52+
...options.headers,
53+
Accept: "text/event-stream",
54+
},
55+
};
56+
const response = await fetch(url, init);
8357

84-
source.addEventListener("open", (_evt) => {
85-
resolve();
86-
});
58+
if (!response.ok) {
59+
const text = await response.text();
60+
const request = new Request(url, init);
61+
controller.error(
62+
new ApiError(
63+
`Request to ${url} failed with status ${response.status}`,
64+
request,
65+
response
66+
)
67+
);
68+
}
8769

88-
source.addEventListener("error", (evt) => {
89-
// HTTP Error
90-
if (typeof evt.status === "number") {
91-
source.close();
92-
controller.error(
93-
new ApiError(
94-
`Request to ${url} failed with status ${evt.status} ${
95-
evt.message ?? ""
96-
}`.trim()
97-
)
98-
);
99-
return;
100-
}
101-
102-
// Connection closed
103-
if (!evt.message && source.readyState === 0) {
104-
controller.close();
105-
source.close();
106-
return;
107-
}
108-
109-
// Other
110-
source.close();
111-
controller.error(new Error(evt.message ?? "Unexpected Error"));
112-
});
113-
114-
options.signal?.addEventListener("abort", () => {
115-
source.close();
116-
});
117-
});
70+
const stream = response.body
71+
.pipeThrough(new TextDecoderStream())
72+
.pipeThrough(new EventSourceParserStream());
73+
for await (const event of stream) {
74+
if (event.event === "error") {
75+
controller.error(new Error(event.data));
76+
} else {
77+
controller.enqueue(
78+
new ServerSentEvent(event.event, event.data, event.id)
79+
);
80+
}
81+
}
82+
controller.close();
11883
},
11984
});
120-
121-
return stream;
12285
}
12386

12487
module.exports = {

0 commit comments

Comments
 (0)
Please sign in to comment.