From 6a0dc9963f81a9c1a511806dd43b66461e889d49 Mon Sep 17 00:00:00 2001 From: lmangani Date: Fri, 18 Oct 2024 18:55:54 +0000 Subject: [PATCH 1/4] url duck_flock function --- chsql/CMakeLists.txt | 2 +- chsql/src/chsql_extension.cpp | 4 +- chsql/src/duck_flock.cpp | 78 +++++++++++++++++++++++++++ chsql/src/include/chsql_extension.hpp | 3 ++ 4 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 chsql/src/duck_flock.cpp diff --git a/chsql/CMakeLists.txt b/chsql/CMakeLists.txt index 06de4a9..f2b1f46 100644 --- a/chsql/CMakeLists.txt +++ b/chsql/CMakeLists.txt @@ -21,7 +21,7 @@ include_directories( ../duckdb/third_party/mbedtls ../duckdb/third_party/mbedtls/include ../duckdb/third_party/brotli/include) -set(EXTENSION_SOURCES src/chsql_extension.cpp) +set(EXTENSION_SOURCES src/chsql_extension.cpp src/duck_flock.cpp) build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES}) build_loadable_extension(${TARGET_NAME} " " ${EXTENSION_SOURCES}) # Link OpenSSL in both the static library as the loadable extension diff --git a/chsql/src/chsql_extension.cpp b/chsql/src/chsql_extension.cpp index 5d55613..1f8576a 100644 --- a/chsql/src/chsql_extension.cpp +++ b/chsql/src/chsql_extension.cpp @@ -179,7 +179,7 @@ static void LoadInternal(DatabaseInstance &instance) { ExtensionUtil::RegisterFunction(instance, chsql_openssl_version_scalar_function); // Macros - for (idx_t index = 0; chsql_macros[index].name != nullptr; index++) { + for (idx_t index = 0; chsql_macros[index].name != nullptr; index++) { auto info = DefaultFunctionGenerator::CreateInternalMacroInfo(chsql_macros[index]); ExtensionUtil::RegisterFunction(instance, *info); } @@ -189,6 +189,8 @@ static void LoadInternal(DatabaseInstance &instance) { ExtensionUtil::RegisterFunction(instance, *table_info); } ExtensionUtil::RegisterFunction(instance, ReadParquetOrderedFunction()); + // Flock + ExtensionUtil::RegisterFunction(instance, DuckFlockTableFunction()); } void ChsqlExtension::Load(DuckDB &db) { diff --git a/chsql/src/duck_flock.cpp b/chsql/src/duck_flock.cpp new file mode 100644 index 0000000..4a9903f --- /dev/null +++ b/chsql/src/duck_flock.cpp @@ -0,0 +1,78 @@ +#ifndef DUCK_FLOCK_H +#define DUCK_FLOCK_H +#include "chsql_extension.hpp" +namespace duckdb { + struct DuckFlockData : FunctionData{ + vector> conn; + vector> results; + unique_ptr Copy() const override { + throw std::runtime_error("not implemented"); + } + bool Equals(const FunctionData &other) const override { + throw std::runtime_error("not implemented"); + }; + }; + + + + unique_ptr DuckFlockBind(ClientContext &context, TableFunctionBindInput &input, + vector &return_types, vector &names) { + auto data = make_uniq(); + auto strQuery = input.inputs[0].GetValue(); + vector flock; + auto &raw_flock = ListValue::GetChildren(input.inputs[1]); + for (auto &duck : raw_flock) { + flock.push_back(duck.ToString()); + auto conn = make_uniq(*context.db); + conn->Query("SET autoload_known_extensions=1;SET autoinstall_known_extensions=1;"); + auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?q=' || url_encode($1::VARCHAR))"); + if (req->HasError()) { + throw std::runtime_error("duck_flock: error: " + req->GetError()); + } + data->conn.push_back(std::move(conn)); + data->results.push_back(std::move(req->Execute(strQuery.c_str(), duck.ToString()))); + } + if (data->results[0]->HasError()) { + throw std::runtime_error("duck_flock: error: " + data->results[0]->GetError()); + } + return_types.clear(); + copy(data->results[0]->types.begin(), data->results[0]->types.end(), back_inserter(return_types)); + names.clear(); + copy(data->results[0]->names.begin(), data->results[0]->names.end(), back_inserter(names)); + return std::move(data); + } + + void DuckFlockImplementation(ClientContext &context, duckdb::TableFunctionInput &data_p, + DataChunk &output) { + auto &data = data_p.bind_data->Cast(); + for (const auto &res : data.results) { + ErrorData error_data; + unique_ptr data_chunk = make_uniq(); + if (res->TryFetch(data_chunk, error_data)) { + if (data_chunk != nullptr) { + output.Append(*data_chunk); + return; + } + } + } + } + + TableFunction DuckFlockTableFunction() { + TableFunction f( + "duck_flock", + {LogicalType::VARCHAR, LogicalType::LIST(LogicalType::VARCHAR)}, + DuckFlockImplementation, + DuckFlockBind, + nullptr, + nullptr + ); + return f; + } + + +} + + + + +#endif diff --git a/chsql/src/include/chsql_extension.hpp b/chsql/src/include/chsql_extension.hpp index 33e2c6f..b1c7a5e 100644 --- a/chsql/src/include/chsql_extension.hpp +++ b/chsql/src/include/chsql_extension.hpp @@ -12,4 +12,7 @@ class ChsqlExtension : public Extension { }; duckdb::TableFunction ReadParquetOrderedFunction(); static void RegisterSillyBTreeStore(DatabaseInstance &instance); + +TableFunction DuckFlockTableFunction(); + } // namespace duckdb From 5d4c2e22658fd83e18c463f12a4c74f91f6768b7 Mon Sep 17 00:00:00 2001 From: Lorenzo Mangani Date: Sat, 19 Oct 2024 01:37:07 +0200 Subject: [PATCH 2/4] Full URL param --- chsql/src/duck_flock.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chsql/src/duck_flock.cpp b/chsql/src/duck_flock.cpp index 4a9903f..4fa9622 100644 --- a/chsql/src/duck_flock.cpp +++ b/chsql/src/duck_flock.cpp @@ -25,7 +25,7 @@ namespace duckdb { flock.push_back(duck.ToString()); auto conn = make_uniq(*context.db); conn->Query("SET autoload_known_extensions=1;SET autoinstall_known_extensions=1;"); - auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?q=' || url_encode($1::VARCHAR))"); + auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?default_format=JSONEachRow&query=' || url_encode($1::VARCHAR))"); if (req->HasError()) { throw std::runtime_error("duck_flock: error: " + req->GetError()); } From 970971ba14b21fdaf6737ed44b2cd0644824ac60 Mon Sep 17 00:00:00 2001 From: Lorenzo Mangani Date: Tue, 22 Oct 2024 23:19:11 +0200 Subject: [PATCH 3/4] Rename to url_flock --- chsql/src/duck_flock.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chsql/src/duck_flock.cpp b/chsql/src/duck_flock.cpp index 4fa9622..f27a11a 100644 --- a/chsql/src/duck_flock.cpp +++ b/chsql/src/duck_flock.cpp @@ -59,7 +59,7 @@ namespace duckdb { TableFunction DuckFlockTableFunction() { TableFunction f( - "duck_flock", + "url_flock", {LogicalType::VARCHAR, LogicalType::LIST(LogicalType::VARCHAR)}, DuckFlockImplementation, DuckFlockBind, From ad1aca1f19123d36ec97ab96ce1758c8f8bb2478 Mon Sep 17 00:00:00 2001 From: Lorenzo Mangani Date: Wed, 23 Oct 2024 00:08:09 +0200 Subject: [PATCH 4/4] Add some error handling --- chsql/src/duck_flock.cpp | 138 +++++++++++++++++++++++++++------------ 1 file changed, 98 insertions(+), 40 deletions(-) diff --git a/chsql/src/duck_flock.cpp b/chsql/src/duck_flock.cpp index f27a11a..90e3588 100644 --- a/chsql/src/duck_flock.cpp +++ b/chsql/src/duck_flock.cpp @@ -1,8 +1,9 @@ #ifndef DUCK_FLOCK_H #define DUCK_FLOCK_H #include "chsql_extension.hpp" + namespace duckdb { - struct DuckFlockData : FunctionData{ + struct DuckFlockData : FunctionData { vector> conn; vector> results; unique_ptr Copy() const override { @@ -13,66 +14,123 @@ namespace duckdb { }; }; - - unique_ptr DuckFlockBind(ClientContext &context, TableFunctionBindInput &input, - vector &return_types, vector &names) { + vector &return_types, vector &names) { auto data = make_uniq(); + + // Check for NULL input parameters + if (input.inputs.empty() || input.inputs.size() < 2) { + throw std::runtime_error("url_flock: missing required parameters"); + } + if (input.inputs[0].IsNull() || input.inputs[1].IsNull()) { + throw std::runtime_error("url_flock: NULL parameters are not allowed"); + } + auto strQuery = input.inputs[0].GetValue(); - vector flock; + if (strQuery.empty()) { + throw std::runtime_error("url_flock: empty query string"); + } + auto &raw_flock = ListValue::GetChildren(input.inputs[1]); + if (raw_flock.empty()) { + throw std::runtime_error("url_flock: empty flock list"); + } + + bool has_valid_result = false; + // Process each connection for (auto &duck : raw_flock) { - flock.push_back(duck.ToString()); - auto conn = make_uniq(*context.db); - conn->Query("SET autoload_known_extensions=1;SET autoinstall_known_extensions=1;"); - auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?default_format=JSONEachRow&query=' || url_encode($1::VARCHAR))"); - if (req->HasError()) { - throw std::runtime_error("duck_flock: error: " + req->GetError()); + if (duck.IsNull() || duck.ToString().empty()) { + continue; + } + + try { + auto conn = make_uniq(*context.db); + if (!conn) { + continue; + } + + auto settingResult = conn->Query("SET autoload_known_extensions=1;SET autoinstall_known_extensions=1;"); + if (settingResult->HasError()) { + continue; + } + + auto req = conn->Prepare("SELECT * FROM read_json($2 || '/?default_format=JSONEachRow&query=' || url_encode($1::VARCHAR))"); + if (req->HasError()) { + continue; + } + + auto queryResult = req->Execute(strQuery.c_str(), duck.ToString()); + if (!queryResult || queryResult->HasError()) { + continue; + } + + // Store the first valid result's types and names + if (!has_valid_result) { + return_types.clear(); + copy(queryResult->types.begin(), queryResult->types.end(), back_inserter(return_types)); + names.clear(); + copy(queryResult->names.begin(), queryResult->names.end(), back_inserter(names)); + + if (return_types.empty()) { + throw std::runtime_error("url_flock: query must return at least one column"); + } + has_valid_result = true; + } + + data->conn.push_back(std::move(conn)); + data->results.push_back(std::move(queryResult)); + } catch (const std::exception &e) { + continue; } - data->conn.push_back(std::move(conn)); - data->results.push_back(std::move(req->Execute(strQuery.c_str(), duck.ToString()))); } - if (data->results[0]->HasError()) { - throw std::runtime_error("duck_flock: error: " + data->results[0]->GetError()); + + // Verify we have at least one valid result + if (!has_valid_result || data->results.empty()) { + throw std::runtime_error("url_flock: invalid or no results"); } - return_types.clear(); - copy(data->results[0]->types.begin(), data->results[0]->types.end(), back_inserter(return_types)); - names.clear(); - copy(data->results[0]->names.begin(), data->results[0]->names.end(), back_inserter(names)); + return std::move(data); } - void DuckFlockImplementation(ClientContext &context, duckdb::TableFunctionInput &data_p, - DataChunk &output) { + void DuckFlockImplementation(ClientContext &context, TableFunctionInput &data_p, + DataChunk &output) { auto &data = data_p.bind_data->Cast(); + + if (data.results.empty()) { + return; + } + for (const auto &res : data.results) { + if (!res) { + continue; + } + ErrorData error_data; unique_ptr data_chunk = make_uniq(); - if (res->TryFetch(data_chunk, error_data)) { - if (data_chunk != nullptr) { - output.Append(*data_chunk); - return; + + try { + if (res->TryFetch(data_chunk, error_data)) { + if (data_chunk && !data_chunk->size() == 0) { + output.Append(*data_chunk); + return; + } } + } catch (...) { + continue; } } } TableFunction DuckFlockTableFunction() { - TableFunction f( - "url_flock", - {LogicalType::VARCHAR, LogicalType::LIST(LogicalType::VARCHAR)}, - DuckFlockImplementation, - DuckFlockBind, - nullptr, - nullptr - ); - return f; + TableFunction f( + "url_flock", + {LogicalType::VARCHAR, LogicalType::LIST(LogicalType::VARCHAR)}, + DuckFlockImplementation, + DuckFlockBind, + nullptr, + nullptr + ); + return f; } - - } - - - - #endif