From 20e988ae34028bcf3d1be2bdab05f521bbcfa49c Mon Sep 17 00:00:00 2001 From: Harry Brundage Date: Tue, 17 Oct 2023 09:54:09 -0400 Subject: [PATCH] Add live query support to the imperative operation runners #239 added support for live queries to the react hooks. React provides a great model for ... react-ing... to changes in the live query where the hooks can trigger a re-render. #239 did not add support for the imperative style of api calls to go live because it isn't immediately clear how they should work. This adds the inner support we need for calling imperative live queries. I chose to use async iterators for this api. The end result would look like: ``` for await (const result of api.widget.findOne("123")) { console.log(result); } ``` `result` there would be a fully formed `GadgetRecord` instance, and each time it changed on the backend, we'd push a whole new instance out the async iterator. [no-changelog-required] --- packages/api-client-core/package.json | 1 + packages/api-client-core/spec/helpers.ts | 4 + .../spec/operationRunners.spec.ts | 995 ++++++++++++------ .../api-client-core/src/operationRunners.ts | 122 ++- pnpm-lock.yaml | 3 + 5 files changed, 774 insertions(+), 351 deletions(-) diff --git a/packages/api-client-core/package.json b/packages/api-client-core/package.json index c8785a94b..16284b685 100644 --- a/packages/api-client-core/package.json +++ b/packages/api-client-core/package.json @@ -39,6 +39,7 @@ "klona": "^2.0.6", "tiny-graphql-query-compiler": "^0.2.2", "tslib": "^2.6.2", + "wonka": "^6.3.2", "ws": "^8.13.0" }, "devDependencies": { diff --git a/packages/api-client-core/spec/helpers.ts b/packages/api-client-core/spec/helpers.ts index ffd9a28d0..599e09a00 100644 --- a/packages/api-client-core/spec/helpers.ts +++ b/packages/api-client-core/spec/helpers.ts @@ -15,3 +15,7 @@ export const base64 = (str: string) => Buffer.from(str).toString("base64"); export function expectValidGraphQLQuery(query: string) { parse(query); } + +export const asyncIterableToIterator = (iterator: AsyncIterable) => { + return iterator[Symbol.asyncIterator](); +}; diff --git a/packages/api-client-core/spec/operationRunners.spec.ts b/packages/api-client-core/spec/operationRunners.spec.ts index fac652f86..a6f392afe 100644 --- a/packages/api-client-core/spec/operationRunners.spec.ts +++ b/packages/api-client-core/spec/operationRunners.spec.ts @@ -1,377 +1,724 @@ +import { diff } from "@n1ru4l/json-patch-plus"; +import { CombinedError } from "@urql/core"; +import { GraphQLError } from "graphql"; import nock from "nock"; -import type { GadgetErrorGroup } from "../src/index.js"; -import { GadgetConnection, actionRunner } from "../src/index.js"; -import { mockUrqlClient } from "./mockUrqlClient.js"; +import type { AnyModelManager, GadgetErrorGroup } from "../src/index.js"; +import { GadgetConnection, actionRunner, findManyRunner, findOneRunner } from "../src/index.js"; +import { asyncIterableToIterator } from "./helpers.js"; +import { mockGraphQLWSClient, mockUrqlClient } from "./mockUrqlClient.js"; nock.disableNetConnect(); // eslint-disable-next-line jest/no-export describe("operationRunners", () => { let connection: GadgetConnection; - beforeEach(() => { - connection = new GadgetConnection({ endpoint: "https://someapp.gadget.app" }); - jest.spyOn(connection, "currentClient", "get").mockReturnValue(mockUrqlClient as any); - }); - describe("actionRunner", () => { - test("can run a single create action", async () => { - const promise = actionRunner<{ id: string; name: string }>( - { - connection, - }, - "createWidget", - { id: true, name: true }, - "widget", - "widget", - false, - { - widget: { - value: { name: "hello" }, - required: false, - type: "CreateWidgetInput", - }, - }, - {}, - null, - false - ); - - mockUrqlClient.executeMutation.pushResponse("createWidget", { - data: { - createWidget: { - success: true, - errors: null, + describe("static", () => { + beforeEach(() => { + connection = new GadgetConnection({ endpoint: "https://someapp.gadget.app" }); + jest.spyOn(connection, "currentClient", "get").mockReturnValue(mockUrqlClient as any); + }); + + describe("findOneRunner", () => { + test("can run a single findOne find", async () => { + const promise = findOneRunner( + { + connection, + }, + "widget", + "123", + { id: true, name: true }, + "widget", + {}, + true + ); + + mockUrqlClient.executeQuery.pushResponse("widget", { + data: { widget: { id: "123", name: "foo", }, }, - }, - stale: false, - hasNext: false, + stale: false, + hasNext: false, + }); + + const result = await promise; + expect(result.id).toBeTruthy(); + expect(result.name).toBeTruthy(); }); - const result = await promise; - expect(result.id).toBeTruthy(); - expect(result.name).toBeTruthy(); + test("returns an error if the backend returns an error", async () => { + const promise = findOneRunner( + { + connection, + }, + "widget", + "123", + { id: true, name: true }, + "widget", + {}, + true + ); + + mockUrqlClient.executeQuery.pushResponse("widget", { + data: null, + error: new CombinedError({ networkError: new Error("backend exploded") }), + stale: false, + hasNext: false, + }); + + await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(`"[Network] backend exploded"`); + }); }); - test("can run a single update action", async () => { - const promise = actionRunner<{ id: string; name: string }>( - { - connection, - }, - "updateWidget", - { id: true, name: true }, - "widget", - "widget", - false, - { - id: { - value: "123", - required: true, - type: "GadgetID", - }, - widget: { - value: { name: "hello" }, - required: false, - type: "UpdateWidgetInput", - }, - }, - {}, - null, - false - ); - - mockUrqlClient.executeMutation.pushResponse("updateWidget", { - data: { - updateWidget: { - success: true, - errors: null, - widget: { - id: "123", - name: "foo", + describe("findManyRunner", () => { + test("can run a findMany", async () => { + const promise = findManyRunner({ connection } as AnyModelManager, "widgets", { id: true, name: true }, "widget", {}, true); + + mockUrqlClient.executeQuery.pushResponse("widgets", { + data: { + widgets: { + edges: [ + { + node: { + id: "123", + name: "foo", + }, + }, + ], }, }, - }, - stale: false, - hasNext: false, - }); + stale: false, + hasNext: false, + }); - const result = await promise; - expect(result.id).toBeTruthy(); - expect(result.name).toBeTruthy(); + const result = await promise; + expect(result[0].id).toBeTruthy(); + expect(result[0].name).toBeTruthy(); + }); }); - test("can throw the error returned by the server for a single action", async () => { - const promise = actionRunner<{ id: string; name: string }>( - { - connection, - }, - "updateWidget", - { id: true, name: true }, - "widget", - "widget", - false, - { - id: { - value: "123", - required: true, - type: "GadgetID", - }, - widget: { - value: { name: "hello" }, - required: false, - type: "UpdateWidgetInput", - }, - }, - {}, - null, - false - ); - - mockUrqlClient.executeMutation.pushResponse("updateWidget", { - data: { - updateWidget: { - success: false, - errors: [ - { - code: "GGT_SOMETHING_OR_OTHER", - message: "An internal error occurred", - }, - ], - widget: null, + describe("actionRunner", () => { + test("can run a single create action", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, }, - }, - stale: false, - hasNext: false, - }); - - await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(`"GGT_SOMETHING_OR_OTHER: An internal error occurred"`); - }); + "createWidget", + { id: true, name: true }, + "widget", + "widget", + false, + { + widget: { + value: { name: "hello" }, + required: false, + type: "CreateWidgetInput", + }, + }, + {}, + null, + false + ); - test("can run a bulk action by ids", async () => { - const promise = actionRunner<{ id: string; name: string }>( - { - connection, - }, - "bulkFlipWidgets", - { id: true, name: true }, - "widget", - "widgets", - true, - { - ids: { - value: ["123", "456"], - required: true, - type: "[GadgetID!]", - }, - }, - {}, - null, - false - ); - - mockUrqlClient.executeMutation.pushResponse("bulkFlipWidgets", { - data: { - bulkFlipWidgets: { - success: true, - errors: null, - widgets: [ - { + mockUrqlClient.executeMutation.pushResponse("createWidget", { + data: { + createWidget: { + success: true, + errors: null, + widget: { id: "123", name: "foo", }, - { - id: "456", - name: "bar", - }, - ], + }, }, - }, - stale: false, - hasNext: false, + stale: false, + hasNext: false, + }); + + const result = await promise; + expect(result.id).toBeTruthy(); + expect(result.name).toBeTruthy(); }); - const results = await promise; - expect(results[0].id).toBeTruthy(); - expect(results[0].name).toBeTruthy(); - expect(results[1].id).toBeTruthy(); - expect(results[1].name).toBeTruthy(); - }); + test("can run a single update action", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, + }, + "updateWidget", + { id: true, name: true }, + "widget", + "widget", + false, + { + id: { + value: "123", + required: true, + type: "GadgetID", + }, + widget: { + value: { name: "hello" }, + required: false, + type: "UpdateWidgetInput", + }, + }, + {}, + null, + false + ); - test("can run a bulk action with params", async () => { - const promise = actionRunner<{ id: string; name: string }>( - { - connection, - }, - "bulkCreateWidgets", - { id: true, name: true }, - "widget", - "widgets", - true, - { - inputs: { - value: [{ id: "123", widget: { name: "foo" } }], - required: true, - type: "[BulkCreateWidgetInput!]", - }, - }, - {}, - null, - false - ); - - mockUrqlClient.executeMutation.pushResponse("bulkCreateWidgets", { - data: { - bulkCreateWidgets: { - success: true, - errors: null, - widgets: [ - { + mockUrqlClient.executeMutation.pushResponse("updateWidget", { + data: { + updateWidget: { + success: true, + errors: null, + widget: { id: "123", name: "foo", }, - { - id: "456", - name: "bar", - }, - ], + }, }, - }, - stale: false, - hasNext: false, + stale: false, + hasNext: false, + }); + + const result = await promise; + expect(result.id).toBeTruthy(); + expect(result.name).toBeTruthy(); }); - const results = await promise; - expect(results[0].id).toBeTruthy(); - expect(results[0].name).toBeTruthy(); - expect(results[1].id).toBeTruthy(); - expect(results[1].name).toBeTruthy(); - }); + test("can throw a transport error", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, + }, + "updateWidget", + { id: true, name: true }, + "widget", + "widget", + false, + { + id: { + value: "123", + required: true, + type: "GadgetID", + }, + widget: { + value: { name: "hello" }, + required: false, + type: "UpdateWidgetInput", + }, + }, + {}, + null, + false + ); - test("throws a nice error when a bulk action returns errors", async () => { - const promise = actionRunner<{ id: string; name: string }>( - { - connection, - }, - "bulkCreateWidgets", - { id: true, name: true }, - "widget", - "widgets", - true, - { - inputs: { - value: [{ id: "123", widget: { name: "foo" } }], - required: true, - type: "[BulkCreateWidgetInput!]", - }, - }, - {}, - null, - false - ); - - mockUrqlClient.executeMutation.pushResponse("bulkCreateWidgets", { - data: { - bulkCreateWidgets: { - success: false, - errors: [ - { code: "GGT_ERROR_CODE_A", message: "Something went super wrong" }, - { code: "GGT_ERROR_CODE_B", message: "Something went super wrong" }, - ], - widgets: null, + mockUrqlClient.executeMutation.pushResponse("updateWidget", { + data: null, + error: new CombinedError({ networkError: new Error("backend exploded") }), + stale: false, + hasNext: false, + }); + + await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(`"[Network] backend exploded"`); + }); + + test("can throw the error returned by the server in the mutation result for a single action", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, }, - }, - stale: false, - hasNext: false, + "updateWidget", + { id: true, name: true }, + "widget", + "widget", + false, + { + id: { + value: "123", + required: true, + type: "GadgetID", + }, + widget: { + value: { name: "hello" }, + required: false, + type: "UpdateWidgetInput", + }, + }, + {}, + null, + false + ); + + mockUrqlClient.executeMutation.pushResponse("updateWidget", { + data: { + updateWidget: { + success: false, + errors: [ + { + code: "GGT_SOMETHING_OR_OTHER", + message: "An internal error occurred", + }, + ], + widget: null, + }, + }, + stale: false, + hasNext: false, + }); + + await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(`"GGT_SOMETHING_OR_OTHER: An internal error occurred"`); }); - await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(`"Multiple errors occurred"`); - }); + test("can run a bulk action by ids", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, + }, + "bulkFlipWidgets", + { id: true, name: true }, + "widget", + "widgets", + true, + { + ids: { + value: ["123", "456"], + required: true, + type: "[GadgetID!]", + }, + }, + {}, + null, + false + ); - test("throws a nice error when a bulk action returns errors and data", async () => { - const promise = actionRunner<{ id: string; name: string }>( - { - connection, - }, - "bulkCreateWidgets", - { id: true, name: true }, - "widget", - "widgets", - true, - { - inputs: { - value: [{ id: "123", widget: { name: "foo" } }], - required: true, - type: "[BulkCreateWidgetInput!]", - }, - }, - {}, - null, - false - ); - - mockUrqlClient.executeMutation.pushResponse("bulkCreateWidgets", { - data: { - bulkCreateWidgets: { - success: false, - errors: [ - { code: "GGT_ERROR_CODE_A", message: "Something went super wrong" }, - { code: "GGT_ERROR_CODE_B", message: "Something went super wrong" }, - ], - widgets: [{ id: "123", name: "foo" }], + mockUrqlClient.executeMutation.pushResponse("bulkFlipWidgets", { + data: { + bulkFlipWidgets: { + success: true, + errors: null, + widgets: [ + { + id: "123", + name: "foo", + }, + { + id: "456", + name: "bar", + }, + ], + }, }, - }, - stale: false, - hasNext: false, + stale: false, + hasNext: false, + }); + + const results = await promise; + expect(results[0].id).toBeTruthy(); + expect(results[0].name).toBeTruthy(); + expect(results[1].id).toBeTruthy(); + expect(results[1].name).toBeTruthy(); }); - let error: GadgetErrorGroup; - await promise.catch((e) => { - error = e; + test("can run a bulk action with params", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, + }, + "bulkCreateWidgets", + { id: true, name: true }, + "widget", + "widgets", + true, + { + inputs: { + value: [{ id: "123", widget: { name: "foo" } }], + required: true, + type: "[BulkCreateWidgetInput!]", + }, + }, + {}, + null, + false + ); + + mockUrqlClient.executeMutation.pushResponse("bulkCreateWidgets", { + data: { + bulkCreateWidgets: { + success: true, + errors: null, + widgets: [ + { + id: "123", + name: "foo", + }, + { + id: "456", + name: "bar", + }, + ], + }, + }, + stale: false, + hasNext: false, + }); + + const results = await promise; + expect(results[0].id).toBeTruthy(); + expect(results[0].name).toBeTruthy(); + expect(results[1].id).toBeTruthy(); + expect(results[1].name).toBeTruthy(); + }); + + test("throws a nice error when a bulk action returns errors", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, + }, + "bulkCreateWidgets", + { id: true, name: true }, + "widget", + "widgets", + true, + { + inputs: { + value: [{ id: "123", widget: { name: "foo" } }], + required: true, + type: "[BulkCreateWidgetInput!]", + }, + }, + {}, + null, + false + ); + + mockUrqlClient.executeMutation.pushResponse("bulkCreateWidgets", { + data: { + bulkCreateWidgets: { + success: false, + errors: [ + { code: "GGT_ERROR_CODE_A", message: "Something went super wrong" }, + { code: "GGT_ERROR_CODE_B", message: "Something went super wrong" }, + ], + widgets: null, + }, + }, + stale: false, + hasNext: false, + }); + + await expect(promise).rejects.toThrowErrorMatchingInlineSnapshot(`"Multiple errors occurred"`); }); - expect(error!).toBeTruthy(); - expect(error!.errors.length).toBe(2); - expect(error!.code).toMatchInlineSnapshot(`"GGT_ERROR_GROUP(GGT_ERROR_CODE_A,GGT_ERROR_CODE_B)"`); - expect(error!.results?.[0].id).toBeTruthy(); + + test("throws a nice error when a bulk action returns errors and data", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, + }, + "bulkCreateWidgets", + { id: true, name: true }, + "widget", + "widgets", + true, + { + inputs: { + value: [{ id: "123", widget: { name: "foo" } }], + required: true, + type: "[BulkCreateWidgetInput!]", + }, + }, + {}, + null, + false + ); + + mockUrqlClient.executeMutation.pushResponse("bulkCreateWidgets", { + data: { + bulkCreateWidgets: { + success: false, + errors: [ + { code: "GGT_ERROR_CODE_A", message: "Something went super wrong" }, + { code: "GGT_ERROR_CODE_B", message: "Something went super wrong" }, + ], + widgets: [{ id: "123", name: "foo" }], + }, + }, + stale: false, + hasNext: false, + }); + + let error: GadgetErrorGroup; + await promise.catch((e) => { + error = e; + }); + expect(error!).toBeTruthy(); + expect(error!.errors.length).toBe(2); + expect(error!.code).toMatchInlineSnapshot(`"GGT_ERROR_GROUP(GGT_ERROR_CODE_A,GGT_ERROR_CODE_B)"`); + expect(error!.results?.[0].id).toBeTruthy(); + }); + + test("returns undefined when bulk action does not have a result", async () => { + const promise = actionRunner<{ id: string; name: string }>( + { + connection, + }, + "bulkDeleteWidgets", + { id: true, name: true }, + "widget", + "widgets", + true, + { + ids: { + value: ["123", "234"], + required: true, + type: "[GadgetID!]", + }, + }, + {}, + null, + false + ); + + mockUrqlClient.executeMutation.pushResponse("bulkDeleteWidgets", { + data: { + bulkDeleteWidgets: { + success: true, + errors: null, + }, + }, + stale: false, + hasNext: false, + }); + + const result = await promise; + expect(result).toBeUndefined(); + }); + }); + }); + + describe("live", () => { + beforeEach(() => { + connection = new GadgetConnection({ endpoint: "https://someapp.gadget.app" }); + jest.replaceProperty(connection, "baseSubscriptionClient", mockGraphQLWSClient as any); }); - test("returns undefined when bulk action does not have a result", async () => { - const promise = actionRunner<{ id: string; name: string }>( - { - connection, - }, - "bulkDeleteWidgets", - { id: true, name: true }, - "widget", - "widgets", - true, - { - ids: { - value: ["123", "234"], - required: true, - type: "[GadgetID!]", - }, - }, - {}, - null, - false - ); - - mockUrqlClient.executeMutation.pushResponse("bulkDeleteWidgets", { - data: { - bulkDeleteWidgets: { - success: true, - errors: null, - }, - }, - stale: false, - hasNext: false, + describe("findOneRunner", () => { + test("can run a live findOne find", async () => { + const iterator = asyncIterableToIterator( + await findOneRunner<{ id: string; name: string }, { live: true }>( + { + connection, + }, + "widget", + "123", + { id: true, name: true }, + "widget", + { live: true }, + true + ) + ); + + const firstValuePromise = iterator.next(); + + await Promise.resolve(); + + expect(mockGraphQLWSClient.subscribe.subscriptions).toHaveLength(1); + const subscription = mockGraphQLWSClient.subscribe.subscriptions[0]; + expect(subscription.payload.query).toContain("@live"); + + subscription.push({ + data: { + widget: { + id: "123", + name: "test widget", + }, + }, + revision: 1, + } as any); + + let { value } = await firstValuePromise; + expect(value.id).toEqual("123"); + expect(value.name).toEqual("test widget"); + + subscription.push({ + patch: { + widget: { + name: [null, "a new name"], + }, + }, + revision: 2, + } as any); + + ({ value } = await iterator.next()); + expect(value.id).toEqual("123"); + expect(value.name).toEqual("a new name"); }); - const result = await promise; - expect(result).toBeUndefined(); + test("can run a live findOne find with returns an error", async () => { + const iterator = asyncIterableToIterator( + await findOneRunner<{ id: string; name: string }, { live: true }>( + { + connection, + }, + "widget", + "123", + { id: true, name: true }, + "widget", + { live: true }, + true + ) + ); + + const firstValuePromise = iterator.next(); + + await Promise.resolve(); + + expect(mockGraphQLWSClient.subscribe.subscriptions).toHaveLength(1); + const subscription = mockGraphQLWSClient.subscribe.subscriptions[0]; + expect(subscription.payload.query).toContain("@live"); + + subscription.push({ + errors: [{ message: "backend error encountered processing" }], + } as any); + + await expect(firstValuePromise).rejects.toThrowErrorMatchingInlineSnapshot(`"[GraphQL] backend error encountered processing"`); + }); + + test("can run a live findOne that starts with working data then later returns an error", async () => { + const iterator = asyncIterableToIterator( + await findOneRunner<{ id: string; name: string }, { live: true }>( + { + connection, + }, + "widget", + "123", + { id: true, name: true }, + "widget", + { live: true }, + true + ) + ); + + const firstValuePromise = iterator.next(); + + await Promise.resolve(); + + expect(mockGraphQLWSClient.subscribe.subscriptions).toHaveLength(1); + const subscription = mockGraphQLWSClient.subscribe.subscriptions[0]; + expect(subscription.payload.query).toContain("@live"); + + subscription.push({ + data: { + widget: { + id: "123", + name: "test widget", + }, + }, + revision: 1, + } as any); + + expect(await firstValuePromise).toBeTruthy(); + + const nextValuePromise = iterator.next(); + + subscription.push({ + data: null, + errors: [new GraphQLError("backend exploded")], + }); + + await expect(nextValuePromise).rejects.toThrowErrorMatchingInlineSnapshot(`"[GraphQL] backend exploded"`); + }); + }); + + describe("findManyRunner", () => { + test("can run a live findMany", async () => { + const iterator = asyncIterableToIterator( + await findManyRunner<{ id: string; name: string }, { live: true }>( + { connection } as AnyModelManager, + "widgets", + { id: true, name: true }, + "widget", + { live: true }, + true + ) + ); + + const firstValuePromise = iterator.next(); + await Promise.resolve(); + + expect(mockGraphQLWSClient.subscribe.subscriptions).toHaveLength(1); + const subscription = mockGraphQLWSClient.subscribe.subscriptions[0]; + expect(subscription.payload.query).toContain("@live"); + + const initial = { + widgets: { + edges: [ + { + node: { + id: "123", + name: "test 1", + }, + }, + { + node: { + id: "456", + name: "test 2", + }, + }, + ], + }, + }; + + subscription.push({ + data: initial, + revision: 1, + } as any); + + let { value } = await firstValuePromise; + expect(value[0].id).toEqual("123"); + expect(value[1].id).toEqual("456"); + + const next = { + widgets: { + edges: [ + { + node: { + id: "123", + name: "a new name", + }, + }, + { + node: { + id: "456", + name: "test 2", + }, + }, + { + node: { + id: "789", + name: null, + }, + }, + ], + }, + }; + + subscription.push({ + patch: diff({ left: initial, right: next }), + revision: 2, + } as any); + + ({ value } = await iterator.next()); + expect(value[0].id).toEqual("123"); + expect(value[0].name).toEqual("a new name"); + expect(value[1].name).toEqual("test 2"); + expect(value[2].id).toEqual("789"); + expect(value[2].name).toEqual(null); + }); }); }); }); diff --git a/packages/api-client-core/src/operationRunners.ts b/packages/api-client-core/src/operationRunners.ts index bca7ec7b8..460e4c312 100644 --- a/packages/api-client-core/src/operationRunners.ts +++ b/packages/api-client-core/src/operationRunners.ts @@ -1,3 +1,6 @@ +import type { OperationResult } from "@urql/core"; +import type { Source } from "wonka"; +import { filter, pipe, take, toAsyncIterable, toPromise } from "wonka"; import type { FieldSelection } from "./FieldSelection.js"; import type { GadgetConnection } from "./GadgetConnection.js"; import type { GadgetRecord, RecordShape } from "./GadgetRecord.js"; @@ -24,20 +27,73 @@ import { } from "./support.js"; import type { BaseFindOptions, FindManyOptions, VariablesOptions } from "./types.js"; -export const findOneRunner = async ( +type LiveResultForOptions = LiveOptions extends { live: true } + ? AsyncIterable + : Promise; + +const mapAsyncIterable = (source: AsyncIterable, mapper: (item: T) => U): AsyncIterable => { + return { + [Symbol.asyncIterator]() { + const iter = source[Symbol.asyncIterator](); + + return { + async next(): Promise> { + const { done, value } = await iter.next(); + + return { + done, + value: typeof value != "undefined" ? mapper(value) : undefined, + } as any; + }, + async return(value: any): Promise> { + return (await iter.return?.(value)) as any; + }, + }; + }, + }; +}; + +/** Given a stream, return an async iterable when live querying, and a promise resolving to the last value otherwise */ +function maybeLiveStream( + $result: Source, + mapper: (value: T) => U, + options?: LiveOptions | null +): LiveResultForOptions { + if (options?.live) { + return mapAsyncIterable(toAsyncIterable($result), mapper) as unknown as LiveResultForOptions; + } else { + const promise = pipe( + $result, + filter((result) => !result.stale && !result.hasNext), + take(1), + toPromise + ); + + return promise.then((value) => mapper(value)) as LiveResultForOptions; + } +} + +export const findOneRunner = async ( modelManager: { connection: GadgetConnection }, operation: string, id: string | undefined, defaultSelection: FieldSelection, modelApiIdentifier: string, - options?: BaseFindOptions | null, + options?: Options | null, throwOnEmptyData = true ) => { const plan = findOneOperation(operation, id, defaultSelection, modelApiIdentifier, options); - const response = await modelManager.connection.currentClient.query(plan.query, plan.variables).toPromise(); - const assertSuccess = throwOnEmptyData ? assertOperationSuccess : assertNullableOperationSuccess; - const record = assertSuccess(response, [operation]); - return hydrateRecord(response, record); + const $results = modelManager.connection.currentClient.query(plan.query, plan.variables); + + return await maybeLiveStream( + $results, + (response) => { + const assertSuccess = throwOnEmptyData ? assertOperationSuccess : assertNullableOperationSuccess; + const record = assertSuccess(response, [operation]); + return hydrateRecord(response, record); + }, + options + ); }; export const findOneByFieldRunner = async ( @@ -50,40 +106,52 @@ export const findOneByFieldRunner = async ( options?: BaseFindOptions | null ) => { const plan = findOneByFieldOperation(operation, fieldName, fieldValue, defaultSelection, modelApiIdentifier, options); - const response = await modelManager.connection.currentClient.query(plan.query, plan.variables).toPromise(); - const connectionObject = assertOperationSuccess(response, [operation]); - const records = hydrateConnection(response, connectionObject); + const $results = modelManager.connection.currentClient.query(plan.query, plan.variables); - if (records.length > 1) { - throw getNonUniqueDataError(modelApiIdentifier, fieldName, fieldValue); - } + return await maybeLiveStream( + $results, + (response) => { + const connectionObject = assertOperationSuccess(response, [operation]); + const records = hydrateConnection(response, connectionObject); - return records[0]; + if (records.length > 1) { + throw getNonUniqueDataError(modelApiIdentifier, fieldName, fieldValue); + } + return records[0]; + }, + options + ); }; -export const findManyRunner = async ( +export const findManyRunner = async ( modelManager: AnyModelManager, operation: string, defaultSelection: FieldSelection, modelApiIdentifier: string, - options?: FindManyOptions, + options?: Options, throwOnEmptyData?: boolean ) => { const plan = findManyOperation(operation, defaultSelection, modelApiIdentifier, options); - const response = await modelManager.connection.currentClient.query(plan.query, plan.variables).toPromise(); + const $results = modelManager.connection.currentClient.query(plan.query, plan.variables); - let connectionObject; - if (throwOnEmptyData === false) { - // If this is a nullable operation, don't throw errors on empty - connectionObject = assertNullableOperationSuccess(response, [operation]); - } else { - // Otherwise, passthrough the `throwOnEmptyData` flag, to account for - // `findMany` (allows empty arrays) vs `findFirst` (no empty result) usage. - connectionObject = assertOperationSuccess(response, [operation], throwOnEmptyData); - } + return await maybeLiveStream( + $results, + (response) => { + let connectionObject; + if (throwOnEmptyData === false) { + // If this is a nullable operation, don't throw errors on empty + connectionObject = assertNullableOperationSuccess(response, [operation]); + } else { + // Otherwise, passthrough the `throwOnEmptyData` flag, to account for + // `findMany` (allows empty arrays) vs `findFirst` (no empty result) usage. + connectionObject = assertOperationSuccess(response, [operation], throwOnEmptyData); + } - const records = hydrateConnection(response, connectionObject); - return GadgetRecordList.boot(modelManager, records, { options, pageInfo: connectionObject.pageInfo }); + const records = hydrateConnection(response, connectionObject); + return GadgetRecordList.boot(modelManager, records, { options, pageInfo: connectionObject.pageInfo }); + }, + options + ); }; export interface ActionRunner { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c5d5f594e..6c6378220 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -123,6 +123,9 @@ importers: tslib: specifier: ^2.6.2 version: 2.6.2 + wonka: + specifier: ^6.3.2 + version: 6.3.2 ws: specifier: ^8.13.0 version: 8.13.0