Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return row counts for SQL ingestion (storage.sql.ingest()). #2059

Merged
merged 3 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions src/workerd/api/sql-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ async function test(storage) {


// Test partial query ingestion
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `), ' ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`), '')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`), ' SELECT 456')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`), ' SELECT 45')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`), ' SELECT 4')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT `), ' SELECT ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT`), ' SELECT')
assert.deepEqual(sql.ingest(`SELECT 123; SELEC`), ' SELEC')
assert.deepEqual(sql.ingest(`SELECT 123; SELE`), ' SELE')
assert.deepEqual(sql.ingest(`SELECT 123; SEL`), ' SEL')
assert.deepEqual(sql.ingest(`SELECT 123; SE`), ' SE')
assert.deepEqual(sql.ingest(`SELECT 123; S`), ' S')
assert.deepEqual(sql.ingest(`SELECT 123; `), ' ')
assert.deepEqual(sql.ingest(`SELECT 123;`), '')
assert.deepEqual(sql.ingest(`SELECT 123`), 'SELECT 123')
assert.deepEqual(sql.ingest(`SELECT 12`), 'SELECT 12')
assert.deepEqual(sql.ingest(`SELECT 1`), 'SELECT 1')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `).remainder, ' ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`).remainder, '')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`).remainder, ' SELECT 456')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`).remainder, ' SELECT 45')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`).remainder, ' SELECT 4')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT `).remainder, ' SELECT ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT`).remainder, ' SELECT')
assert.deepEqual(sql.ingest(`SELECT 123; SELEC`).remainder, ' SELEC')
assert.deepEqual(sql.ingest(`SELECT 123; SELE`).remainder, ' SELE')
assert.deepEqual(sql.ingest(`SELECT 123; SEL`).remainder, ' SEL')
assert.deepEqual(sql.ingest(`SELECT 123; SE`).remainder, ' SE')
assert.deepEqual(sql.ingest(`SELECT 123; S`).remainder, ' S')
assert.deepEqual(sql.ingest(`SELECT 123; `).remainder, ' ')
assert.deepEqual(sql.ingest(`SELECT 123;`).remainder, '')
assert.deepEqual(sql.ingest(`SELECT 123`).remainder, 'SELECT 123')
assert.deepEqual(sql.ingest(`SELECT 12`).remainder, 'SELECT 12')
assert.deepEqual(sql.ingest(`SELECT 1`).remainder, 'SELECT 1')

// Exec throws with trailing comments
assert.throws(
Expand All @@ -87,7 +87,7 @@ async function test(storage) {
)
// Ingest does not
assert.deepEqual(
sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`),
sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`).remainder,
' -- trailing comment'
)

Expand All @@ -107,6 +107,8 @@ async function test(storage) {

// Use a chunk size 1, 3, 9, 27, 81, ... bytes
for (let length = 1; length < inputBytes.length; length = length * 3) {
let totalRowsWritten = 0;
let totalSqlStatements = 0;
let buffer = ''
for (let offset = 0; offset < inputBytes.length; offset += length) {
// Simulate a single "chunk" arriving
Expand All @@ -116,7 +118,10 @@ async function test(storage) {
buffer += decoder.decode(chunk, { stream: true })

// Ingest any complete statements and snip those chars off the buffer
buffer = sql.ingest(buffer)
let result = sql.ingest(buffer);
buffer = result.remainder;
totalRowsWritten += result.rowsWritten;
totalSqlStatements += result.statementCount;

// Simulate awaiting next chunk
await scheduler.wait(1)
Expand All @@ -139,6 +144,11 @@ async function test(storage) {
{ val: 'f: 🔥😎🔥' },
]
)

// Verify that all 36 rows we inserted were accounted for.
assert.equal(totalRowsWritten, 36);
assert.equal(totalSqlStatements, 6);

sql.exec(`DELETE FROM streaming`)
await scheduler.wait(1)
}
Expand Down Expand Up @@ -1030,9 +1040,10 @@ async function testStreamingIngestion(request, storage) {
buffer += chunk

// Ingest any complete statements and snip those chars off the buffer
buffer = sql.ingest(buffer)
buffer = sql.ingest(buffer).remainder
}
})

// Verify exactly 36 rows were added
assert.deepEqual(Array.from(sql.exec(`SELECT count(*) FROM streaming`)), [
{ 'count(*)': 36 },
Expand Down
5 changes: 3 additions & 2 deletions src/workerd/api/sql.c++
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ jsg::Ref<SqlStorage::Cursor> SqlStorage::exec(jsg::Lock& js, kj::String querySql
return jsg::alloc<Cursor>(*sqlite, regulator, querySql, kj::mv(bindings));
}

kj::String SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
SqlStorage::IngestResult SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
SqliteDatabase::Regulator& regulator = *this;
return kj::str(sqlite->ingestSql(regulator, querySql));
auto result = sqlite->ingestSql(regulator, querySql);
return IngestResult(kj::str(result.remainder), result.rowsRead, result.rowsWritten, result.statementCount);
}

jsg::Ref<SqlStorage::Statement> SqlStorage::prepare(jsg::Lock& js, kj::String query) {
Expand Down
18 changes: 17 additions & 1 deletion src/workerd/api/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ class SqlStorage final: public jsg::Object, private SqliteDatabase::Regulator {

class Cursor;
class Statement;
struct IngestResult;

jsg::Ref<Cursor> exec(jsg::Lock& js, kj::String query, jsg::Arguments<BindingValue> bindings);
kj::String ingest(jsg::Lock& js, kj::String query);
IngestResult ingest(jsg::Lock& js, kj::String query);

jsg::Ref<Statement> prepare(jsg::Lock& js, kj::String query);

Expand Down Expand Up @@ -249,10 +250,25 @@ class SqlStorage::Statement final: public jsg::Object {
friend class Cursor;
};

struct SqlStorage::IngestResult {
IngestResult(kj::String remainder, double rowsRead, double rowsWritten, double statementCount)
: remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten),
statementCount(statementCount) {}

kj::String remainder;
double rowsRead;
double rowsWritten;
double statementCount;

JSG_STRUCT(remainder, rowsRead, rowsWritten, statementCount);
};


#define EW_SQL_ISOLATE_TYPES \
api::SqlStorage, \
api::SqlStorage::Statement, \
api::SqlStorage::Cursor, \
api::SqlStorage::IngestResult, \
api::SqlStorage::Cursor::RowIterator, \
api::SqlStorage::Cursor::RowIterator::Next, \
api::SqlStorage::Cursor::RawIterator, \
Expand Down
13 changes: 10 additions & 3 deletions src/workerd/util/sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,11 @@ kj::Own<sqlite3_stmt> SqliteDatabase::prepareSql(
}
}

kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) {
SqliteDatabase::IngestResult SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) {
uint64_t rowsRead = 0;
uint64_t rowsWritten = 0;
uint64_t statementCount = 0;

// While there's still some input SQL to process
while (sqlCode.begin() != sqlCode.end()) {
// And there are still valid statements:
Expand All @@ -457,13 +461,16 @@ kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlC
// Slice off the next valid statement SQL
auto nextStatement = kj::str(sqlCode.slice(0, statementLength));
// Create a Query object, which will prepare & execute it
Query(*this, regulator, nextStatement);
auto q = Query(*this, regulator, nextStatement);

rowsRead += q.getRowsRead();
rowsWritten += q.getRowsWritten();
statementCount++;
sqlCode = sqlCode.slice(statementLength);
}

// Return the leftover buffer
return sqlCode;
return {.remainder = sqlCode, .rowsRead = rowsRead, .rowsWritten = rowsWritten, .statementCount = statementCount};
}

void SqliteDatabase::executeWithRegulator(Regulator& regulator, kj::FunctionParam<void()> func) {
Expand Down
9 changes: 8 additions & 1 deletion src/workerd/util/sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ class SqliteDatabase {
class Regulator;
struct VfsOptions;

struct IngestResult {
kj::StringPtr remainder;
uint64_t rowsRead;
uint64_t rowsWritten;
uint64_t statementCount;
};

SqliteDatabase(const Vfs& vfs, kj::PathPtr path, kj::Maybe<kj::WriteMode> maybeMode = kj::none);
~SqliteDatabase() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase);
Expand Down Expand Up @@ -92,7 +99,7 @@ class SqliteDatabase {
// Helper to execute a chunk of SQL that may not be complete.
// Executes every valid statement provided, and returns the remaining portion of the input
// that was not processed. This is used for streaming SQL ingestion.
kj::StringPtr ingestSql(Regulator& regulator, kj::StringPtr sqlCode);
IngestResult ingestSql(Regulator& regulator, kj::StringPtr sqlCode);

// Execute a function with the given regulator.
void executeWithRegulator(Regulator& regulator, kj::FunctionParam<void()> func);
Expand Down
Loading