Skip to content

Commit 31c83ee

Browse files
committed
Use primitives as rowIds
1 parent bda80f0 commit 31c83ee

File tree

4 files changed

+70
-51
lines changed

4 files changed

+70
-51
lines changed

packages/sdk/src/algebraic_type.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ export class ProductType {
164164
}
165165
};
166166

167-
intoMapKey(value: any): any {
167+
intoMapKey(value: any): ComparablePrimitive {
168168
if (this.elements.length === 1) {
169169
if (this.elements[0].name === '__time_duration_micros__') {
170170
return (value as TimeDuration).__time_duration_micros__;
@@ -188,7 +188,7 @@ export class ProductType {
188188
return writer.toBase64();
189189
}
190190

191-
deserialize = (reader: BinaryReader): any => {
191+
deserialize = (reader: BinaryReader): { [key: string]: any } => {
192192
let result: { [key: string]: any } = {};
193193
if (this.elements.length === 1) {
194194
if (this.elements[0].name === '__time_duration_micros__') {
@@ -240,6 +240,8 @@ type AnyType =
240240
| TypeRef
241241
| None;
242242

243+
export type ComparablePrimitive = number | string | String | boolean | bigint;
244+
243245
/**
244246
* The SpacetimeDB Algebraic Type System (SATS) is a structural type system in
245247
* which a nominal type system can be constructed.
@@ -480,7 +482,7 @@ export class AlgebraicType {
480482
* @param value A value of the algebraic type
481483
* @returns Something that can be used as a key in a map.
482484
*/
483-
intoMapKey(value: any): any {
485+
intoMapKey(value: any): ComparablePrimitive {
484486
switch (this.type) {
485487
case Type.U8:
486488
case Type.U16:

packages/sdk/src/binary_writer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export default class BinaryWriter {
2121
this.#view = new DataView(this.#buffer.buffer);
2222
}
2323

24-
toBase64(): String {
24+
toBase64(): string {
2525
return fromByteArray(this.#buffer.subarray(0, this.#offset));
2626
}
2727

packages/sdk/src/db_connection_impl.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -309,21 +309,30 @@ export class DbConnectionImpl<
309309
const reader = new BinaryReader(buffer);
310310
const rows: Operation[] = [];
311311
const rowType = this.#remoteModule.tables[tableName]!.rowType;
312+
const primaryKeyInfo =
313+
this.#remoteModule.tables[tableName]!.primaryKeyInfo;
312314
while (reader.offset < buffer.length + buffer.byteOffset) {
313315
const initialOffset = reader.offset;
314316
const row = rowType.deserialize(reader);
315-
316-
// Get a view of the bytes for this row.
317-
const rowBytes = buffer.subarray(
318-
initialOffset - buffer.byteOffset,
319-
reader.offset - buffer.byteOffset
320-
);
321-
// Convert it to a base64 string, so we can use it as a map key.
322-
const asBase64 = fromByteArray(rowBytes);
317+
let rowId: any | undefined = undefined;
318+
if (primaryKeyInfo !== undefined) {
319+
rowId = primaryKeyInfo.colType.intoMapKey(
320+
row[primaryKeyInfo.colName]
321+
);
322+
} else {
323+
// Get a view of the bytes for this row.
324+
const rowBytes = buffer.subarray(
325+
initialOffset - buffer.byteOffset,
326+
reader.offset - buffer.byteOffset
327+
);
328+
// Convert it to a base64 string, so we can use it as a map key.
329+
const asBase64 = fromByteArray(rowBytes);
330+
rowId = asBase64;
331+
}
323332

324333
rows.push({
325334
type,
326-
rowId: asBase64,
335+
rowId,
327336
row,
328337
});
329338
}

packages/sdk/src/table_cache.ts

Lines changed: 46 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ import {
77
type EventContextInterface,
88
} from './db_connection_impl.ts';
99
import { stdbLogger } from './logger.ts';
10+
import type { ComparablePrimitive } from './algebraic_type.ts';
1011

1112
export type Operation = {
1213
type: 'insert' | 'delete';
13-
// rowId: string;
14-
rowId: string;
14+
// For tables with a primary key, this is the primary key value, as a primitive or string.
15+
// Otherwise, it is an encoding of the full row.
16+
rowId: ComparablePrimitive;
17+
// TODO: Refine this type to at least reflect that it is a product.
1518
row: any;
1619
};
1720

@@ -29,7 +32,7 @@ export type PendingCallback = {
2932
* Builder to generate calls to query a `table` in the database
3033
*/
3134
export class TableCache<RowType = any> {
32-
private rows: Map<string, [RowType, number]>;
35+
private rows: Map<ComparablePrimitive, [RowType, number]>;
3336
private tableTypeInfo: TableRuntimeTypeInfo;
3437
private emitter: EventEmitter<'insert' | 'delete' | 'update'>;
3538

@@ -65,38 +68,31 @@ export class TableCache<RowType = any> {
6568
): PendingCallback[] => {
6669
const pendingCallbacks: PendingCallback[] = [];
6770
if (this.tableTypeInfo.primaryKeyInfo !== undefined) {
68-
const primaryKeyCol = this.tableTypeInfo.primaryKeyInfo.colName;
69-
const primaryKeyType = this.tableTypeInfo.primaryKeyInfo.colType;
70-
const getPrimaryKey = (row: any) => {
71-
const primaryKeyValue = row[primaryKeyCol];
72-
const writer = new BinaryWriter(10);
73-
primaryKeyType.serialize(writer, primaryKeyValue);
74-
return writer.toBase64();
75-
};
76-
const insertMap = new OperationsMap<any, [Operation, number]>();
77-
const deleteMap = new OperationsMap<any, [Operation, number]>();
71+
const insertMap = new Map<ComparablePrimitive, [Operation, number]>();
72+
const deleteMap = new Map<ComparablePrimitive, [Operation, number]>();
7873
for (const op of operations) {
79-
const primaryKey = getPrimaryKey(op.row);
8074
if (op.type === 'insert') {
81-
const [_, prevCount] = insertMap.get(primaryKey) || [op, 0];
82-
insertMap.set(primaryKey, [op, prevCount + 1]);
75+
const [_, prevCount] = insertMap.get(op.rowId) || [op, 0];
76+
insertMap.set(op.rowId, [op, prevCount + 1]);
8377
} else {
84-
const [_, prevCount] = deleteMap.get(primaryKey) || [op, 0];
85-
deleteMap.set(primaryKey, [op, prevCount + 1]);
78+
const [_, prevCount] = deleteMap.get(op.rowId) || [op, 0];
79+
deleteMap.set(op.rowId, [op, prevCount + 1]);
8680
}
8781
}
88-
for (const {
89-
key: primaryKey,
90-
value: [insertOp, refCount],
91-
} of insertMap) {
82+
for (const [primaryKey, [insertOp, refCount]] of insertMap) {
9283
const deleteEntry = deleteMap.get(primaryKey);
9384
if (deleteEntry) {
9485
const [deleteOp, deleteCount] = deleteEntry;
9586
// In most cases the refCountDelta will be either 0 or refCount, but if
9687
// an update moves a row in or out of the result set of different queries, then
9788
// other deltas are possible.
9889
const refCountDelta = refCount - deleteCount;
99-
const maybeCb = this.update(ctx, insertOp, deleteOp, refCountDelta);
90+
const maybeCb = this.update(
91+
ctx,
92+
primaryKey,
93+
insertOp.row,
94+
refCountDelta
95+
);
10096
if (maybeCb) {
10197
pendingCallbacks.push(maybeCb);
10298
}
@@ -134,36 +130,48 @@ export class TableCache<RowType = any> {
134130

135131
update = (
136132
ctx: EventContextInterface,
137-
newDbOp: Operation,
138-
oldDbOp: Operation,
133+
rowId: ComparablePrimitive,
134+
newRow: RowType,
139135
refCountDelta: number = 0
140136
): PendingCallback | undefined => {
141-
const [oldRow, previousCount] = this.rows.get(oldDbOp.rowId) || [
142-
oldDbOp.row,
143-
0,
144-
];
137+
const existingEntry = this.rows.get(rowId);
138+
if (!existingEntry) {
139+
// TODO: this should throw an error and kill the connection.
140+
stdbLogger(
141+
'error',
142+
`Updating a row that was not present in the cache. Table: ${this.tableTypeInfo.tableName}, RowId: ${rowId}`
143+
);
144+
return undefined;
145+
}
146+
const [oldRow, previousCount] = existingEntry;
145147
const refCount = Math.max(1, previousCount + refCountDelta);
146-
this.rows.delete(oldDbOp.rowId);
147-
this.rows.set(newDbOp.rowId, [newDbOp.row, refCount]);
148+
if (previousCount + refCountDelta <= 0) {
149+
stdbLogger(
150+
'error',
151+
`Negative reference count for in table ${this.tableTypeInfo.tableName} row ${rowId} (${previousCount} + ${refCountDelta})`
152+
);
153+
return undefined;
154+
}
155+
this.rows.set(rowId, [newRow, refCount]);
148156
// This indicates something is wrong, so we could arguably crash here.
149157
if (previousCount === 0) {
150-
stdbLogger('error', 'Updating a row that was not present in the cache');
158+
stdbLogger(
159+
'error',
160+
`Updating a row id in table ${this.tableTypeInfo.tableName} which was not present in the cache (rowId: ${rowId})`
161+
);
151162
return {
152163
type: 'insert',
153164
table: this.tableTypeInfo.tableName,
154165
cb: () => {
155-
this.emitter.emit('insert', ctx, newDbOp.row);
166+
this.emitter.emit('insert', ctx, newRow);
156167
},
157168
};
158-
} else if (previousCount + refCountDelta <= 0) {
159-
stdbLogger('error', 'Negative reference count for row');
160-
// TODO: We should actually error and kill the connection here.
161169
}
162170
return {
163171
type: 'update',
164172
table: this.tableTypeInfo.tableName,
165173
cb: () => {
166-
this.emitter.emit('update', ctx, oldRow, newDbOp.row);
174+
this.emitter.emit('update', ctx, oldRow, newRow);
167175
},
168176
};
169177
};
@@ -187,7 +195,7 @@ export class TableCache<RowType = any> {
187195
},
188196
};
189197
}
190-
console.log(`previousCount of ${previousCount} for ${operation.rowId}`);
198+
// It's possible to get a duplicate insert because rows can be returned from multiple queries.
191199
return undefined;
192200
};
193201

0 commit comments

Comments
 (0)