Skip to content

Commit e44770f

Browse files
committed
Handle processing chunked event streams
1 parent 913f524 commit e44770f

File tree

2 files changed

+310
-1
lines changed

2 files changed

+310
-1
lines changed

index.test.ts

+287
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import Replicate, {
88
} from "replicate";
99
import nock from "nock";
1010
import fetch from "cross-fetch";
11+
import { Stream } from "./lib/stream";
12+
import * as fs from "node:fs";
13+
import { PassThrough } from "node:stream";
1114

1215
let client: Replicate;
1316
const BASE_URL = "https://api.replicate.com/v1";
@@ -1187,4 +1190,288 @@ describe("Replicate client", () => {
11871190
});
11881191

11891192
// Continue with tests for other methods
1193+
1194+
describe("Stream", () => {
1195+
function createStream(body: string | NodeJS.ReadableStream) {
1196+
const streamEndpoint = "https://stream.replicate.com";
1197+
nock(streamEndpoint)
1198+
.get(`/fake_stream`)
1199+
.matchHeader("Accept", "text/event-stream")
1200+
.reply(200, body);
1201+
1202+
return new Stream({ url: `${streamEndpoint}/fake_stream`, fetch });
1203+
}
1204+
1205+
test("consumes a server sent event stream", async () => {
1206+
const stream = createStream(
1207+
`
1208+
event: output
1209+
id: EVENT_1
1210+
data: hello world
1211+
1212+
event: done
1213+
id: EVENT_2
1214+
data: {}
1215+
`
1216+
.trim()
1217+
.replace(/^[ ]+/gm, "")
1218+
);
1219+
1220+
const iterator = stream[Symbol.asyncIterator]();
1221+
1222+
expect(await iterator.next()).toEqual({
1223+
done: false,
1224+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1225+
});
1226+
expect(await iterator.next()).toEqual({
1227+
done: false,
1228+
value: { event: "done", id: "EVENT_2", data: "{}" },
1229+
});
1230+
expect(await iterator.next()).toEqual({ done: true });
1231+
expect(await iterator.next()).toEqual({ done: true });
1232+
});
1233+
1234+
test("consumes multiple events", async () => {
1235+
const stream = createStream(
1236+
`
1237+
event: output
1238+
id: EVENT_1
1239+
data: hello world
1240+
1241+
event: output
1242+
id: EVENT_2
1243+
data: hello dave
1244+
1245+
event: done
1246+
id: EVENT_3
1247+
data: {}
1248+
`
1249+
.trim()
1250+
.replace(/^[ ]+/gm, "")
1251+
);
1252+
1253+
const iterator = stream[Symbol.asyncIterator]();
1254+
1255+
expect(await iterator.next()).toEqual({
1256+
done: false,
1257+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1258+
});
1259+
expect(await iterator.next()).toEqual({
1260+
done: false,
1261+
value: { event: "output", id: "EVENT_2", data: "hello dave" },
1262+
});
1263+
expect(await iterator.next()).toEqual({
1264+
done: false,
1265+
value: { event: "done", id: "EVENT_3", data: "{}" },
1266+
});
1267+
expect(await iterator.next()).toEqual({ done: true });
1268+
expect(await iterator.next()).toEqual({ done: true });
1269+
});
1270+
1271+
test("ignores unexpected characters", async () => {
1272+
const stream = createStream(
1273+
`
1274+
: hi
1275+
1276+
event: output
1277+
id: EVENT_1
1278+
data: hello world
1279+
1280+
event: done
1281+
id: EVENT_2
1282+
data: {}
1283+
`
1284+
.trim()
1285+
.replace(/^[ ]+/gm, "")
1286+
);
1287+
1288+
const iterator = stream[Symbol.asyncIterator]();
1289+
1290+
expect(await iterator.next()).toEqual({
1291+
done: false,
1292+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1293+
});
1294+
expect(await iterator.next()).toEqual({
1295+
done: false,
1296+
value: { event: "done", id: "EVENT_2", data: "{}" },
1297+
});
1298+
expect(await iterator.next()).toEqual({ done: true });
1299+
expect(await iterator.next()).toEqual({ done: true });
1300+
});
1301+
1302+
test("supports multiple lines of output in a single event", async () => {
1303+
const stream = createStream(
1304+
`
1305+
: hi
1306+
1307+
event: output
1308+
id: EVENT_1
1309+
data: hello,
1310+
data: this is a new line,
1311+
data: and this is a new line too
1312+
1313+
event: done
1314+
id: EVENT_2
1315+
data: {}
1316+
`
1317+
.trim()
1318+
.replace(/^[ ]+/gm, "")
1319+
);
1320+
1321+
const iterator = stream[Symbol.asyncIterator]();
1322+
1323+
expect(await iterator.next()).toEqual({
1324+
done: false,
1325+
value: {
1326+
event: "output",
1327+
id: "EVENT_1",
1328+
data: "hello,\nthis is a new line,\nand this is a new line too",
1329+
},
1330+
});
1331+
expect(await iterator.next()).toEqual({
1332+
done: false,
1333+
value: { event: "done", id: "EVENT_2", data: "{}" },
1334+
});
1335+
expect(await iterator.next()).toEqual({ done: true });
1336+
expect(await iterator.next()).toEqual({ done: true });
1337+
});
1338+
1339+
test("supports the server writing data lines in multiple chunks", async () => {
1340+
const body = new PassThrough();
1341+
const stream = createStream(body);
1342+
1343+
// Create a stream of data chunks split on the pipe character for readability.
1344+
const data = `
1345+
event: output
1346+
id: EVENT_1
1347+
data: hello,|
1348+
data: this is a new line,|
1349+
data: and this is a new line too
1350+
1351+
event: done
1352+
id: EVENT_2
1353+
data: {}
1354+
`
1355+
.trim()
1356+
.replace(/^[ ]+/gm, "");
1357+
1358+
const chunks = data.split("|");
1359+
1360+
// Consume the iterator in parallel to writing it.
1361+
const reading = new Promise(async (resolve, reject) => {
1362+
try {
1363+
const iterator = stream[Symbol.asyncIterator]();
1364+
expect(await iterator.next()).toEqual({
1365+
done: false,
1366+
value: {
1367+
event: "output",
1368+
id: "EVENT_1",
1369+
data: "hello,\nthis is a new line,\nand this is a new line too",
1370+
},
1371+
});
1372+
expect(await iterator.next()).toEqual({
1373+
done: false,
1374+
value: { event: "done", id: "EVENT_2", data: "{}" },
1375+
});
1376+
expect(await iterator.next()).toEqual({ done: true });
1377+
} catch (err) {
1378+
reject(err);
1379+
}
1380+
resolve(null);
1381+
});
1382+
1383+
// Write the chunks to the stream at an interval.
1384+
const writing = new Promise(async (resolve) => {
1385+
for await (const chunk of chunks) {
1386+
body.write(chunk);
1387+
await new Promise((resolve) => setTimeout(resolve, 1));
1388+
}
1389+
body.end();
1390+
resolve(null);
1391+
});
1392+
1393+
// Wait for both promises to resolve.
1394+
await Promise.all([reading, writing]);
1395+
});
1396+
1397+
test("supports the server writing data in a complete mess", async () => {
1398+
const body = new PassThrough();
1399+
const stream = createStream(body);
1400+
1401+
// Create a stream of data chunks split on the pipe character for readability.
1402+
const data = `
1403+
: hi
1404+
1405+
ev|ent: output
1406+
id: EVENT_1
1407+
data: hello,
1408+
data: this |is a new line,|
1409+
data: and this is |a new line too
1410+
1411+
event: d|one
1412+
id: EVENT|_2
1413+
data: {}
1414+
`
1415+
.trim()
1416+
.replace(/^[ ]+/gm, "");
1417+
1418+
const chunks = data.split("|");
1419+
1420+
// Consume the iterator in parallel to writing it.
1421+
const reading = new Promise(async (resolve, reject) => {
1422+
try {
1423+
const iterator = stream[Symbol.asyncIterator]();
1424+
expect(await iterator.next()).toEqual({
1425+
done: false,
1426+
value: {
1427+
event: "output",
1428+
id: "EVENT_1",
1429+
data: "hello,\nthis is a new line,\nand this is a new line too",
1430+
},
1431+
});
1432+
expect(await iterator.next()).toEqual({
1433+
done: false,
1434+
value: { event: "done", id: "EVENT_2", data: "{}" },
1435+
});
1436+
expect(await iterator.next()).toEqual({ done: true });
1437+
} catch (err) {
1438+
reject(err);
1439+
}
1440+
resolve(null);
1441+
});
1442+
1443+
// Write the chunks to the stream at an interval.
1444+
const writing = new Promise(async (resolve) => {
1445+
for await (const chunk of chunks) {
1446+
body.write(chunk);
1447+
await new Promise((resolve) => setTimeout(resolve, 1));
1448+
}
1449+
body.end();
1450+
resolve(null);
1451+
});
1452+
1453+
// Wait for both promises to resolve.
1454+
await Promise.all([reading, writing]);
1455+
});
1456+
1457+
test("supports ending without a done", async () => {
1458+
const stream = createStream(
1459+
`
1460+
event: output
1461+
id: EVENT_1
1462+
data: hello world
1463+
1464+
`
1465+
.trim()
1466+
.replace(/^[ ]+/gm, "")
1467+
);
1468+
1469+
const iterator = stream[Symbol.asyncIterator]();
1470+
expect(await iterator.next()).toEqual({
1471+
done: false,
1472+
value: { event: "output", id: "EVENT_1", data: "hello world" },
1473+
});
1474+
expect(await iterator.next()).toEqual({ done: true });
1475+
});
1476+
});
11901477
});

lib/stream.js

+23-1
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,21 @@ class Stream extends Readable {
114114
},
115115
});
116116

117+
if (!response.ok) {
118+
throw new Error();
119+
}
120+
121+
let partialChunk = "";
117122
for await (const chunk of response.body) {
118123
const decoder = new TextDecoder("utf-8");
119-
const text = decoder.decode(chunk);
124+
const text = partialChunk + decoder.decode(chunk);
120125
const lines = text.split("\n");
126+
127+
// We want to ensure that the last line is not a fragment
128+
// so we keep it and append it to the start of the next
129+
// chunk.
130+
partialChunk = lines.pop();
131+
121132
for (const line of lines) {
122133
const sse = this.decode(line);
123134
if (sse) {
@@ -133,6 +144,17 @@ class Stream extends Readable {
133144
}
134145
}
135146
}
147+
148+
// Process the final line and ensure we have captured the final event.
149+
this.decode(partialChunk);
150+
const sse = this.decode("");
151+
if (sse) {
152+
if (sse.event === "error") {
153+
throw new Error(sse.data);
154+
}
155+
156+
yield sse;
157+
}
136158
}
137159
}
138160

0 commit comments

Comments
 (0)