Skip to content

Improve performance of row ids #180

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,8 @@
"@clockworklabs/test-app": "file:../test-app",
"tsup": "^8.1.0",
"undici": "^6.19.2"
},
"dependencies": {
"base64-js": "^1.5.1"
}
}
63 changes: 61 additions & 2 deletions packages/sdk/src/algebraic_type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { TimeDuration } from './time_duration';
import { Timestamp } from './timestamp';
import { ConnectionId } from './connection_id';
import type BinaryReader from './binary_reader';
import type BinaryWriter from './binary_writer';
import BinaryWriter from './binary_writer';
import { Identity } from './identity';
import ScheduleAt from './schedule_at';

Expand Down Expand Up @@ -164,7 +164,31 @@ export class ProductType {
}
};

deserialize = (reader: BinaryReader): any => {
intoMapKey(value: any): ComparablePrimitive {
if (this.elements.length === 1) {
if (this.elements[0].name === '__time_duration_micros__') {
return (value as TimeDuration).__time_duration_micros__;
}

if (this.elements[0].name === '__timestamp_micros_since_unix_epoch__') {
return (value as Timestamp).__timestamp_micros_since_unix_epoch__;
}

if (this.elements[0].name === '__identity__') {
return (value as Identity).__identity__;
}

if (this.elements[0].name === '__connection_id__') {
return (value as ConnectionId).__connection_id__;
}
}
// The fallback is to serialize and base64 encode the bytes.
const writer = new BinaryWriter(10);
this.serialize(writer, value);
return writer.toBase64();
}

deserialize = (reader: BinaryReader): { [key: string]: any } => {
let result: { [key: string]: any } = {};
if (this.elements.length === 1) {
if (this.elements[0].name === '__time_duration_micros__') {
Expand Down Expand Up @@ -216,6 +240,8 @@ type AnyType =
| TypeRef
| None;

export type ComparablePrimitive = number | string | String | boolean | bigint;

/**
* The SpacetimeDB Algebraic Type System (SATS) is a structural type system in
* which a nominal type system can be constructed.
Expand Down Expand Up @@ -449,6 +475,39 @@ export class AlgebraicType {
return this.#isI64Newtype('__time_duration_micros__');
}

/**
* Convert a value of the algebraic type into something that can be used as a key in a map.
* There are no guarantees about being able to order it.
* This is only guaranteed to be comparable to other values of the same type.
* @param value A value of the algebraic type
* @returns Something that can be used as a key in a map.
*/
intoMapKey(value: any): ComparablePrimitive {
switch (this.type) {
case Type.U8:
case Type.U16:
case Type.U32:
case Type.U64:
case Type.U128:
case Type.U256:
case Type.I8:
case Type.I16:
case Type.I64:
case Type.I128:
case Type.F32:
case Type.F64:
case Type.String:
case Type.Bool:
return value;
case Type.ProductType:
return this.product.intoMapKey(value);
default:
const writer = new BinaryWriter(10);
this.serialize(writer, value);
return writer.toBase64();
}
}

serialize(writer: BinaryWriter, value: any): void {
switch (this.type) {
case Type.ProductType:
Expand Down
6 changes: 6 additions & 0 deletions packages/sdk/src/binary_writer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { fromByteArray } from 'base64-js';

export default class BinaryWriter {
#buffer: Uint8Array;
#view: DataView;
Expand All @@ -19,6 +21,10 @@ export default class BinaryWriter {
this.#view = new DataView(this.#buffer.buffer);
}

toBase64(): string {
return fromByteArray(this.#buffer.subarray(0, this.#offset));
}

getBuffer(): Uint8Array {
return this.#buffer.slice(0, this.#offset);
}
Expand Down
83 changes: 54 additions & 29 deletions packages/sdk/src/db_connection_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
ProductTypeElement,
SumType,
SumTypeVariant,
type ComparablePrimitive,
} from './algebraic_type.ts';
import {
AlgebraicValue,
Expand All @@ -15,7 +16,13 @@ import {
} from './algebraic_value.ts';
import BinaryReader from './binary_reader.ts';
import BinaryWriter from './binary_writer.ts';
import * as ws from './client_api/index.ts';
import { BsatnRowList } from './client_api/bsatn_row_list_type.ts';
import { ClientMessage } from './client_api/client_message_type.ts';
import { DatabaseUpdate } from './client_api/database_update_type.ts';
import { QueryUpdate } from './client_api/query_update_type.ts';
import { ServerMessage } from './client_api/server_message_type.ts';
import { TableUpdate as RawTableUpdate } from './client_api/table_update_type.ts';
import type * as clientApi from './client_api/index.ts';
import { ClientCache } from './client_cache.ts';
import { DbConnectionBuilder } from './db_connection_builder.ts';
import { type DbContext } from './db_context.ts';
Expand All @@ -41,7 +48,7 @@ import {
TableCache,
type Operation,
type PendingCallback,
type TableUpdate,
type TableUpdate as CacheTableUpdate,
} from './table_cache.ts';
import { deepEqual, toPascalCase } from './utils.ts';
import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts';
Expand All @@ -53,7 +60,8 @@ import {
type SubscribeEvent,
} from './subscription_builder_impl.ts';
import { stdbLogger } from './logger.ts';
import type { ReducerRuntimeTypeInfo } from './spacetime_module.ts';
import { type ReducerRuntimeTypeInfo } from './spacetime_module.ts';
import { fromByteArray } from 'base64-js';

export {
AlgebraicType,
Expand Down Expand Up @@ -273,7 +281,7 @@ export class DbConnectionImpl<
emitter: handleEmitter,
});
this.#sendMessage(
ws.ClientMessage.SubscribeMulti({
ClientMessage.SubscribeMulti({
queryStrings: querySql,
queryId: { id: queryId },
// The TypeScript SDK doesn't currently track `request_id`s,
Expand All @@ -286,7 +294,7 @@ export class DbConnectionImpl<

unregisterSubscription(queryId: number): void {
this.#sendMessage(
ws.ClientMessage.UnsubscribeMulti({
ClientMessage.UnsubscribeMulti({
queryId: { id: queryId },
// The TypeScript SDK doesn't currently track `request_id`s,
// so always use 0.
Expand All @@ -297,25 +305,38 @@ export class DbConnectionImpl<

// This function is async because we decompress the message async
async #processParsedMessage(
message: ws.ServerMessage
message: ServerMessage
): Promise<Message | undefined> {
const parseRowList = (
type: 'insert' | 'delete',
tableName: string,
rowList: ws.BsatnRowList
rowList: BsatnRowList
): Operation[] => {
const buffer = rowList.rowsData;
const reader = new BinaryReader(buffer);
const rows: any[] = [];
const rows: Operation[] = [];
const rowType = this.#remoteModule.tables[tableName]!.rowType;
const primaryKeyInfo =
this.#remoteModule.tables[tableName]!.primaryKeyInfo;
while (reader.offset < buffer.length + buffer.byteOffset) {
const initialOffset = reader.offset;
const row = rowType.deserialize(reader);
// This is super inefficient, but the buffer indexes are weird, so we are doing this for now.
// We should just base64 encode the bytes.
const rowId = JSON.stringify(row, (_, v) =>
typeof v === 'bigint' ? v.toString() : v
);
let rowId: ComparablePrimitive | undefined = undefined;
if (primaryKeyInfo !== undefined) {
rowId = primaryKeyInfo.colType.intoMapKey(
row[primaryKeyInfo.colName]
);
} else {
// Get a view of the bytes for this row.
const rowBytes = buffer.subarray(
initialOffset - buffer.byteOffset,
reader.offset - buffer.byteOffset
);
// Convert it to a base64 string, so we can use it as a map key.
const asBase64 = fromByteArray(rowBytes);
rowId = asBase64;
}

rows.push({
type,
rowId,
Expand All @@ -326,15 +347,15 @@ export class DbConnectionImpl<
};

const parseTableUpdate = async (
rawTableUpdate: ws.TableUpdate
): Promise<TableUpdate> => {
rawTableUpdate: RawTableUpdate
): Promise<CacheTableUpdate> => {
const tableName = rawTableUpdate.tableName;
let operations: Operation[] = [];
for (const update of rawTableUpdate.updates) {
let decompressed: ws.QueryUpdate;
let decompressed: QueryUpdate;
if (update.tag === 'Gzip') {
const decompressedBuffer = await decompress(update.value, 'gzip');
decompressed = ws.QueryUpdate.deserialize(
decompressed = QueryUpdate.deserialize(
new BinaryReader(decompressedBuffer)
);
} else if (update.tag === 'Brotli') {
Expand All @@ -358,9 +379,9 @@ export class DbConnectionImpl<
};

const parseDatabaseUpdate = async (
dbUpdate: ws.DatabaseUpdate
): Promise<TableUpdate[]> => {
const tableUpdates: TableUpdate[] = [];
dbUpdate: DatabaseUpdate
): Promise<CacheTableUpdate[]> => {
const tableUpdates: CacheTableUpdate[] = [];
for (const rawTableUpdate of dbUpdate.tables) {
tableUpdates.push(await parseTableUpdate(rawTableUpdate));
}
Expand Down Expand Up @@ -398,7 +419,7 @@ export class DbConnectionImpl<
const args = txUpdate.reducerCall.args;
const energyQuantaUsed = txUpdate.energyQuantaUsed;

let tableUpdates: TableUpdate[];
let tableUpdates: CacheTableUpdate[];
let errMessage = '';
switch (txUpdate.status.tag) {
case 'Committed':
Expand Down Expand Up @@ -498,11 +519,11 @@ export class DbConnectionImpl<
}
}

#sendMessage(message: ws.ClientMessage): void {
#sendMessage(message: ClientMessage): void {
this.wsPromise.then(wsResolved => {
if (wsResolved) {
const writer = new BinaryWriter(1024);
ws.ClientMessage.serialize(writer, message);
ClientMessage.serialize(writer, message);
const encoded = writer.getBuffer();
wsResolved.send(encoded);
}
Expand All @@ -517,24 +538,28 @@ export class DbConnectionImpl<
}

#applyTableUpdates(
tableUpdates: TableUpdate[],
tableUpdates: CacheTableUpdate[],
eventContext: EventContextInterface
): PendingCallback[] {
const pendingCallbacks: PendingCallback[] = [];
let pendingCallbacks: PendingCallback[] = [];
for (let tableUpdate of tableUpdates) {
// Get table information for the table being updated
const tableName = tableUpdate.tableName;
const tableTypeInfo = this.#remoteModule.tables[tableName]!;
const table = this.clientCache.getOrCreateTable(tableTypeInfo);
pendingCallbacks.push(
...table.applyOperations(tableUpdate.operations, eventContext)
const newCallbacks = table.applyOperations(
tableUpdate.operations,
eventContext
);
for (const callback of newCallbacks) {
pendingCallbacks.push(callback);
}
}
return pendingCallbacks;
}

async #processMessage(data: Uint8Array): Promise<void> {
const serverMessage = parseValue(ws.ServerMessage, data);
const serverMessage = parseValue(ServerMessage, data);
const message = await this.#processParsedMessage(serverMessage);
if (!message) {
return;
Expand Down Expand Up @@ -788,7 +813,7 @@ export class DbConnectionImpl<
argsBuffer: Uint8Array,
flags: CallReducerFlags
): void {
const message = ws.ClientMessage.CallReducer({
const message = ClientMessage.CallReducer({
reducer: reducerName,
args: argsBuffer,
// The TypeScript SDK doesn't currently track `request_id`s,
Expand Down
63 changes: 0 additions & 63 deletions packages/sdk/src/operations_map.ts

This file was deleted.

Loading