Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return row counts for SQL ingestion (storage.sql.ingest()). #2059

Merged
merged 3 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions src/workerd/api/sql-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ async function test(storage) {


// Test partial query ingestion
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `), ' ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`), '')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`), ' SELECT 456')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`), ' SELECT 45')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`), ' SELECT 4')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT `), ' SELECT ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT`), ' SELECT')
assert.deepEqual(sql.ingest(`SELECT 123; SELEC`), ' SELEC')
assert.deepEqual(sql.ingest(`SELECT 123; SELE`), ' SELE')
assert.deepEqual(sql.ingest(`SELECT 123; SEL`), ' SEL')
assert.deepEqual(sql.ingest(`SELECT 123; SE`), ' SE')
assert.deepEqual(sql.ingest(`SELECT 123; S`), ' S')
assert.deepEqual(sql.ingest(`SELECT 123; `), ' ')
assert.deepEqual(sql.ingest(`SELECT 123;`), '')
assert.deepEqual(sql.ingest(`SELECT 123`), 'SELECT 123')
assert.deepEqual(sql.ingest(`SELECT 12`), 'SELECT 12')
assert.deepEqual(sql.ingest(`SELECT 1`), 'SELECT 1')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456; `).remainder, ' ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456;`).remainder, '')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 456`).remainder, ' SELECT 456')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 45`).remainder, ' SELECT 45')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT 4`).remainder, ' SELECT 4')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT `).remainder, ' SELECT ')
assert.deepEqual(sql.ingest(`SELECT 123; SELECT`).remainder, ' SELECT')
assert.deepEqual(sql.ingest(`SELECT 123; SELEC`).remainder, ' SELEC')
assert.deepEqual(sql.ingest(`SELECT 123; SELE`).remainder, ' SELE')
assert.deepEqual(sql.ingest(`SELECT 123; SEL`).remainder, ' SEL')
assert.deepEqual(sql.ingest(`SELECT 123; SE`).remainder, ' SE')
assert.deepEqual(sql.ingest(`SELECT 123; S`).remainder, ' S')
assert.deepEqual(sql.ingest(`SELECT 123; `).remainder, ' ')
assert.deepEqual(sql.ingest(`SELECT 123;`).remainder, '')
assert.deepEqual(sql.ingest(`SELECT 123`).remainder, 'SELECT 123')
assert.deepEqual(sql.ingest(`SELECT 12`).remainder, 'SELECT 12')
assert.deepEqual(sql.ingest(`SELECT 1`).remainder, 'SELECT 1')

// Exec throws with trailing comments
assert.throws(
Expand All @@ -87,7 +87,7 @@ async function test(storage) {
)
// Ingest does not
assert.deepEqual(
sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`),
sql.ingest(`SELECT 123; SELECT 456; -- trailing comment`).remainder,
' -- trailing comment'
)

Expand All @@ -107,6 +107,8 @@ async function test(storage) {

// Use a chunk size 1, 3, 9, 27, 81, ... bytes
for (let length = 1; length < inputBytes.length; length = length * 3) {
let totalRowsWritten = 0;
let totalSqlStatements = 0;
let buffer = ''
for (let offset = 0; offset < inputBytes.length; offset += length) {
// Simulate a single "chunk" arriving
Expand All @@ -116,7 +118,10 @@ async function test(storage) {
buffer += decoder.decode(chunk, { stream: true })

// Ingest any complete statements and snip those chars off the buffer
buffer = sql.ingest(buffer)
let result = sql.ingest(buffer);
buffer = result.remainder;
totalRowsWritten += result.rowsWritten;
totalSqlStatements += result.statementCount;

// Simulate awaiting next chunk
await scheduler.wait(1)
Expand All @@ -139,6 +144,11 @@ async function test(storage) {
{ val: 'f: 🔥😎🔥' },
]
)

// Verify that all 36 rows we inserted were accounted for.
assert.equal(totalRowsWritten, 36);
assert.equal(totalSqlStatements, 6);

sql.exec(`DELETE FROM streaming`)
await scheduler.wait(1)
}
Expand Down Expand Up @@ -1030,9 +1040,10 @@ async function testStreamingIngestion(request, storage) {
buffer += chunk

// Ingest any complete statements and snip those chars off the buffer
buffer = sql.ingest(buffer)
buffer = sql.ingest(buffer).remainder
}
})

// Verify exactly 36 rows were added
assert.deepEqual(Array.from(sql.exec(`SELECT count(*) FROM streaming`)), [
{ 'count(*)': 36 },
Expand Down
16 changes: 14 additions & 2 deletions src/workerd/api/sql.c++
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ jsg::Ref<SqlStorage::Cursor> SqlStorage::exec(jsg::Lock& js, kj::String querySql
return jsg::alloc<Cursor>(*sqlite, regulator, querySql, kj::mv(bindings));
}

kj::String SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
jsg::Ref<SqlStorage::IngestResult> SqlStorage::ingest(jsg::Lock& js, kj::String querySql) {
SqliteDatabase::Regulator& regulator = *this;
return kj::str(sqlite->ingestSql(regulator, querySql));
auto result = sqlite->ingestSql(regulator, querySql);
return jsg::alloc<IngestResult>(kj::str(result.remainder), result.rowsRead, result.rowsWritten, result.statementCount);
}

jsg::Ref<SqlStorage::Statement> SqlStorage::prepare(jsg::Lock& js, kj::String query) {
Expand Down Expand Up @@ -295,4 +296,15 @@ void SqlStorage::visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
}
}

SqlStorage::IngestResult::IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten, uint64_t statementCount) :
remainder(kj::mv(remainder)), rowsRead(rowsRead), rowsWritten(rowsWritten), statementCount(statementCount) {}

kj::StringPtr SqlStorage::IngestResult::getRemainder() { return remainder; }

double SqlStorage::IngestResult::getRowsRead() { return rowsRead; }

double SqlStorage::IngestResult::getRowsWritten() { return rowsWritten; }

double SqlStorage::IngestResult::getStatementCount() { return statementCount; }

} // namespace workerd::api
31 changes: 30 additions & 1 deletion src/workerd/api/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ class SqlStorage final: public jsg::Object, private SqliteDatabase::Regulator {

class Cursor;
class Statement;
class IngestResult;

jsg::Ref<Cursor> exec(jsg::Lock& js, kj::String query, jsg::Arguments<BindingValue> bindings);
kj::String ingest(jsg::Lock& js, kj::String query);
jsg::Ref<IngestResult> ingest(jsg::Lock& js, kj::String query);

jsg::Ref<Statement> prepare(jsg::Lock& js, kj::String query);

Expand Down Expand Up @@ -249,10 +250,38 @@ class SqlStorage::Statement final: public jsg::Object {
friend class Cursor;
};



class SqlStorage::IngestResult final : public jsg::Object {
public:

IngestResult(kj::String remainder, uint64_t rowsRead, uint64_t rowsWritten, uint64_t statementCount);

JSG_RESOURCE_TYPE(IngestResult) {
JSG_READONLY_PROTOTYPE_PROPERTY(statementCount, getStatementCount);
JSG_READONLY_PROTOTYPE_PROPERTY(rowsRead, getRowsRead);
JSG_READONLY_PROTOTYPE_PROPERTY(rowsWritten, getRowsWritten);
JSG_READONLY_PROTOTYPE_PROPERTY(remainder, getRemainder);
}

kj::StringPtr getRemainder();
double getStatementCount();
double getRowsRead();
double getRowsWritten();

private:
kj::String remainder;
uint64_t rowsRead;
uint64_t rowsWritten;
uint64_t statementCount;
};


#define EW_SQL_ISOLATE_TYPES \
api::SqlStorage, \
api::SqlStorage::Statement, \
api::SqlStorage::Cursor, \
api::SqlStorage::IngestResult, \
api::SqlStorage::Cursor::RowIterator, \
api::SqlStorage::Cursor::RowIterator::Next, \
api::SqlStorage::Cursor::RawIterator, \
Expand Down
13 changes: 10 additions & 3 deletions src/workerd/util/sqlite.c++
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,11 @@ kj::Own<sqlite3_stmt> SqliteDatabase::prepareSql(
}
}

kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) {
SqliteDatabase::IngestResult SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlCode) {
uint64_t rowsRead = 0;
uint64_t rowsWritten = 0;
uint64_t statementCount = 0;

// While there's still some input SQL to process
while (sqlCode.begin() != sqlCode.end()) {
// And there are still valid statements:
Expand All @@ -457,13 +461,16 @@ kj::StringPtr SqliteDatabase::ingestSql(Regulator& regulator, kj::StringPtr sqlC
// Slice off the next valid statement SQL
auto nextStatement = kj::str(sqlCode.slice(0, statementLength));
// Create a Query object, which will prepare & execute it
Query(*this, regulator, nextStatement);
auto q = Query(*this, regulator, nextStatement);

rowsRead += q.getRowsRead();
rowsWritten += q.getRowsWritten();
statementCount++;
sqlCode = sqlCode.slice(statementLength);
}

// Return the leftover buffer
return sqlCode;
return {.remainder = sqlCode, .rowsRead = rowsRead, .rowsWritten = rowsWritten, .statementCount = statementCount};
}

void SqliteDatabase::executeWithRegulator(Regulator& regulator, kj::FunctionParam<void()> func) {
Expand Down
9 changes: 8 additions & 1 deletion src/workerd/util/sqlite.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ class SqliteDatabase {
class Regulator;
struct VfsOptions;

struct IngestResult {
kj::StringPtr remainder;
uint64_t rowsRead;
uint64_t rowsWritten;
uint64_t statementCount;
};

SqliteDatabase(const Vfs& vfs, kj::PathPtr path, kj::Maybe<kj::WriteMode> maybeMode = kj::none);
~SqliteDatabase() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(SqliteDatabase);
Expand Down Expand Up @@ -92,7 +99,7 @@ class SqliteDatabase {
// Helper to execute a chunk of SQL that may not be complete.
// Executes every valid statement provided, and returns the remaining portion of the input
// that was not processed. This is used for streaming SQL ingestion.
kj::StringPtr ingestSql(Regulator& regulator, kj::StringPtr sqlCode);
IngestResult ingestSql(Regulator& regulator, kj::StringPtr sqlCode);

// Execute a function with the given regulator.
void executeWithRegulator(Regulator& regulator, kj::FunctionParam<void()> func);
Expand Down
Loading