diff --git a/src/workerd/api/sql-test.js b/src/workerd/api/sql-test.js index 0dfba456013..494c11fa1fb 100644 --- a/src/workerd/api/sql-test.js +++ b/src/workerd/api/sql-test.js @@ -62,23 +62,23 @@ async function test(storage) { // Test partial query ingestion - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `), ' ') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`), '') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`), ' SELECT 456') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`), ' SELECT 45') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`), ' SELECT 4') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT `), ' SELECT ') - assert.deepEqual(sql.ingest(`SELECT 123; SELECT`), ' SELECT') - assert.deepEqual(sql.ingest(`SELECT 123; SELEC`), ' SELEC') - assert.deepEqual(sql.ingest(`SELECT 123; SELE`), ' SELE') - assert.deepEqual(sql.ingest(`SELECT 123; SEL`), ' SEL') - assert.deepEqual(sql.ingest(`SELECT 123; SE`), ' SE') - assert.deepEqual(sql.ingest(`SELECT 123; S`), ' S') - assert.deepEqual(sql.ingest(`SELECT 123; `), ' ') - assert.deepEqual(sql.ingest(`SELECT 123;`), '') - assert.deepEqual(sql.ingest(`SELECT 123`), 'SELECT 123') - assert.deepEqual(sql.ingest(`SELECT 12`), 'SELECT 12') - assert.deepEqual(sql.ingest(`SELECT 1`), 'SELECT 1') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `).remainder, ' ') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`).remainder, '') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`).remainder, ' SELECT 456') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`).remainder, ' SELECT 45') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`).remainder, ' SELECT 4') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT `).remainder, ' SELECT ') + assert.deepEqual(sql.ingest(`SELECT 123; SELECT`).remainder, ' SELECT') + assert.deepEqual(sql.ingest(`SELECT 123; SELEC`).remainder, ' SELEC') + assert.deepEqual(sql.ingest(`SELECT 123; SELE`).remainder, ' SELE') + assert.deepEqual(sql.ingest(`SELECT 123; SEL`).remainder, ' SEL') + assert.deepEqual(sql.ingest(`SELECT 123; SE`).remainder, ' SE') + assert.deepEqual(sql.ingest(`SELECT 123; S`).remainder, ' S') + assert.deepEqual(sql.ingest(`SELECT 123; `).remainder, ' ') + assert.deepEqual(sql.ingest(`SELECT 123;`).remainder, '') + assert.deepEqual(sql.ingest(`SELECT 123`).remainder, 'SELECT 123') + assert.deepEqual(sql.ingest(`SELECT 12`).remainder, 'SELECT 12') + assert.deepEqual(sql.ingest(`SELECT 1`).remainder, 'SELECT 1') // Exec throws with trailing comments assert.throws( @@ -87,7 +87,7 @@ async function test(storage) { ) // Ingest does not assert.deepEqual( - sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`), + sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`).remainder, ' -- trailing comment' ) @@ -107,6 +107,8 @@ async function test(storage) { // Use a chunk size 1, 3, 9, 27, 81, ... bytes for (let length = 1; length < inputBytes.length; length = length * 3) { + let totalRowsWritten = 0; + let totalSqlStatements = 0; let buffer = '' for (let offset = 0; offset < inputBytes.length; offset += length) { // Simulate a single "chunk" arriving @@ -116,7 +118,10 @@ async function test(storage) { buffer += decoder.decode(chunk, { stream: true }) // Ingest any complete statements and snip those chars off the buffer - buffer = sql.ingest(buffer) + let result = sql.ingest(buffer); + buffer = result.remainder; + totalRowsWritten += result.rowsWritten; + totalSqlStatements += result.statementCount; // Simulate awaiting next chunk await scheduler.wait(1) @@ -139,6 +144,11 @@ async function test(storage) { { val: 'f: 🔥😎🔥' }, ] ) + + // Verify that all 36 rows we inserted were accounted for. + assert.equal(totalRowsWritten, 36); + assert.equal(totalSqlStatements, 6); + sql.exec(`DELETE FROM streaming`) await scheduler.wait(1) } @@ -1030,9 +1040,10 @@ async function testStreamingIngestion(request, storage) { buffer += chunk // Ingest any complete statements and snip those chars off the buffer - buffer = sql.ingest(buffer) + buffer = sql.ingest(buffer).remainder } }) + // Verify exactly 36 rows were added assert.deepEqual(Array.from(sql.exec(`SELECT count(*) FROM streaming`)), [ { 'count(*)': 36 }, diff --git a/src/workerd/api/sql.c++ b/src/workerd/api/sql.c++ index 5ddb2c75951..e712e18aae5 100644 --- a/src/workerd/api/sql.c++ +++ b/src/workerd/api/sql.c++ @@ -19,9 +19,10 @@ jsg::Ref SqlStorage::exec(jsg::Lock& js, kj::String querySql return jsg::alloc(*sqlite, regulator, querySql, kj::mv(bindings)); } -kj::String SqlStorage::ingest(jsg::Lock& js, kj::String querySql) { +SqlStorage::IngestResult SqlStorage::ingest(jsg::Lock& js, kj::String querySql) { SqliteDatabase::Regulator& regulator = *this; - return kj::str(sqlite->ingestSql(regulator, querySql)); + auto result = sqlite->ingestSql(regulator, querySql); + return IngestResult(kj::str(result.remainder), result.rowsRead, result.rowsWritten, result.statementCount); } jsg::Ref SqlStorage::prepare(jsg::Lock& js, kj::String query) { diff --git a/src/workerd/api/sql.h b/src/workerd/api/sql.h index 4720e280a92..47079ce44f8 100644 --- a/src/workerd/api/sql.h +++ b/src/workerd/api/sql.h @@ -22,9 +22,10 @@ class SqlStorage final: public jsg::Object, private SqliteDatabase::Regulator { class Cursor; class Statement; + struct IngestResult; jsg::Ref exec(jsg::Lock& js, kj::String query, jsg::Arguments bindings); - kj::String ingest(jsg::Lock& js, kj::String query); + IngestResult ingest(jsg::Lock& js, kj::String query); jsg::Ref prepare(jsg::Lock& js, kj::String query); @@ -249,10 +250,25 @@ class SqlStorage::Statement final: public jsg::Object { friend class Cursor; }; +struct SqlStorage::IngestResult { + IngestResult(kj::String remainder, double rowsRead, double rowsWritten, double statementCount) + : remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten), + statementCount(statementCount) {} + + kj::String remainder; + double rowsRead; + double rowsWritten; + double statementCount; + + JSG_STRUCT(remainder, rowsRead, rowsWritten, statementCount); +}; + + #define EW_SQL_ISOLATE_TYPES \ api::SqlStorage, \ api::SqlStorage::Statement, \ api::SqlStorage::Cursor, \ + api::SqlStorage::IngestResult, \ api::SqlStorage::Cursor::RowIterator, \ api::SqlStorage::Cursor::RowIterator::Next, \ api::SqlStorage::Cursor::RawIterator, \ diff --git a/src/workerd/util/sqlite.c++ b/src/workerd/util/sqlite.c++ index 353bbb08d9b..6f819e1e87d 100644 --- a/src/workerd/util/sqlite.c++ +++ b/src/workerd/util/sqlite.c++ @@ -447,7 +447,11 @@ kj::Own SqliteDatabase::prepareSql( } } -kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) { +SqliteDatabase::IngestResult SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) { + uint64_t rowsRead = 0; + uint64_t rowsWritten = 0; + uint64_t statementCount = 0; + // While there's still some input SQL to process while (sqlCode.begin() != sqlCode.end()) { // And there are still valid statements: @@ -457,13 +461,16 @@ kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlC // Slice off the next valid statement SQL auto nextStatement = kj::str(sqlCode.slice(0, statementLength)); // Create a Query object, which will prepare & execute it - Query(*this, regulator, nextStatement); + auto q = Query(*this, regulator, nextStatement); + rowsRead += q.getRowsRead(); + rowsWritten += q.getRowsWritten(); + statementCount++; sqlCode = sqlCode.slice(statementLength); } // Return the leftover buffer - return sqlCode; + return {.remainder = sqlCode, .rowsRead = rowsRead, .rowsWritten = rowsWritten, .statementCount = statementCount}; } void SqliteDatabase::executeWithRegulator(Regulator& regulator, kj::FunctionParam func) { diff --git a/src/workerd/util/sqlite.h b/src/workerd/util/sqlite.h index 5e1a5e24845..df855a695e2 100644 --- a/src/workerd/util/sqlite.h +++ b/src/workerd/util/sqlite.h @@ -36,6 +36,13 @@ class SqliteDatabase { class Regulator; struct VfsOptions; + struct IngestResult { + kj::StringPtr remainder; + uint64_t rowsRead; + uint64_t rowsWritten; + uint64_t statementCount; + }; + SqliteDatabase(const Vfs& vfs, kj::PathPtr path, kj::Maybe maybeMode = kj::none); ~SqliteDatabase() noexcept(false); KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase); @@ -92,7 +99,7 @@ class SqliteDatabase { // Helper to execute a chunk of SQL that may not be complete. // Executes every valid statement provided, and returns the remaining portion of the input // that was not processed. This is used for streaming SQL ingestion. - kj::StringPtr ingestSql(Regulator& regulator, kj::StringPtr sqlCode); + IngestResult ingestSql(Regulator& regulator, kj::StringPtr sqlCode); // Execute a function with the given regulator. void executeWithRegulator(Regulator& regulator, kj::FunctionParam func);