Skip to content

Commit f6696da

Browse files
authored
fix(js)!: client streamFlow returns properties, not functions (#1690)
Reconciles the server & client SDK to return a promise and async iterable rather than a function that returns these types. Docs already suggested we did this.
1 parent 1bded3c commit f6696da

File tree

9 files changed

+32
-51
lines changed

9 files changed

+32
-51
lines changed

js/core/src/async.ts

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
* limitations under the License.
1515
*/
1616

17+
// NOTE: This file is pulled into client code and cannot have any Node-only
18+
// dependencies.
19+
1720
/**
1821
* A handle to a promise and its resolvers.
1922
*/

js/genkit/src/client/client.ts

+12-34
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
* limitations under the License.
1515
*/
1616

17+
import { Channel } from '@genkit-ai/core/async';
18+
1719
const __flowStreamDelimiter = '\n\n';
1820

1921
/**
@@ -43,49 +45,25 @@ export function streamFlow<O = any, S = any>({
4345
input?: any;
4446
headers?: Record<string, string>;
4547
}): {
46-
output(): Promise<O>;
47-
stream(): AsyncIterable<S>;
48+
readonly output: Promise<O>;
49+
readonly stream: AsyncIterable<S>;
4850
} {
49-
let chunkStreamController: ReadableStreamDefaultController | undefined =
50-
undefined;
51-
const chunkStream = new ReadableStream({
52-
start(controller) {
53-
chunkStreamController = controller;
54-
},
55-
pull() {},
56-
cancel() {},
57-
});
51+
const channel = new Channel<S>();
5852

5953
const operationPromise = __flowRunEnvelope({
6054
url,
6155
input,
62-
streamingCallback: (c) => {
63-
chunkStreamController?.enqueue(c);
64-
},
56+
streamingCallback: (c) => channel.send(c),
6557
headers,
6658
});
67-
operationPromise.then((o) => {
68-
chunkStreamController?.close();
69-
return o;
70-
});
59+
operationPromise.then(
60+
() => channel.close(),
61+
(err) => channel.error(err)
62+
);
7163

7264
return {
73-
output() {
74-
return operationPromise;
75-
},
76-
async *stream() {
77-
const reader = chunkStream.getReader();
78-
while (true) {
79-
const chunk = await reader.read();
80-
if (chunk?.value !== undefined) {
81-
yield chunk.value;
82-
}
83-
if (chunk.done) {
84-
break;
85-
}
86-
}
87-
return await operationPromise;
88-
},
65+
output: operationPromise,
66+
stream: channel,
8967
};
9068
}
9169

js/plugins/express/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ const result = streamFlow({
8080
url: `http://localhost:${port}/simpleFlow`,
8181
input: 'say hello',
8282
});
83-
for await (const chunk of result.stream()) {
83+
for await (const chunk of result.stream) {
8484
console.log(chunk);
8585
}
86-
console.log(await result.output());
86+
console.log(await result.output);
8787
```
8888

8989
The sources for this package are in the main [Genkit](https://github.com/firebase/genkit) repo. Please file issues and pull requests against that repo.

js/plugins/express/tests/express_test.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ describe('expressHandler', async () => {
283283
});
284284

285285
const gotChunks: GenerateResponseChunkData[] = [];
286-
for await (const chunk of result.stream()) {
286+
for await (const chunk of result.stream) {
287287
gotChunks.push(chunk);
288288
}
289289

@@ -293,7 +293,7 @@ describe('expressHandler', async () => {
293293
{ index: 0, role: 'model', content: [{ text: '1' }] },
294294
]);
295295

296-
assert.strictEqual(await result.output(), 'Echo: olleh');
296+
assert.strictEqual(await result.output, 'Echo: olleh');
297297
});
298298

299299
it('stream a model', async () => {
@@ -310,11 +310,11 @@ describe('expressHandler', async () => {
310310
});
311311

312312
const gotChunks: any[] = [];
313-
for await (const chunk of result.stream()) {
313+
for await (const chunk of result.stream) {
314314
gotChunks.push(chunk);
315315
}
316316

317-
const output = await result.output();
317+
const output = await result.output;
318318
assert.strictEqual(output.finishReason, 'stop');
319319
assert.deepStrictEqual(output.message, {
320320
role: 'model',
@@ -502,7 +502,7 @@ describe('startFlowServer', async () => {
502502
});
503503

504504
const gotChunks: GenerateResponseChunkData[] = [];
505-
for await (const chunk of result.stream()) {
505+
for await (const chunk of result.stream) {
506506
gotChunks.push(chunk);
507507
}
508508

@@ -512,7 +512,7 @@ describe('startFlowServer', async () => {
512512
{ index: 0, role: 'model', content: [{ text: '1' }] },
513513
]);
514514

515-
assert.strictEqual(await result.output(), 'Echo: olleh');
515+
assert.strictEqual(await result.output, 'Echo: olleh');
516516
});
517517
});
518518
});

js/plugins/firebase/tests/functions_test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,11 @@ describe('function', () => {
166166
});
167167

168168
const chunks: any[] = [];
169-
for await (const chunk of result.stream()) {
169+
for await (const chunk of result.stream) {
170170
chunks.push(chunk);
171171
}
172172

173-
expect(await result.output()).toBe('hi Pavel - {"user":"Ali Baba"}');
173+
expect(await result.output).toBe('hi Pavel - {"user":"Ali Baba"}');
174174
expect(chunks).toStrictEqual([{ chubk: 1 }, { chubk: 2 }, { chubk: 3 }]);
175175
});
176176
});

samples/chatbot/genkit-app/src/app/samples/chatbot/chatbot.component.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ export class ChatbotComponent {
117117
});
118118

119119
let textBlock: OutputSchema | undefined = undefined;
120-
for await (const chunk of response.stream()) {
120+
for await (const chunk of response.stream) {
121121
for (const content of chunk.content) {
122122
if (content.text) {
123123
if (!textBlock) {
@@ -133,7 +133,7 @@ export class ChatbotComponent {
133133
this.loading = false;
134134
this.chatFormControl.enable();
135135

136-
await response.output();
136+
await response.output;
137137
} catch (e) {
138138
this.loading = false;
139139
this.chatFormControl.enable();

samples/js-angular/genkit-app/src/app/samples/chatbot/chatbot.component.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ export class ChatbotComponent {
109109
});
110110

111111
let textBlock: OutputSchema | undefined = undefined;
112-
for await (const chunk of response.stream()) {
112+
for await (const chunk of response.stream) {
113113
for (const content of chunk.content) {
114114
if (content.text) {
115115
if (!textBlock) {

samples/js-angular/genkit-app/src/app/samples/streaming-json/streaming-json.component.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,10 @@ export class StreamingJSONComponent {
4444
url,
4545
input: parseInt(this.count),
4646
});
47-
for await (const chunk of response.stream()) {
47+
for await (const chunk of response.stream) {
4848
this.characters = chunk;
4949
}
50-
console.log('streamConsumer done', await response.output());
50+
console.log('streamConsumer done', await response.output);
5151
this.loading = false;
5252
} catch (e) {
5353
this.loading = false;

tests/src/flow_server_test.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ async function testFlowServer() {
6868
input: test.post.data,
6969
});
7070

71-
for await (const chunk of response.stream()) {
71+
for await (const chunk of response.stream) {
7272
expected = want.message.replace('{count}', chunkCount.toString());
7373
let chunkJSON = JSON.stringify(await chunk);
7474
if (chunkJSON != expected) {
@@ -83,7 +83,7 @@ async function testFlowServer() {
8383
`unexpected number of stream chunks received: got ${chunkCount}, want: ${test.post.data}`
8484
);
8585
}
86-
let out = await response.output();
86+
let out = await response.output;
8787
want.result = want.result.replace(/\{count\}/g, chunkCount.toString());
8888
if (out != want.result) {
8989
throw new Error(

0 commit comments

Comments
 (0)