Skip to content

Commit 9e1f75e

Browse files
committed
Return row counts for SQL ingestion (storage.sql.ingest()).
D1 bills based on rows read and written, but sql.ingest() didn't return those. Now it does. This required changing the return type of sql.ingest(), but it's an experimental API so that should be okay. let sqlCode = "INSERT INTO tbl (col) VALUES (123); INSERT I"; let result = sql.ingest(sqlCode); Old: result == " INSERT I" New: result.remainder == " INSERT I" result.meta.rowsRead == 0 result.meta.rowsWritten == 1 This "meta" attribute also gives us a convenient spot to stick additional information. For example, if we wanted to count how many SQL statements ingest() executed, we could easily add "result.meta.statementsExecuted" without breaking any existing users. Implementation-wise, there might be an optimization opportunity around SqlStorage::IngestResult::getMeta(). Right now, it allocates a new Meta every time it's called, so if you do the obvious thing like so: totalRead += result.meta.rowsRead totalWritten += result.meta.rowsWritten totalRedFish += result.meta.redFish // and so on then that might end up allocating and discarding multiple Meta objects, and some caching could fix that. On the other hand, I'm not very familiar[1] with JSG internals, so maybe that's already happening somehow. I have no idea. [1] This is a terrible understatement.
1 parent 925a1cb commit 9e1f75e

File tree

5 files changed

+105
-27
lines changed

5 files changed

+105
-27
lines changed

src/workerd/api/sql-test.js

+29-20
Original file line numberDiff line numberDiff line change
@@ -62,23 +62,23 @@ async function test(storage) {
6262

6363

6464
// Test partial query ingestion
65-
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `), ' ')
66-
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`), '')
67-
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`), ' SELECT 456')
68-
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`), ' SELECT 45')
69-
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`), ' SELECT 4')
70-
assert.deepEqual(sql.ingest(`SELECT 123; SELECT `), ' SELECT ')
71-
assert.deepEqual(sql.ingest(`SELECT 123; SELECT`), ' SELECT')
72-
assert.deepEqual(sql.ingest(`SELECT 123; SELEC`), ' SELEC')
73-
assert.deepEqual(sql.ingest(`SELECT 123; SELE`), ' SELE')
74-
assert.deepEqual(sql.ingest(`SELECT 123; SEL`), ' SEL')
75-
assert.deepEqual(sql.ingest(`SELECT 123; SE`), ' SE')
76-
assert.deepEqual(sql.ingest(`SELECT 123; S`), ' S')
77-
assert.deepEqual(sql.ingest(`SELECT 123; `), ' ')
78-
assert.deepEqual(sql.ingest(`SELECT 123;`), '')
79-
assert.deepEqual(sql.ingest(`SELECT 123`), 'SELECT 123')
80-
assert.deepEqual(sql.ingest(`SELECT 12`), 'SELECT 12')
81-
assert.deepEqual(sql.ingest(`SELECT 1`), 'SELECT 1')
65+
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `).remainder, ' ')
66+
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`).remainder, '')
67+
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`).remainder, ' SELECT 456')
68+
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`).remainder, ' SELECT 45')
69+
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`).remainder, ' SELECT 4')
70+
assert.deepEqual(sql.ingest(`SELECT 123; SELECT `).remainder, ' SELECT ')
71+
assert.deepEqual(sql.ingest(`SELECT 123; SELECT`).remainder, ' SELECT')
72+
assert.deepEqual(sql.ingest(`SELECT 123; SELEC`).remainder, ' SELEC')
73+
assert.deepEqual(sql.ingest(`SELECT 123; SELE`).remainder, ' SELE')
74+
assert.deepEqual(sql.ingest(`SELECT 123; SEL`).remainder, ' SEL')
75+
assert.deepEqual(sql.ingest(`SELECT 123; SE`).remainder, ' SE')
76+
assert.deepEqual(sql.ingest(`SELECT 123; S`).remainder, ' S')
77+
assert.deepEqual(sql.ingest(`SELECT 123; `).remainder, ' ')
78+
assert.deepEqual(sql.ingest(`SELECT 123;`).remainder, '')
79+
assert.deepEqual(sql.ingest(`SELECT 123`).remainder, 'SELECT 123')
80+
assert.deepEqual(sql.ingest(`SELECT 12`).remainder, 'SELECT 12')
81+
assert.deepEqual(sql.ingest(`SELECT 1`).remainder, 'SELECT 1')
8282

8383
// Exec throws with trailing comments
8484
assert.throws(
@@ -87,7 +87,7 @@ async function test(storage) {
8787
)
8888
// Ingest does not
8989
assert.deepEqual(
90-
sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`),
90+
sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`).remainder,
9191
' -- trailing comment'
9292
)
9393

@@ -105,8 +105,10 @@ async function test(storage) {
105105
const inputBytes = new TextEncoder().encode(INSERT_36_ROWS)
106106
const decoder = new TextDecoder()
107107

108+
108109
// Use a chunk size 1, 3, 9, 27, 81, ... bytes
109110
for (let length = 1; length < inputBytes.length; length = length * 3) {
111+
let totalRowsWritten = 0;
110112
let buffer = ''
111113
for (let offset = 0; offset < inputBytes.length; offset += length) {
112114
// Simulate a single "chunk" arriving
@@ -116,7 +118,9 @@ async function test(storage) {
116118
buffer += decoder.decode(chunk, { stream: true })
117119

118120
// Ingest any complete statements and snip those chars off the buffer
119-
buffer = sql.ingest(buffer)
121+
let result = sql.ingest(buffer);
122+
buffer = result.remainder;
123+
totalRowsWritten += result.meta.rowsWritten;
120124

121125
// Simulate awaiting next chunk
122126
await scheduler.wait(1)
@@ -139,6 +143,10 @@ async function test(storage) {
139143
{ val: 'f: 🔥😎🔥' },
140144
]
141145
)
146+
147+
// Verify that all 36 rows we inserted were accounted for.
148+
assert.equal(totalRowsWritten, 36);
149+
142150
sql.exec(`DELETE FROM streaming`)
143151
await scheduler.wait(1)
144152
}
@@ -1030,9 +1038,10 @@ async function testStreamingIngestion(request, storage) {
10301038
buffer += chunk
10311039

10321040
// Ingest any complete statements and snip those chars off the buffer
1033-
buffer = sql.ingest(buffer)
1041+
buffer = sql.ingest(buffer).remainder
10341042
}
10351043
})
1044+
10361045
// Verify exactly 36 rows were added
10371046
assert.deepEqual(Array.from(sql.exec(`SELECT count(*) FROM streaming`)), [
10381047
{ 'count(*)': 36 },

src/workerd/api/sql.c++

+17-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ jsg::Ref<SqlStorage::Cursor> SqlStorage::exec(jsg::Lock& js, kj::String querySql
1919
return jsg::alloc<Cursor>(*sqlite, regulator, querySql, kj::mv(bindings));
2020
}
2121

22-
kj::String SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
22+
jsg::Ref<SqlStorage::IngestResult> SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
2323
SqliteDatabase::Regulator& regulator = *this;
24-
return kj::str(sqlite->ingestSql(regulator, querySql));
24+
auto result = sqlite->ingestSql(regulator, querySql);
25+
return jsg::alloc<IngestResult>(kj::str(result.remainder), result.rowsRead, result.rowsWritten);
2526
}
2627

2728
jsg::Ref<SqlStorage::Statement> SqlStorage::prepare(jsg::Lock& js, kj::String query) {
@@ -295,4 +296,18 @@ void SqlStorage::visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
295296
}
296297
}
297298

299+
SqlStorage::IngestResult::IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten) : remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten) {}
300+
301+
kj::StringPtr SqlStorage::IngestResult::getRemainder() { return remainder; }
302+
303+
jsg::Ref<SqlStorage::IngestResult::Meta> SqlStorage::IngestResult::getMeta() { return jsg::alloc<Meta>(rowsRead, rowsWritten); }
304+
305+
306+
SqlStorage::IngestResult::Meta::Meta(uint64_t rowsRead, uint64_t rowsWritten) : rowsRead(rowsRead), rowsWritten(rowsWritten) {}
307+
308+
double SqlStorage::IngestResult::Meta::getRowsRead() { return rowsRead; }
309+
310+
double SqlStorage::IngestResult::Meta::getRowsWritten() { return rowsWritten; }
311+
312+
298313
} // namespace workerd::api

src/workerd/api/sql.h

+44-1
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ class SqlStorage final: public jsg::Object, private SqliteDatabase::Regulator {
2222

2323
class Cursor;
2424
class Statement;
25+
class IngestResult;
2526

2627
jsg::Ref<Cursor> exec(jsg::Lock& js, kj::String query, jsg::Arguments<BindingValue> bindings);
27-
kj::String ingest(jsg::Lock& js, kj::String query);
28+
jsg::Ref<IngestResult> ingest(jsg::Lock& js, kj::String query);
2829

2930
jsg::Ref<Statement> prepare(jsg::Lock& js, kj::String query);
3031

@@ -249,10 +250,52 @@ class SqlStorage::Statement final: public jsg::Object {
249250
friend class Cursor;
250251
};
251252

253+
254+
255+
class SqlStorage::IngestResult final : public jsg::Object {
256+
public:
257+
class Meta;
258+
259+
IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten);
260+
261+
JSG_RESOURCE_TYPE(IngestResult) {
262+
JSG_READONLY_PROTOTYPE_PROPERTY(meta, getMeta);
263+
JSG_READONLY_PROTOTYPE_PROPERTY(remainder, getRemainder);
264+
JSG_NESTED_TYPE(Meta);
265+
}
266+
267+
jsg::Ref<Meta> getMeta();
268+
kj::StringPtr getRemainder();
269+
270+
private:
271+
kj::String remainder;
272+
uint64_t rowsRead;
273+
uint64_t rowsWritten;
274+
};
275+
276+
class SqlStorage::IngestResult::Meta final : public jsg::Object {
277+
public:
278+
Meta(uint64_t rowsRead, uint64_t rowsWritten);
279+
280+
JSG_RESOURCE_TYPE(Meta) {
281+
JSG_READONLY_PROTOTYPE_PROPERTY(rowsRead, getRowsRead);
282+
JSG_READONLY_PROTOTYPE_PROPERTY(rowsWritten, getRowsWritten);
283+
}
284+
285+
double getRowsRead();
286+
double getRowsWritten();
287+
288+
private:
289+
uint64_t rowsRead;
290+
uint64_t rowsWritten;
291+
};
292+
252293
#define EW_SQL_ISOLATE_TYPES \
253294
api::SqlStorage, \
254295
api::SqlStorage::Statement, \
255296
api::SqlStorage::Cursor, \
297+
api::SqlStorage::IngestResult, \
298+
api::SqlStorage::IngestResult::Meta, \
256299
api::SqlStorage::Cursor::RowIterator, \
257300
api::SqlStorage::Cursor::RowIterator::Next, \
258301
api::SqlStorage::Cursor::RawIterator, \

src/workerd/util/sqlite.c++

+8-3
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,10 @@ kj::Own<sqlite3_stmt> SqliteDatabase::prepareSql(
446446
}
447447
}
448448

449-
kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) {
449+
SqliteDatabase::IngestResult SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) {
450+
uint64_t rowsRead = 0;
451+
uint64_t rowsWritten = 0;
452+
450453
// While there's still some input SQL to process
451454
while (sqlCode.begin() != sqlCode.end()) {
452455
// And there are still valid statements:
@@ -456,13 +459,15 @@ kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlC
456459
// Slice off the next valid statement SQL
457460
auto nextStatement = kj::str(sqlCode.slice(0, statementLength));
458461
// Create a Query object, which will prepare & execute it
459-
Query(*this, regulator, nextStatement);
462+
auto q = Query(*this, regulator, nextStatement);
460463

464+
rowsRead += q.getRowsRead();
465+
rowsWritten += q.getRowsWritten();
461466
sqlCode = sqlCode.slice(statementLength);
462467
}
463468

464469
// Return the leftover buffer
465-
return sqlCode;
470+
return {.remainder = sqlCode, .rowsRead = rowsRead, .rowsWritten = rowsWritten};
466471
}
467472

468473
bool SqliteDatabase::isAuthorized(int actionCode,

src/workerd/util/sqlite.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ class SqliteDatabase {
3636
class Regulator;
3737
struct VfsOptions;
3838

39+
struct IngestResult {
40+
kj::StringPtr remainder;
41+
uint64_t rowsRead;
42+
uint64_t rowsWritten;
43+
};
44+
3945
SqliteDatabase(const Vfs& vfs, kj::PathPtr path, kj::Maybe<kj::WriteMode> maybeMode = kj::none);
4046
~SqliteDatabase() noexcept(false);
4147
KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase);
@@ -92,7 +98,7 @@ class SqliteDatabase {
9298
// Helper to execute a chunk of SQL that may not be complete.
9399
// Executes every valid statement provided, and returns the remaining portion of the input
94100
// that was not processed. This is used for streaming SQL ingestion.
95-
kj::StringPtr ingestSql(Regulator& regulator, kj::StringPtr sqlCode);
101+
IngestResult ingestSql(Regulator& regulator, kj::StringPtr sqlCode);
96102

97103
private:
98104
sqlite3* db;

0 commit comments

Comments
 (0)