Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changeset/angry-ducks-sneeze.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
'@powersync/react-native': minor
'@powersync/common': minor
'@powersync/web': minor
'@powersync/node': minor
---

Add alpha support for sync streams, allowing different sets of data to be synced dynamically.
5 changes: 5 additions & 0 deletions .changeset/mighty-colts-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/react': minor
---

Add hooks for sync streams
18 changes: 13 additions & 5 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
return this.waitForStatus(statusMatches, signal);
}

private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
/**
* Waits for the first sync status for which the `status` callback returns a truthy value.
*/
async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
if (predicate(this.currentStatus)) {
return;
}
Expand All @@ -364,16 +367,21 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const dispose = this.registerListener({
statusChanged: (status) => {
if (predicate(status)) {
dispose();
resolve();
abort();
}
}
});

signal?.addEventListener('abort', () => {
function abort() {
dispose();
resolve();
});
}

if (signal?.aborted) {
abort();
} else {
signal?.addEventListener('abort', abort);
}
});
}

Expand Down
9 changes: 5 additions & 4 deletions packages/common/src/client/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,15 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
};
}

private get activeStreams() {
/**
* @internal exposed for testing
*/
get activeStreams() {
return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters }));
}

private subscriptionsMayHaveChanged() {
if (this.syncStreamImplementation) {
this.syncStreamImplementation.updateSubscriptions(this.activeStreams);
}
this.syncStreamImplementation?.updateSubscriptions(this.activeStreams);
}
}

Expand Down
4 changes: 3 additions & 1 deletion packages/node/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { defineConfig } from 'vitest/config';
// We need to define an empty config to be part of the vitest works
export default defineConfig({
test: {
silent: false
silent: false,
// This doesn't make the tests considerably slower. It may improve reliability for GH actions.
fileParallelism: false
}
});
131 changes: 131 additions & 0 deletions packages/react/src/hooks/streams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { useEffect, useMemo, useState } from 'react';
import { usePowerSync } from './PowerSyncContext.js';
import {
AbstractPowerSyncDatabase,
SyncStatus,
SyncStreamStatus,
SyncStreamSubscribeOptions,
SyncStreamSubscription
} from '@powersync/common';
import { useStatus } from './useStatus.js';
import { QuerySyncStreamOptions } from './watched/watch-types.js';

/**
* A sync stream to subscribe to in {@link useSyncStream}.
*
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
*/
export interface UseSyncStreamOptions extends SyncStreamSubscribeOptions {
/**
* The name of the stream to subscribe to.
*/
name: string;
/**
* Parameters for the stream subscription. A single stream can have multiple subscriptions with different parameter
* sets.
*/
parameters?: Record<string, any>;
}

/**
* Creates a PowerSync stream subscription. The subscription is kept alive as long as the React component calling this
* function. When it unmounts, {@link SyncStreamSubscription.unsubscribe} is called
*
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
*
* @returns The status for that stream, or `null` if the stream is currently being resolved.
*/
export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus | null {
const { name, parameters } = options;
const db = usePowerSync();
const status = useStatus();
const [subscription, setSubscription] = useState<SyncStreamSubscription | null>(null);

useEffect(() => {
let active = true;
let subscription: SyncStreamSubscription | null = null;

db.syncStream(name, parameters)
.subscribe(options)
.then((sub) => {
if (active) {
subscription = sub;
setSubscription(sub);
} else {
// The cleanup function already ran, unsubscribe immediately.
sub.unsubscribe();
}
});

return () => {
active = false;
// If we don't have a subscription yet, it'll still get cleaned up once the promise resolves because we've set
// active to false.
subscription?.unsubscribe();
};
}, [name, parameters]);

return subscription && status.forStream(subscription);
}

/**
* @internal
*/
export function useAllSyncStreamsHaveSynced(
db: AbstractPowerSyncDatabase,
streams: QuerySyncStreamOptions[] | undefined
): boolean {
// Since streams are a user-supplied array, they will likely be different each time this function is called. We don't
// want to update underlying subscriptions each time, though.
const hash = useMemo(() => streams && JSON.stringify(streams), [streams]);
const [synced, setHasSynced] = useState(streams == null || streams.every((e) => e.waitForStream != true));

useEffect(() => {
if (streams) {
setHasSynced(false);

const promises: Promise<SyncStreamSubscription>[] = [];
const abort = new AbortController();
for (const stream of streams) {
promises.push(db.syncStream(stream.name, stream.parameters).subscribe(stream));
}

// First, wait for all subscribe() calls to make all subscriptions active.
Promise.all(promises).then(async (resolvedStreams) => {
function allHaveSynced(status: SyncStatus) {
return resolvedStreams.every((s, i) => {
const request = streams[i];
return !request.waitForStream || status.forStream(s)?.subscription?.hasSynced;
});
}

// Wait for the effect to be cancelled or all streams having synced.
await db.waitForStatus(allHaveSynced, abort.signal);
if (abort.signal.aborted) {
// Was cancelled
} else {
// Has synced, update public state.
setHasSynced(true);

// Wait for cancellation before clearing subscriptions.
await new Promise<void>((resolve) => {
abort.signal.addEventListener('abort', () => resolve());
});
}

// Effect was definitely cancelled at this point, so drop the subscriptions.
for (const stream of resolvedStreams) {
stream.unsubscribe();
}
});

return () => abort.abort();
} else {
// There are no streams, so all of them have synced.
setHasSynced(true);
return undefined;
}
}, [hash]);

return synced;
}
20 changes: 16 additions & 4 deletions packages/react/src/hooks/watched/useQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { useSingleQuery } from './useSingleQuery.js';
import { useWatchedQuery } from './useWatchedQuery.js';
import { AdditionalOptions, DifferentialHookOptions, QueryResult, ReadonlyQueryResult } from './watch-types.js';
import { constructCompatibleQuery } from './watch-utils.js';
import { useAllSyncStreamsHaveSynced } from '../streams.js';

/**
* A hook to access the results of a watched query.
Expand Down Expand Up @@ -58,15 +59,20 @@ export function useQuery<RowType = any>(
) {
const powerSync = usePowerSync();
if (!powerSync) {
return { isLoading: false, isFetching: false, data: [], error: new Error('PowerSync not configured.') };
return {
..._loadingState,
isLoading: false,
error: new Error('PowerSync not configured.')
};
}
const { parsedQuery, queryChanged } = constructCompatibleQuery(query, parameters, options);
const streamsHaveSynced = useAllSyncStreamsHaveSynced(powerSync, options?.streams);
const runOnce = options?.runQueryOnce == true;
const single = useSingleQuery<RowType>({
query: parsedQuery,
powerSync,
queryChanged,
active: runOnce
active: runOnce && streamsHaveSynced
});
const watched = useWatchedQuery<RowType>({
query: parsedQuery,
Expand All @@ -79,8 +85,14 @@ export function useQuery<RowType = any>(
// We emit new data for each table change by default.
rowComparator: options.rowComparator
},
active: !runOnce
active: !runOnce && streamsHaveSynced
});

return runOnce ? single : watched;
if (!streamsHaveSynced) {
return { ..._loadingState };
}

return (runOnce ? single : watched) ?? _loadingState;
}

const _loadingState = { isLoading: true, isFetching: false, data: [], error: undefined };
28 changes: 27 additions & 1 deletion packages/react/src/hooks/watched/watch-types.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,35 @@
import { DifferentialWatchedQueryComparator, SQLOnChangeOptions } from '@powersync/common';
import { DifferentialWatchedQueryComparator, SQLOnChangeOptions, SyncSubscriptionDescription } from '@powersync/common';
import { UseSyncStreamOptions } from '../streams.js';

export interface HookWatchOptions extends Omit<SQLOnChangeOptions, 'signal'> {
/**
* An optional array of sync streams (with names and parameters) backing the query.
*
* When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too).
*
* If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream
* is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't
* been downloaded.
* Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead,
* consistent sync snapshots will be made available as they've been processed by PowerSync.
*
* @experimental Sync streams are currently in alpha.
*/
streams?: QuerySyncStreamOptions[];
reportFetching?: boolean;
}

/**
* Additional options to control how `useQuery` behaves when subscribing to a stream.
*/
export interface QuerySyncStreamOptions extends UseSyncStreamOptions {
/**
* When set to `true`, a `useQuery` hook will remain in a loading state as long as the stream is resolving or
* downloading for the first time (in other words, until {@link SyncSubscriptionDescription.hasSynced} is true).
*/
waitForStream?: boolean;
}

export interface AdditionalOptions extends HookWatchOptions {
runQueryOnce?: boolean;
}
Expand Down
1 change: 1 addition & 0 deletions packages/react/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './hooks/PowerSyncContext.js';
export { SuspenseQueryResult } from './hooks/suspense/SuspenseQueryResult.js';
export { useSuspenseQuery } from './hooks/suspense/useSuspenseQuery.js';
export { useWatchedQuerySuspenseSubscription } from './hooks/suspense/useWatchedQuerySuspenseSubscription.js';
export { useSyncStream, UseSyncStreamOptions } from './hooks/streams.js';
export { useStatus } from './hooks/useStatus.js';
export { useQuery } from './hooks/watched/useQuery.js';
export { useWatchedQuerySubscription } from './hooks/watched/useWatchedQuerySubscription.js';
Expand Down
2 changes: 1 addition & 1 deletion packages/react/tests/QueryStore.test.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AbstractPowerSyncDatabase, SQLWatchOptions } from '@powersync/common';
import { beforeEach, describe, expect, it } from 'vitest';
import { generateQueryKey, getQueryStore, QueryStore } from '../src/QueryStore';
import { openPowerSync } from './useQuery.test';
import { openPowerSync } from './utils';

describe('QueryStore', () => {
describe('generateQueryKey', () => {
Expand Down
Loading
Loading