From 479edbafc884c89bad664d159f55a75764d65627 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Thu, 25 Apr 2024 15:07:38 -0700 Subject: [PATCH 1/3] Return row counts for SQL ingestion (storage.sql.ingest()). D1 bills based on rows read and written, but sql.ingest() didn't return those. Now it does. This required changing the return type of sql.ingest(), but it's an experimental API so that should be okay. let sqlCode = "INSERT INTO tbl (col) VALUES (123); INSERT I"; let result = sql.ingest(sqlCode); Old: result == " INSERT I" New: result.remainder == " INSERT I" result.rowsRead == 0 result.rowsWritten == 1 This also gives us a convenient spot to stick additional information. For example, if we wanted to count how many SQL statements ingest() executed, we could easily add "result.statementsExecuted" without breaking any existing users. [1] This is a terrible understatement. --- src/workerd/api/sql-test.js | 49 ++++++++++++++++++++++--------------- src/workerd/api/sql.c++ | 14 +++++++++-- src/workerd/api/sql.h | 28 ++++++++++++++++++++- src/workerd/util/sqlite.c++ | 11 ++++++--- src/workerd/util/sqlite.h | 8 +++++- 5 files changed, 83 insertions(+), 27 deletions(-) diff --git a/src/workerd/api/sql-test.js b/src/workerd/api/sql-test.js index 0dfba456013..fbe28d66cd4 100644 --- a/src/workerd/api/sql-test.js +++ b/src/workerd/api/sql-test.js @@ -62,23 +62,23 @@ async function test(storage) { // Test partial query ingestion - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `), ' ') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`), '') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`), ' SELECT 456') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`), ' SELECT 45') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`), ' SELECT 4') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT `), ' SELECT ') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT`), ' SELECT') - assert.deepEqual(sql.ingest(`SELECT 123; SELEC`), ' SELEC') - assert.deepEqual(sql.ingest(`SELECT 123; SELE`), ' SELE') - assert.deepEqual(sql.ingest(`SELECT 123; SEL`), ' SEL') - assert.deepEqual(sql.ingest(`SELECT 123; SE`), ' SE') - assert.deepEqual(sql.ingest(`SELECT 123; S`), ' S') - assert.deepEqual(sql.ingest(`SELECT 123; `), ' ') - assert.deepEqual(sql.ingest(`SELECT 123;`), '') - assert.deepEqual(sql.ingest(`SELECT 123`), 'SELECT 123') - assert.deepEqual(sql.ingest(`SELECT 12`), 'SELECT 12') - assert.deepEqual(sql.ingest(`SELECT 1`), 'SELECT 1') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `).remainder, ' ') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`).remainder, '') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`).remainder, ' SELECT 456') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`).remainder, ' SELECT 45') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`).remainder, ' SELECT 4') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT `).remainder, ' SELECT ') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT`).remainder, ' SELECT') + assert.deepEqual(sql.ingest(`SELECT 123; SELEC`).remainder, ' SELEC') + assert.deepEqual(sql.ingest(`SELECT 123; SELE`).remainder, ' SELE') + assert.deepEqual(sql.ingest(`SELECT 123; SEL`).remainder, ' SEL') + assert.deepEqual(sql.ingest(`SELECT 123; SE`).remainder, ' SE') + assert.deepEqual(sql.ingest(`SELECT 123; S`).remainder, ' S') + assert.deepEqual(sql.ingest(`SELECT 123; `).remainder, ' ') + assert.deepEqual(sql.ingest(`SELECT 123;`).remainder, '') + assert.deepEqual(sql.ingest(`SELECT 123`).remainder, 'SELECT 123') + assert.deepEqual(sql.ingest(`SELECT 12`).remainder, 'SELECT 12') + assert.deepEqual(sql.ingest(`SELECT 1`).remainder, 'SELECT 1') // Exec throws with trailing comments assert.throws( @@ -87,7 +87,7 @@ async function test(storage) { ) // Ingest does not assert.deepEqual( - sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`), + sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`).remainder, ' -- trailing comment' ) @@ -105,8 +105,10 @@ async function test(storage) { const inputBytes = new TextEncoder().encode(INSERT_36_ROWS) const decoder = new TextDecoder() + // Use a chunk size 1, 3, 9, 27, 81, ... bytes for (let length = 1; length < inputBytes.length; length = length * 3) { + let totalRowsWritten = 0; let buffer = '' for (let offset = 0; offset < inputBytes.length; offset += length) { // Simulate a single "chunk" arriving @@ -116,7 +118,9 @@ async function test(storage) { buffer += decoder.decode(chunk, { stream: true }) // Ingest any complete statements and snip those chars off the buffer - buffer = sql.ingest(buffer) + let result = sql.ingest(buffer); + buffer = result.remainder; + totalRowsWritten += result.rowsWritten; // Simulate awaiting next chunk await scheduler.wait(1) @@ -139,6 +143,10 @@ async function test(storage) { { val: 'f: 🔥😎🔥' }, ] ) + + // Verify that all 36 rows we inserted were accounted for. + assert.equal(totalRowsWritten, 36); + sql.exec(`DELETE FROM streaming`) await scheduler.wait(1) } @@ -1030,9 +1038,10 @@ async function testStreamingIngestion(request, storage) { buffer += chunk // Ingest any complete statements and snip those chars off the buffer - buffer = sql.ingest(buffer) + buffer = sql.ingest(buffer).remainder } }) + // Verify exactly 36 rows were added assert.deepEqual(Array.from(sql.exec(`SELECT count(*) FROM streaming`)), [ { 'count(*)': 36 }, diff --git a/src/workerd/api/sql.c++ b/src/workerd/api/sql.c++ index 5ddb2c75951..5fb231d3475 100644 --- a/src/workerd/api/sql.c++ +++ b/src/workerd/api/sql.c++ @@ -19,9 +19,10 @@ jsg::Ref SqlStorage::exec(jsg::Lock& js, kj::String querySql return jsg::alloc(*sqlite, regulator, querySql, kj::mv(bindings)); } -kj::String SqlStorage::ingest(jsg::Lock& js, kj::String querySql) { +jsg::Ref SqlStorage::ingest(jsg::Lock& js, kj::String querySql) { SqliteDatabase::Regulator& regulator = *this; - return kj::str(sqlite->ingestSql(regulator, querySql)); + auto result = sqlite->ingestSql(regulator, querySql); + return jsg::alloc(kj::str(result.remainder), result.rowsRead, result.rowsWritten); } jsg::Ref SqlStorage::prepare(jsg::Lock& js, kj::String query) { @@ -295,4 +296,13 @@ void SqlStorage::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { } } +SqlStorage::IngestResult::IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten) : remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten) {} + +kj::StringPtr SqlStorage::IngestResult::getRemainder() { return remainder; } + +double SqlStorage::IngestResult::getRowsRead() { return rowsRead; } + +double SqlStorage::IngestResult::getRowsWritten() { return rowsWritten; } + + } // namespace workerd::api diff --git a/src/workerd/api/sql.h b/src/workerd/api/sql.h index 4720e280a92..8b72cbd1053 100644 --- a/src/workerd/api/sql.h +++ b/src/workerd/api/sql.h @@ -22,9 +22,10 @@ class SqlStorage final: public jsg::Object, private SqliteDatabase::Regulator { class Cursor; class Statement; + class IngestResult; jsg::Ref exec(jsg::Lock& js, kj::String query, jsg::Arguments bindings); - kj::String ingest(jsg::Lock& js, kj::String query); + jsg::Ref ingest(jsg::Lock& js, kj::String query); jsg::Ref prepare(jsg::Lock& js, kj::String query); @@ -249,10 +250,35 @@ class SqlStorage::Statement final: public jsg::Object { friend class Cursor; }; + + +class SqlStorage::IngestResult final : public jsg::Object { +public: + + IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten); + + JSG_RESOURCE_TYPE(IngestResult) { + JSG_READONLY_PROTOTYPE_PROPERTY(rowsRead, getRowsRead); + JSG_READONLY_PROTOTYPE_PROPERTY(rowsWritten, getRowsWritten); + JSG_READONLY_PROTOTYPE_PROPERTY(remainder, getRemainder); + } + + kj::StringPtr getRemainder(); + double getRowsRead(); + double getRowsWritten(); + +private: + kj::String remainder; + uint64_t rowsRead; + uint64_t rowsWritten; +}; + + #define EW_SQL_ISOLATE_TYPES \ api::SqlStorage, \ api::SqlStorage::Statement, \ api::SqlStorage::Cursor, \ + api::SqlStorage::IngestResult, \ api::SqlStorage::Cursor::RowIterator, \ api::SqlStorage::Cursor::RowIterator::Next, \ api::SqlStorage::Cursor::RawIterator, \ diff --git a/src/workerd/util/sqlite.c++ b/src/workerd/util/sqlite.c++ index f8248afe32e..58db8f72b21 100644 --- a/src/workerd/util/sqlite.c++ +++ b/src/workerd/util/sqlite.c++ @@ -447,7 +447,10 @@ kj::Own SqliteDatabase::prepareSql( } } -kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) { +SqliteDatabase::IngestResult SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) { + uint64_t rowsRead = 0; + uint64_t rowsWritten = 0; + // While there's still some input SQL to process while (sqlCode.begin() != sqlCode.end()) { // And there are still valid statements: @@ -457,13 +460,15 @@ kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlC // Slice off the next valid statement SQL auto nextStatement = kj::str(sqlCode.slice(0, statementLength)); // Create a Query object, which will prepare & execute it - Query(*this, regulator, nextStatement); + auto q = Query(*this, regulator, nextStatement); + rowsRead += q.getRowsRead(); + rowsWritten += q.getRowsWritten(); sqlCode = sqlCode.slice(statementLength); } // Return the leftover buffer - return sqlCode; + return {.remainder = sqlCode, .rowsRead = rowsRead, .rowsWritten = rowsWritten}; } void SqliteDatabase::executeWithRegulator(Regulator& regulator, kj::FunctionParam func) { diff --git a/src/workerd/util/sqlite.h b/src/workerd/util/sqlite.h index 5e1a5e24845..280bd11b379 100644 --- a/src/workerd/util/sqlite.h +++ b/src/workerd/util/sqlite.h @@ -36,6 +36,12 @@ class SqliteDatabase { class Regulator; struct VfsOptions; + struct IngestResult { + kj::StringPtr remainder; + uint64_t rowsRead; + uint64_t rowsWritten; + }; + SqliteDatabase(const Vfs& vfs, kj::PathPtr path, kj::Maybe maybeMode = kj::none); ~SqliteDatabase() noexcept(false); KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase); @@ -92,7 +98,7 @@ class SqliteDatabase { // Helper to execute a chunk of SQL that may not be complete. // Executes every valid statement provided, and returns the remaining portion of the input // that was not processed. This is used for streaming SQL ingestion. - kj::StringPtr ingestSql(Regulator& regulator, kj::StringPtr sqlCode); + IngestResult ingestSql(Regulator& regulator, kj::StringPtr sqlCode); // Execute a function with the given regulator. void executeWithRegulator(Regulator& regulator, kj::FunctionParam func); From 0cc33b7740f871ba9007a88aa826321dcf846722 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Tue, 30 Apr 2024 08:44:44 -0700 Subject: [PATCH 2/3] Return statement counts for SQL ingestion (storage.sql.ingest()). Wrangler wants this so it can show users how many statements were executed. This matches how it used to work when Wrangler parsed the SQL locally. --- src/workerd/api/sql-test.js | 4 +++- src/workerd/api/sql.c++ | 6 ++++-- src/workerd/api/sql.h | 5 ++++- src/workerd/util/sqlite.c++ | 4 +++- src/workerd/util/sqlite.h | 1 + 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/workerd/api/sql-test.js b/src/workerd/api/sql-test.js index fbe28d66cd4..494c11fa1fb 100644 --- a/src/workerd/api/sql-test.js +++ b/src/workerd/api/sql-test.js @@ -105,10 +105,10 @@ async function test(storage) { const inputBytes = new TextEncoder().encode(INSERT_36_ROWS) const decoder = new TextDecoder() - // Use a chunk size 1, 3, 9, 27, 81, ... bytes for (let length = 1; length < inputBytes.length; length = length * 3) { let totalRowsWritten = 0; + let totalSqlStatements = 0; let buffer = '' for (let offset = 0; offset < inputBytes.length; offset += length) { // Simulate a single "chunk" arriving @@ -121,6 +121,7 @@ async function test(storage) { let result = sql.ingest(buffer); buffer = result.remainder; totalRowsWritten += result.rowsWritten; + totalSqlStatements += result.statementCount; // Simulate awaiting next chunk await scheduler.wait(1) @@ -146,6 +147,7 @@ async function test(storage) { // Verify that all 36 rows we inserted were accounted for. assert.equal(totalRowsWritten, 36); + assert.equal(totalSqlStatements, 6); sql.exec(`DELETE FROM streaming`) await scheduler.wait(1) diff --git a/src/workerd/api/sql.c++ b/src/workerd/api/sql.c++ index 5fb231d3475..b5b57c5ee4d 100644 --- a/src/workerd/api/sql.c++ +++ b/src/workerd/api/sql.c++ @@ -22,7 +22,7 @@ jsg::Ref SqlStorage::exec(jsg::Lock& js, kj::String querySql jsg::Ref SqlStorage::ingest(jsg::Lock& js, kj::String querySql) { SqliteDatabase::Regulator& regulator = *this; auto result = sqlite->ingestSql(regulator, querySql); - return jsg::alloc(kj::str(result.remainder), result.rowsRead, result.rowsWritten); + return jsg::alloc(kj::str(result.remainder), result.rowsRead, result.rowsWritten, result.statementCount); } jsg::Ref SqlStorage::prepare(jsg::Lock& js, kj::String query) { @@ -296,7 +296,8 @@ void SqlStorage::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { } } -SqlStorage::IngestResult::IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten) : remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten) {} +SqlStorage::IngestResult::IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten, uint64_t statementCount) : + remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten), statementCount(statementCount) {} kj::StringPtr SqlStorage::IngestResult::getRemainder() { return remainder; } @@ -304,5 +305,6 @@ double SqlStorage::IngestResult::getRowsRead() { return rowsRead; } double SqlStorage::IngestResult::getRowsWritten() { return rowsWritten; } +double SqlStorage::IngestResult::getStatementCount() { return statementCount; } } // namespace workerd::api diff --git a/src/workerd/api/sql.h b/src/workerd/api/sql.h index 8b72cbd1053..62877406cac 100644 --- a/src/workerd/api/sql.h +++ b/src/workerd/api/sql.h @@ -255,15 +255,17 @@ class SqlStorage::Statement final: public jsg::Object { class SqlStorage::IngestResult final : public jsg::Object { public: - IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten); + IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten, uint64_t statementCount); JSG_RESOURCE_TYPE(IngestResult) { + JSG_READONLY_PROTOTYPE_PROPERTY(statementCount, getStatementCount); JSG_READONLY_PROTOTYPE_PROPERTY(rowsRead, getRowsRead); JSG_READONLY_PROTOTYPE_PROPERTY(rowsWritten, getRowsWritten); JSG_READONLY_PROTOTYPE_PROPERTY(remainder, getRemainder); } kj::StringPtr getRemainder(); + double getStatementCount(); double getRowsRead(); double getRowsWritten(); @@ -271,6 +273,7 @@ class SqlStorage::IngestResult final : public jsg::Object { kj::String remainder; uint64_t rowsRead; uint64_t rowsWritten; + uint64_t statementCount; }; diff --git a/src/workerd/util/sqlite.c++ b/src/workerd/util/sqlite.c++ index 58db8f72b21..067a5f5708e 100644 --- a/src/workerd/util/sqlite.c++ +++ b/src/workerd/util/sqlite.c++ @@ -450,6 +450,7 @@ kj::Own SqliteDatabase::prepareSql( SqliteDatabase::IngestResult SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) { uint64_t rowsRead = 0; uint64_t rowsWritten = 0; + uint64_t statementCount = 0; // While there's still some input SQL to process while (sqlCode.begin() != sqlCode.end()) { @@ -464,11 +465,12 @@ SqliteDatabase::IngestResult SqliteDatabase::ingestSql(Regulator& regulator, kj: rowsRead += q.getRowsRead(); rowsWritten += q.getRowsWritten(); + statementCount++; sqlCode = sqlCode.slice(statementLength); } // Return the leftover buffer - return {.remainder = sqlCode, .rowsRead = rowsRead, .rowsWritten = rowsWritten}; + return {.remainder = sqlCode, .rowsRead = rowsRead, .rowsWritten = rowsWritten, .statementCount = statementCount}; } void SqliteDatabase::executeWithRegulator(Regulator& regulator, kj::FunctionParam func) { diff --git a/src/workerd/util/sqlite.h b/src/workerd/util/sqlite.h index 280bd11b379..df855a695e2 100644 --- a/src/workerd/util/sqlite.h +++ b/src/workerd/util/sqlite.h @@ -40,6 +40,7 @@ class SqliteDatabase { kj::StringPtr remainder; uint64_t rowsRead; uint64_t rowsWritten; + uint64_t statementCount; }; SqliteDatabase(const Vfs& vfs, kj::PathPtr path, kj::Maybe maybeMode = kj::none); From 86458dfabd53b03a60d48ed787ff4d5a2421cc69 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Tue, 30 Apr 2024 12:16:54 -0700 Subject: [PATCH 3/3] Convert SqlStorage::IngestResult to a JSG_STRUCT. This removes a bunch of boilerplate C++. It does not change the JS interface. --- src/workerd/api/sql.c++ | 15 ++------------- src/workerd/api/sql.h | 35 +++++++++++------------------------ 2 files changed, 13 insertions(+), 37 deletions(-) diff --git a/src/workerd/api/sql.c++ b/src/workerd/api/sql.c++ index b5b57c5ee4d..e712e18aae5 100644 --- a/src/workerd/api/sql.c++ +++ b/src/workerd/api/sql.c++ @@ -19,10 +19,10 @@ jsg::Ref SqlStorage::exec(jsg::Lock& js, kj::String querySql return jsg::alloc(*sqlite, regulator, querySql, kj::mv(bindings)); } -jsg::Ref SqlStorage::ingest(jsg::Lock& js, kj::String querySql) { +SqlStorage::IngestResult SqlStorage::ingest(jsg::Lock& js, kj::String querySql) { SqliteDatabase::Regulator& regulator = *this; auto result = sqlite->ingestSql(regulator, querySql); - return jsg::alloc(kj::str(result.remainder), result.rowsRead, result.rowsWritten, result.statementCount); + return IngestResult(kj::str(result.remainder), result.rowsRead, result.rowsWritten, result.statementCount); } jsg::Ref SqlStorage::prepare(jsg::Lock& js, kj::String query) { @@ -296,15 +296,4 @@ void SqlStorage::visitForMemoryInfo(jsg::MemoryTracker& tracker) const { } } -SqlStorage::IngestResult::IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten, uint64_t statementCount) : - remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten), statementCount(statementCount) {} - -kj::StringPtr SqlStorage::IngestResult::getRemainder() { return remainder; } - -double SqlStorage::IngestResult::getRowsRead() { return rowsRead; } - -double SqlStorage::IngestResult::getRowsWritten() { return rowsWritten; } - -double SqlStorage::IngestResult::getStatementCount() { return statementCount; } - } // namespace workerd::api diff --git a/src/workerd/api/sql.h b/src/workerd/api/sql.h index 62877406cac..47079ce44f8 100644 --- a/src/workerd/api/sql.h +++ b/src/workerd/api/sql.h @@ -22,10 +22,10 @@ class SqlStorage final: public jsg::Object, private SqliteDatabase::Regulator { class Cursor; class Statement; - class IngestResult; + struct IngestResult; jsg::Ref exec(jsg::Lock& js, kj::String query, jsg::Arguments bindings); - jsg::Ref ingest(jsg::Lock& js, kj::String query); + IngestResult ingest(jsg::Lock& js, kj::String query); jsg::Ref prepare(jsg::Lock& js, kj::String query); @@ -250,30 +250,17 @@ class SqlStorage::Statement final: public jsg::Object { friend class Cursor; }; +struct SqlStorage::IngestResult { + IngestResult(kj::String remainder, double rowsRead, double rowsWritten, double statementCount) + : remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten), + statementCount(statementCount) {} - -class SqlStorage::IngestResult final : public jsg::Object { -public: - - IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten, uint64_t statementCount); - - JSG_RESOURCE_TYPE(IngestResult) { - JSG_READONLY_PROTOTYPE_PROPERTY(statementCount, getStatementCount); - JSG_READONLY_PROTOTYPE_PROPERTY(rowsRead, getRowsRead); - JSG_READONLY_PROTOTYPE_PROPERTY(rowsWritten, getRowsWritten); - JSG_READONLY_PROTOTYPE_PROPERTY(remainder, getRemainder); - } - - kj::StringPtr getRemainder(); - double getStatementCount(); - double getRowsRead(); - double getRowsWritten(); - -private: kj::String remainder; - uint64_t rowsRead; - uint64_t rowsWritten; - uint64_t statementCount; + double rowsRead; + double rowsWritten; + double statementCount; + + JSG_STRUCT(remainder, rowsRead, rowsWritten, statementCount); };