Skip to content

Commit

Permalink
Merge pull request #2059 from smerritt/smerritt/sql-ingest-row-counts
Browse files Browse the repository at this point in the history
Return row and statement counts for SQL ingestion (storage.sql.ingest()).
  • Loading branch information
smerritt authored Apr 30, 2024
2 parents 58b7049 + 86458df commit 7737c74
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 27 deletions.
51 changes: 31 additions & 20 deletions src/workerd/api/sql-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ async function test(storage) {


// Test partial query ingestion
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `), ' ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`), '')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`), ' SELECT 456')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`), ' SELECT 45')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`), ' SELECT 4')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT `), ' SELECT ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT`), ' SELECT')
assert.deepEqual(sql.ingest(`SELECT 123; SELEC`), ' SELEC')
assert.deepEqual(sql.ingest(`SELECT 123; SELE`), ' SELE')
assert.deepEqual(sql.ingest(`SELECT 123; SEL`), ' SEL')
assert.deepEqual(sql.ingest(`SELECT 123; SE`), ' SE')
assert.deepEqual(sql.ingest(`SELECT 123; S`), ' S')
assert.deepEqual(sql.ingest(`SELECT 123; `), ' ')
assert.deepEqual(sql.ingest(`SELECT 123;`), '')
assert.deepEqual(sql.ingest(`SELECT 123`), 'SELECT 123')
assert.deepEqual(sql.ingest(`SELECT 12`), 'SELECT 12')
assert.deepEqual(sql.ingest(`SELECT 1`), 'SELECT 1')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `).remainder, ' ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`).remainder, '')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`).remainder, ' SELECT 456')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`).remainder, ' SELECT 45')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`).remainder, ' SELECT 4')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT `).remainder, ' SELECT ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT`).remainder, ' SELECT')
assert.deepEqual(sql.ingest(`SELECT 123; SELEC`).remainder, ' SELEC')
assert.deepEqual(sql.ingest(`SELECT 123; SELE`).remainder, ' SELE')
assert.deepEqual(sql.ingest(`SELECT 123; SEL`).remainder, ' SEL')
assert.deepEqual(sql.ingest(`SELECT 123; SE`).remainder, ' SE')
assert.deepEqual(sql.ingest(`SELECT 123; S`).remainder, ' S')
assert.deepEqual(sql.ingest(`SELECT 123; `).remainder, ' ')
assert.deepEqual(sql.ingest(`SELECT 123;`).remainder, '')
assert.deepEqual(sql.ingest(`SELECT 123`).remainder, 'SELECT 123')
assert.deepEqual(sql.ingest(`SELECT 12`).remainder, 'SELECT 12')
assert.deepEqual(sql.ingest(`SELECT 1`).remainder, 'SELECT 1')

// Exec throws with trailing comments
assert.throws(
Expand All @@ -87,7 +87,7 @@ async function test(storage) {
)
// Ingest does not
assert.deepEqual(
sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`),
sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`).remainder,
' -- trailing comment'
)

Expand All @@ -107,6 +107,8 @@ async function test(storage) {

// Use a chunk size 1, 3, 9, 27, 81, ... bytes
for (let length = 1; length < inputBytes.length; length = length * 3) {
let totalRowsWritten = 0;
let totalSqlStatements = 0;
let buffer = ''
for (let offset = 0; offset < inputBytes.length; offset += length) {
// Simulate a single "chunk" arriving
Expand All @@ -116,7 +118,10 @@ async function test(storage) {
buffer += decoder.decode(chunk, { stream: true })

// Ingest any complete statements and snip those chars off the buffer
buffer = sql.ingest(buffer)
let result = sql.ingest(buffer);
buffer = result.remainder;
totalRowsWritten += result.rowsWritten;
totalSqlStatements += result.statementCount;

// Simulate awaiting next chunk
await scheduler.wait(1)
Expand All @@ -139,6 +144,11 @@ async function test(storage) {
{ val: 'f: 🔥😎🔥' },
]
)

// Verify that all 36 rows we inserted were accounted for.
assert.equal(totalRowsWritten, 36);
assert.equal(totalSqlStatements, 6);

sql.exec(`DELETE FROM streaming`)
await scheduler.wait(1)
}
Expand Down Expand Up @@ -1030,9 +1040,10 @@ async function testStreamingIngestion(request, storage) {
buffer += chunk

// Ingest any complete statements and snip those chars off the buffer
buffer = sql.ingest(buffer)
buffer = sql.ingest(buffer).remainder
}
})

// Verify exactly 36 rows were added
assert.deepEqual(Array.from(sql.exec(`SELECT count(*) FROM streaming`)), [
{ 'count(*)': 36 },
Expand Down
5 changes: 3 additions & 2 deletions src/workerd/api/sql.c++
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ jsg::Ref<SqlStorage::Cursor> SqlStorage::exec(jsg::Lock& js, kj::String querySql
return jsg::alloc<Cursor>(*sqlite, regulator, querySql, kj::mv(bindings));
}

kj::String SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
SqlStorage::IngestResult SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
SqliteDatabase::Regulator& regulator = *this;
return kj::str(sqlite->ingestSql(regulator, querySql));
auto result = sqlite->ingestSql(regulator, querySql);
return IngestResult(kj::str(result.remainder), result.rowsRead, result.rowsWritten, result.statementCount);
}

jsg::Ref<SqlStorage::Statement> SqlStorage::prepare(jsg::Lock& js, kj::String query) {
Expand Down
18 changes: 17 additions & 1 deletion src/workerd/api/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ class SqlStorage final: public jsg::Object, private SqliteDatabase::Regulator {

class Cursor;
class Statement;
struct IngestResult;

jsg::Ref<Cursor> exec(jsg::Lock& js, kj::String query, jsg::Arguments<BindingValue> bindings);
kj::String ingest(jsg::Lock& js, kj::String query);
IngestResult ingest(jsg::Lock& js, kj::String query);

jsg::Ref<Statement> prepare(jsg::Lock& js, kj::String query);

Expand Down Expand Up @@ -249,10 +250,25 @@ class SqlStorage::Statement final: public jsg::Object {
friend class Cursor;
};

struct SqlStorage::IngestResult {
IngestResult(kj::String remainder, double rowsRead, double rowsWritten, double statementCount)
: remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten),
statementCount(statementCount) {}

kj::String remainder;
double rowsRead;
double rowsWritten;
double statementCount;

JSG_STRUCT(remainder, rowsRead, rowsWritten, statementCount);
};


#define EW_SQL_ISOLATE_TYPES \
api::SqlStorage, \
api::SqlStorage::Statement, \
api::SqlStorage::Cursor, \
api::SqlStorage::IngestResult, \
api::SqlStorage::Cursor::RowIterator, \
api::SqlStorage::Cursor::RowIterator::Next, \
api::SqlStorage::Cursor::RawIterator, \
Expand Down
13 changes: 10 additions & 3 deletions src/workerd/util/sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,11 @@ kj::Own<sqlite3_stmt> SqliteDatabase::prepareSql(
}
}

kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) {
SqliteDatabase::IngestResult SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) {
uint64_t rowsRead = 0;
uint64_t rowsWritten = 0;
uint64_t statementCount = 0;

// While there's still some input SQL to process
while (sqlCode.begin() != sqlCode.end()) {
// And there are still valid statements:
Expand All @@ -457,13 +461,16 @@ kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlC
// Slice off the next valid statement SQL
auto nextStatement = kj::str(sqlCode.slice(0, statementLength));
// Create a Query object, which will prepare & execute it
Query(*this, regulator, nextStatement);
auto q = Query(*this, regulator, nextStatement);

rowsRead += q.getRowsRead();
rowsWritten += q.getRowsWritten();
statementCount++;
sqlCode = sqlCode.slice(statementLength);
}

// Return the leftover buffer
return sqlCode;
return {.remainder = sqlCode, .rowsRead = rowsRead, .rowsWritten = rowsWritten, .statementCount = statementCount};
}

void SqliteDatabase::executeWithRegulator(Regulator& regulator, kj::FunctionParam<void()> func) {
Expand Down
9 changes: 8 additions & 1 deletion src/workerd/util/sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ class SqliteDatabase {
class Regulator;
struct VfsOptions;

struct IngestResult {
kj::StringPtr remainder;
uint64_t rowsRead;
uint64_t rowsWritten;
uint64_t statementCount;
};

SqliteDatabase(const Vfs& vfs, kj::PathPtr path, kj::Maybe<kj::WriteMode> maybeMode = kj::none);
~SqliteDatabase() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase);
Expand Down Expand Up @@ -92,7 +99,7 @@ class SqliteDatabase {
// Helper to execute a chunk of SQL that may not be complete.
// Executes every valid statement provided, and returns the remaining portion of the input
// that was not processed. This is used for streaming SQL ingestion.
kj::StringPtr ingestSql(Regulator& regulator, kj::StringPtr sqlCode);
IngestResult ingestSql(Regulator& regulator, kj::StringPtr sqlCode);

// Execute a function with the given regulator.
void executeWithRegulator(Regulator& regulator, kj::FunctionParam<void()> func);
Expand Down

0 comments on commit 7737c74

Please sign in to comment.