diff --git a/CMakeLists.txt b/CMakeLists.txt index fd3a331..b50a248 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,4 +23,5 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) add_subdirectory(libs/om_common) add_subdirectory(libs/om_network) add_subdirectory(node) -add_subdirectory(oracles) \ No newline at end of file +add_subdirectory(oracles) +add_subdirectory(tools) \ No newline at end of file diff --git a/README.md b/README.md index a011a55..8f27d5d 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ This is a middleware system that bridges Qubic nodes with external oracle servic ┌────────┴────────┐ ▼ ▼ ┌─────────┐ ┌──────────┐ - │ Price │ │ Weather │ + │ Price │ │ Mock │ │ Service │ │ Service │ │ :31842 │ │ :31843 │ └────┬────┘ └──────────┘ @@ -146,7 +146,7 @@ Offset | Size | Field | Type | Description Offset | Size | Field | Type | Description -------|------|-----------------------|--------|--------------------------- 0 | 8 | oracleQueryId | uint64 | Query identifier for reply correlation -8 | 4 | oracleInterfaceIndex | uint32 | Interface type (0=Price, 1=Weather) +8 | 4 | oracleInterfaceIndex | uint32 | Interface type (0=Price, 1=Mock) 12 | 4 | timeoutInMilliseconds | uint32 | Query timeout ``` @@ -271,8 +271,8 @@ cd docker | `QUBIC_NODES` | 10.29.1.22,192.168.1.2 | Comma-separated whitelist of Qubic node IPs | | `PRICE_SERVICE_HOST` | 0.0.0.0 | Price service host | | `PRICE_SERVICE_PORT` | 9001 | Price service port | -| `WEATHER_SERVICE_HOST` | 0.0.0.0 | Weather service host | -| `WEATHER_SERVICE_PORT` | 9002 | Weather service port | +| `MOCK_SERVICE_HOST` | 0.0.0.0 | Mock service host | +| `MOCK_SERVICE_PORT` | 9002 | Mock service port | ## .env File Setup @@ -292,8 +292,8 @@ QUBIC_NODES=10.29.1.22,127.0.0.1 PRICE_SERVICE_HOST=127.0.0.1 PRICE_SERVICE_PORT=31842 -WEATHER_SERVICE_HOST=127.0.0.1 -WEATHER_SERVICE_PORT=31843 +MOCK_SERVICE_HOST=127.0.0.1 +MOCK_SERVICE_PORT=31843 ``` ## Docker Compose Environment diff --git a/example_env b/example_env index c791b16..e376539 100644 --- a/example_env +++ b/example_env @@ -36,6 +36,19 @@ export PRICE_SERVICE_PORT=31842 export MOCK_SERVICE_HOST=192.168.1.5 export MOCK_SERVICE_PORT=31843 +# ----------------------------------------------------------------------------- +# Exchange API Keys (optional - for higher rate limits) +# If not set, free tier with rate limiting will be used. + +# Binance API Key +# export BINANCE_API_KEY=your_binance_api_key + +# MEXC API Key +# export MEXC_API_KEY=your_mexc_api_key + +# Gate.io API Key +# export GATE_API_KEY=your_gate_api_key + # ----------------------------------------------------------------------------- # Docker Settings (only used when running with docker compose) diff --git a/libs/om_common/include/om_common/qpi_adapter.h b/libs/om_common/include/om_common/qpi_adapter.h index cda9308..98cfc1b 100644 --- a/libs/om_common/include/om_common/qpi_adapter.h +++ b/libs/om_common/include/om_common/qpi_adapter.h @@ -88,3 +88,8 @@ QPI::id QPI::AssetIssuanceIterator::issuer() const { return QPI::id::zero(); } void QPI::AssetOwnershipIterator::begin(const QPI::Asset&, const QPI::AssetOwnershipSelect&) {} void QPI::AssetPossessionIterator::begin(const QPI::Asset&, const QPI::AssetOwnershipSelect&, const QPI::AssetPossessionSelect&) {} template void QPI::copyMemory(T1&, const T2&) {} +void* __acquireScratchpad(unsigned long long size, bool initZero) { return nullptr; } +void __releaseScratchpad(void* ptr) {} +#ifndef NDEBUG +static void addDebugMessageAssert(const char* message, const char* file, const unsigned int lineNumber) {} +#endif diff --git a/libs/om_network/include/om_network/tcp_server.h b/libs/om_network/include/om_network/tcp_server.h index 9554749..5980a06 100644 --- a/libs/om_network/include/om_network/tcp_server.h +++ b/libs/om_network/include/om_network/tcp_server.h @@ -21,6 +21,10 @@ #include #endif +#ifdef TRY_AGAIN +#undef TRY_AGAIN +#endif + namespace oracle { class Session; @@ -52,7 +56,7 @@ class TcpServer private: void acceptLoop(); - void clientThread(int client_fd, const std::string& client_ip); + void clientThread(int client_fd, std::string client_ip); void removeClientFD(int client_fd); void cleanupFinishedThreads(); diff --git a/libs/om_network/src/session.cpp b/libs/om_network/src/session.cpp index 8b5705f..bcee7a1 100644 --- a/libs/om_network/src/session.cpp +++ b/libs/om_network/src/session.cpp @@ -15,6 +15,7 @@ #include #endif +#include #include #include @@ -147,42 +148,71 @@ bool Session::sendData(const uint8_t* data, int size) return true; } -int Session::receiveExact(uint8_t* buffer, int sz) +// Single recv call. Returns: >0 bytes received, 0 timeout, -1 error/close. +int Session::receive(uint8_t* buffer, int sz) { if (!_active || _socketFD < 0) - return 0; + return -1; - int total_received = 0; - while (sz > 0) + int received = recv(_socketFD, (char*)buffer, sz, 0); + if (received > 0) + { + _bytesReceived += received; + return received; + } + else if (received == 0) + { + // Graceful close by peer + OM_LOG_DEBUG() << "receive: graceful close by peer (IP: " << _remoteIP << ")"; + _active.store(false); + return -1; + } + else // received < 0 { - int received = recv(_socketFD, (char*)buffer + total_received, sz, 0); - if (received <= 0) +#ifdef _MSC_VER + int err = WSAGetLastError(); + if (err == WSAETIMEDOUT || err == WSAEWOULDBLOCK) +#else + if (errno == EAGAIN || errno == EWOULDBLOCK) +#endif { - // 0 = Graceful close, -1 = Error (or timeout) - _active.store(false); - break; + // Socket timeout - connection still alive + OM_LOG_DEBUG() << "receive: timeout, no data (IP: " << _remoteIP << ")"; + return 0; } - total_received += received; - _bytesReceived += received; - sz -= received; + // Real error +#ifdef _MSC_VER + OM_LOG_ERROR() << "receive: recv error=" << err << " (IP: " << _remoteIP << ")"; +#else + OM_LOG_ERROR() << "receive: recv errno=" << errno << " (IP: " << _remoteIP << ")"; +#endif + _active.store(false); + return -1; } - return total_received; } -int Session::receive(uint8_t* buffer, int sz) +// Loop until exactly sz bytes received. Returns: sz on success, 0 timeout, -1 error/close. +int Session::receiveExact(uint8_t* buffer, int sz) { - if (!_active || _socketFD < 0) - return 0; - - int received = recv(_socketFD, (char*)buffer, sz, 0); - if (received <= 0) + int total_received = 0; + while (total_received < sz) { - _active.store(false); - return 0; + int received = receive(buffer + total_received, sz - total_received); + if (received > 0) + { + total_received += received; + } + else if (received == 0) + { + // Timeout - return what we have so far + return total_received; + } + else // -1: error or close + { + return (total_received > 0) ? total_received : -1; + } } - - _bytesReceived += received; - return received; + return total_received; } void Session::forceShutdown() diff --git a/libs/om_network/src/tcp_server.cpp b/libs/om_network/src/tcp_server.cpp index d5d04ba..62824e8 100644 --- a/libs/om_network/src/tcp_server.cpp +++ b/libs/om_network/src/tcp_server.cpp @@ -140,18 +140,19 @@ void TcpServer::cleanupFinishedThreads() std::vector threadsToJoin; { std::lock_guard lock(_threadsMutex); - auto it = _clientThreads.begin(); - while (it != _clientThreads.end()) + auto clientIt = _clientThreads.begin(); + while (clientIt != _clientThreads.end()) { - if (_finishedThreadIds.count(it->get_id()) > 0) + auto finishedIt = _finishedThreadIds.find(clientIt->get_id()); + if (finishedIt != _finishedThreadIds.end()) { - threadsToJoin.push_back(std::move(*it)); - _finishedThreadIds.erase(it->get_id()); - it = _clientThreads.erase(it); + _finishedThreadIds.erase(finishedIt); + threadsToJoin.push_back(std::move(*clientIt)); + clientIt = _clientThreads.erase(clientIt); } else { - ++it; + ++clientIt; } } } @@ -320,7 +321,7 @@ void TcpServer::acceptLoop() } } -void TcpServer::clientThread(int clientFd, const std::string& clientIP) +void TcpServer::clientThread(int clientFd, std::string clientIP) { auto startTime = std::chrono::steady_clock::now(); diff --git a/node/src/interface_client.cpp b/node/src/interface_client.cpp index 1b46fc1..ec75ec5 100644 --- a/node/src/interface_client.cpp +++ b/node/src/interface_client.cpp @@ -336,68 +336,88 @@ void InterfaceClient::workerThread() OM_LOG_DEBUG() << "InterfaceClient[" << _interfaceIndex << "] Worker thread exiting"; } -bool InterfaceClient::processQueryRequest(const QueryRequest& request, InterfaceQueryResult& result) +int InterfaceClient::sendAndReceive(const QueryRequest& request, InterfaceQueryResult& result) { - auto startTime = std::chrono::steady_clock::now(); + std::lock_guard lock(_connectionMutex); - _totalQueries.fetch_add(1); + if (!_session || !_session->isActive()) + { + return 0; // retryable: connection was lost, reconnect and try again + } - // Ensure connected - if (!connect()) + // Send query data + if (!_session->sendData(request.queryData.data(), request.queryData.size())) { - OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #" - << request.requestID << " failed: cannot connect"; - return false; + _session->close(); + return 0; // retryable: likely stale connection } - // Send query and receive reply + // Receive reply header + RequestResponseHeader headerBuffer; + int headerReceived = + _session->receiveExact((uint8_t*)&headerBuffer, sizeof(RequestResponseHeader)); + if (headerReceived != sizeof(RequestResponseHeader)) { - std::lock_guard lock(_connectionMutex); + OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #" + << request.requestID << " failed: invalid reply header"; + return -1; + } - if (!_session || !_session->isActive()) - { - OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #" - << request.requestID << " failed: session not active"; - return false; - } + // Receive reply payload + result.replyData.resize(headerBuffer.size()); + int received = _session->receive( + result.replyData.data() + sizeof(RequestResponseHeader), headerBuffer.getPayloadSize()); - // Send query data - if (!_session->sendData(request.queryData.data(), request.queryData.size())) + if (received != (int)headerBuffer.getPayloadSize()) + { + if (_session->isActive()) { OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #" - << request.requestID << " failed: cannot send"; - _session->close(); - return false; + << request.requestID << " failed: invalid reply size " << received; } + _session->close(); + return -1; + } - // Receive reply - // Header first - RequestResponseHeader headerBuffer; - int headerReceived = - _session->receiveExact((uint8_t*)&headerBuffer, sizeof(RequestResponseHeader)); - if (headerReceived != sizeof(RequestResponseHeader)) + result.valid = true; + return 1; // success +} + +bool InterfaceClient::processQueryRequest(const QueryRequest& request, InterfaceQueryResult& result) +{ + static constexpr int MAX_SEND_RETRIES = 8; + + auto startTime = std::chrono::steady_clock::now(); + + _totalQueries.fetch_add(1); + + for (int attempt = 0; attempt <= MAX_SEND_RETRIES; ++attempt) + { + // Ensure connected (reconnects if previous attempt closed the session) + if (!connect()) { OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #" - << request.requestID << " failed: invalid reply header"; + << request.requestID << " failed: cannot connect"; return false; } - result.replyData.resize(headerBuffer.size()); - int received = _session->receive( - result.replyData.data() + sizeof(RequestResponseHeader), headerBuffer.getPayloadSize()); + int rc = sendAndReceive(request, result); + if (rc == 1) + break; // success + if (rc < 0) + return false; // non-retryable error - if (received != (int)headerBuffer.getPayloadSize()) - { - if (_session->isActive()) - { - OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #" - << request.requestID << " failed: invalid reply size " << received; - } - _session->close(); - return false; - } + // rc == 0: send failed (stale connection), retry + OM_LOG_WARNING() << "InterfaceClient[" << _interfaceIndex << "] Request #" + << request.requestID << " send failed, reconnecting... (attempt " + << (attempt + 1) << "/" << MAX_SEND_RETRIES << ")"; + } - result.valid = true; + if (!result.valid) + { + OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #" + << request.requestID << " failed after " << MAX_SEND_RETRIES << " retries"; + return false; } auto elapsed = std::chrono::duration_cast( diff --git a/node/src/interface_client.h b/node/src/interface_client.h index f1e9790..f687011 100644 --- a/node/src/interface_client.h +++ b/node/src/interface_client.h @@ -51,7 +51,7 @@ class InterfaceClient /** * Constructor * - * @param interfaceIndex Interface type (0 = Price, 1 = Weather, etc.) + * @param interfaceIndex Interface type (0 = Price, 1 = Mock, etc.) * @param host Host of the aggregator service * @param port Port of the aggregator service */ @@ -131,9 +131,13 @@ class InterfaceClient // Worker thread function void workerThread(); - // Process a single query request + // Process a single query request (with retry on stale connection) bool processQueryRequest(const QueryRequest& request, InterfaceQueryResult& result); + // Single attempt to send query and receive reply. Returns: + // 1 = success, 0 = send failed (retryable), -1 = non-retryable error + int sendAndReceive(const QueryRequest& request, InterfaceQueryResult& result); + // Interface info uint32_t _interfaceIndex; std::string _host; diff --git a/node/src/node_connection.cpp b/node/src/node_connection.cpp index 8ddc71d..876b118 100644 --- a/node/src/node_connection.cpp +++ b/node/src/node_connection.cpp @@ -128,6 +128,11 @@ void NodeConnection::handleSession(Session& session) // This will block until header is received, or TIMEOUT occurs (handled by Session) int received = session.receiveExact((uint8_t*)&header, sizeof(header)); + while (!received) + { + // No data received, try again + received = session.receiveExact((uint8_t*)&header, sizeof(header)); + } if (received != sizeof(header)) { diff --git a/oracles/core/base_oracle_service.cpp b/oracles/core/base_oracle_service.cpp index 002c828..4dc625b 100644 --- a/oracles/core/base_oracle_service.cpp +++ b/oracles/core/base_oracle_service.cpp @@ -9,6 +9,10 @@ #include #include +#ifdef min +#undef min +#endif + namespace oracle { diff --git a/oracles/core/base_oracle_service.h b/oracles/core/base_oracle_service.h index 84b6fb0..0feba24 100644 --- a/oracles/core/base_oracle_service.h +++ b/oracles/core/base_oracle_service.h @@ -44,7 +44,7 @@ class Session; * Abstract base class for Oracle Machine services. * * Handles protocol-level operations (header, query, reply parsing). - * Concrete services (Price, Weather, etc.) implement interface-specific logic. + * Concrete services (Price, Mock, Weather, etc.) implement interface-specific logic. * * Usage: * 1. Inherit from BaseOracleService diff --git a/oracles/price/CMakeLists.txt b/oracles/price/CMakeLists.txt index 82f3e85..529995b 100644 --- a/oracles/price/CMakeLists.txt +++ b/oracles/price/CMakeLists.txt @@ -4,6 +4,11 @@ project(price_oracle_service) set(SERVICE_SOURCES ${SERVICE_CORE_DIR}/base_oracle_service.cpp src/price_service.cpp + src/exchange_provider.cpp + src/binance_provider.cpp + src/mexc_provider.cpp + src/gate_provider.cpp + src/combined_provider.cpp ) # Create price oracle service executable diff --git a/oracles/price/src/binance_provider.cpp b/oracles/price/src/binance_provider.cpp new file mode 100644 index 0000000..d188b17 --- /dev/null +++ b/oracles/price/src/binance_provider.cpp @@ -0,0 +1,109 @@ +#include "binance_provider.h" +#include "om_common/logger.h" + +#include +#include + +namespace oracle +{ + +BinancePriceProvider::BinancePriceProvider(const std::string& apiKey) : + ExchangePriceProvider( + "Binance", + BASE_URL, + apiKey, + RATE_LIMIT_FREE, + RATE_LIMIT_PAID) +{ +} + +std::string BinancePriceProvider::buildKlineUrl( + const std::string& symbol, + int64_t startTimeMs, + int64_t endTimeMs, + int limit) +{ + std::ostringstream url; + url << _apiBaseUrl << "/api/v3/klines" + << "?symbol=" << symbol + << "&interval=1m" + << "&startTime=" << startTimeMs + << "&endTime=" << endTimeMs + << "&limit=" << limit; + return url.str(); +} + +std::string BinancePriceProvider::formatSymbol( + const std::string& base, + const std::string& quote) +{ + // Binance format: BTCUSDT (uppercase, no separator) + std::string symbol = base + quote; + std::transform(symbol.begin(), symbol.end(), symbol.begin(), ::toupper); + return symbol; +} + +uint16_t BinancePriceProvider::parseKlineResponse( + const std::string& response, + std::string& closePriceString) +{ + // Binance kline response format: + // [[openTime, open, high, low, close, volume, closeTime, ...], ...] + // Close price is at index 4 (5th element) + + // Check for empty response or empty array + if (response.empty() || response == "[]") + { + OM_LOG_ERROR() << "[Binance] Empty response - pair may not be supported"; + return RETURN_ERROR_INVALID_ARG; + } + + // Find the start of the inner array + size_t startPos = response.find('['); + if (startPos == std::string::npos) + { + OM_LOG_ERROR() << "[Binance] Invalid response format: " << response; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + // Skip to inner array + startPos = response.find('[', startPos + 1); + if (startPos == std::string::npos) + { + OM_LOG_ERROR() << "[Binance] No kline data in response: " << response; + return RETURN_ERROR_INVALID_ARG; + } + + // Find close price (5th comma-separated value, index 4) + int commaCount = 0; + size_t pos = startPos + 1; + while (commaCount < 4 && pos < response.size()) + { + if (response[pos] == ',') + { + commaCount++; + } + pos++; + } + + if (commaCount != 4) + { + OM_LOG_ERROR() << "[Binance] Could not find close price in response"; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + // Extract close price (quoted string) + size_t quoteStart = response.find('"', pos); + size_t quoteEnd = response.find('"', quoteStart + 1); + + if (quoteStart == std::string::npos || quoteEnd == std::string::npos) + { + OM_LOG_ERROR() << "[Binance] Could not parse close price"; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + closePriceString = response.substr(quoteStart + 1, quoteEnd - quoteStart - 1); + return RETURN_NO_ERROR; +} + +} // namespace oracle diff --git a/oracles/price/src/binance_provider.h b/oracles/price/src/binance_provider.h new file mode 100644 index 0000000..270ea80 --- /dev/null +++ b/oracles/price/src/binance_provider.h @@ -0,0 +1,39 @@ +#pragma once + +#include "exchange_provider.h" + +namespace oracle +{ + +/** + * Binance exchange price provider. + * API: https://api.binance.com/api/v3/klines + * Symbol format: BTCUSDT (uppercase, no separator) + */ +class BinancePriceProvider : public ExchangePriceProvider +{ +public: + explicit BinancePriceProvider(const std::string& apiKey = ""); + +protected: + std::string buildKlineUrl( + const std::string& symbol, + int64_t startTimeMs, + int64_t endTimeMs, + int limit) override; + + std::string formatSymbol( + const std::string& base, + const std::string& quote) override; + + uint16_t parseKlineResponse( + const std::string& response, + std::string& closePriceString) override; + +private: + static constexpr const char* BASE_URL = "https://api.binance.com"; + static constexpr double RATE_LIMIT_FREE = 0.5; // 2 req/sec for free tier + static constexpr double RATE_LIMIT_PAID = 0.1; // 10 req/sec for paid tier +}; + +} // namespace oracle diff --git a/oracles/price/src/combined_provider.cpp b/oracles/price/src/combined_provider.cpp new file mode 100644 index 0000000..45a5e64 --- /dev/null +++ b/oracles/price/src/combined_provider.cpp @@ -0,0 +1,212 @@ +#include "combined_provider.h" +#include "binance_provider.h" +#include "mexc_provider.h" +#include "gate_provider.h" +#include "om_common/logger.h" + +namespace oracle +{ + +// Helper function for GCD +static int64_t gcd(int64_t a, int64_t b) +{ + while (b != 0) + { + int64_t t = b; + b = a % b; + a = t; + } + return a; +} + +CombinedPriceProvider::CombinedPriceProvider( + const std::string& name, + std::shared_ptr provider1, + std::shared_ptr provider2) : + PriceProvider(name) +{ + _providers.push_back(provider1); + _providers.push_back(provider2); + + OM_LOG_INFO() << "[" << _name << "] Combined provider initialized with: " + << provider1->getName() << " + " << provider2->getName(); +} + +uint16_t CombinedPriceProvider::getPrice( + const std::string& currency1, + const std::string& currency2, + int64_t& numerator, + int64_t& denominator) +{ + std::vector> prices; + + // Query all providers + for (auto& provider : _providers) + { + int64_t num = 0, denom = 1; + uint16_t result = provider->getPrice(currency1, currency2, num, denom); + + if (result != RETURN_NO_ERROR) + { + OM_LOG_ERROR() << "[" << _name << "] Provider " << provider->getName() + << " failed with error 0x" << std::hex << result << std::dec; + // If ANY provider fails, propagate the error + return result; + } + + prices.emplace_back(num, denom); + OM_LOG_DEBUG() << "[" << _name << "] " << provider->getName() + << " returned: " << num << "/" << denom; + } + + // Compute mean of all prices + if (prices.size() == 2) + { + computeMean( + prices[0].first, prices[0].second, + prices[1].first, prices[1].second, + numerator, denominator); + } + else + { + // Fallback for single provider (shouldn't happen in normal usage) + numerator = prices[0].first; + denominator = prices[0].second; + } + + OM_LOG_DEBUG() << "[" << _name << "] Combined price: " << numerator << "/" << denominator; + + return RETURN_NO_ERROR; +} + +uint16_t CombinedPriceProvider::getPriceAtTimestamp( + const std::string& currency1, + const std::string& currency2, + int64_t timestampMs, + int64_t& numerator, + int64_t& denominator) +{ + std::vector> prices; + + // Query all providers at the specified timestamp + for (auto& provider : _providers) + { + int64_t num = 0, denom = 1; + uint16_t result = provider->getPriceAtTimestamp(currency1, currency2, timestampMs, num, denom); + + if (result != RETURN_NO_ERROR) + { + OM_LOG_ERROR() << "[" << _name << "] Provider " << provider->getName() + << " failed with error 0x" << std::hex << result << std::dec + << " for timestamp " << timestampMs; + // If ANY provider fails, propagate the error + return result; + } + + prices.emplace_back(num, denom); + OM_LOG_DEBUG() << "[" << _name << "] " << provider->getName() + << " returned: " << num << "/" << denom << " at timestamp " << timestampMs; + } + + // Compute mean of all prices + if (prices.size() == 2) + { + computeMean( + prices[0].first, prices[0].second, + prices[1].first, prices[1].second, + numerator, denominator); + } + else + { + // Fallback for single provider (shouldn't happen in normal usage) + numerator = prices[0].first; + denominator = prices[0].second; + } + + OM_LOG_DEBUG() << "[" << _name << "] Combined price at timestamp " << timestampMs + << ": " << numerator << "/" << denominator; + + return RETURN_NO_ERROR; +} + +void CombinedPriceProvider::computeMean( + int64_t num1, int64_t denom1, + int64_t num2, int64_t denom2, + int64_t& resultNum, int64_t& resultDenom) +{ + // Mean of a/b and c/d = (a*d + c*b) / (2*b*d) + // If both have the same denominator, we can simplify: mean = (num1 + num2) / 2 + + if (denom1 == denom2) + { + // Common case: same denominator + int64_t sum = num1 + num2; + + // Check if sum is even for exact division + if (sum % 2 == 0) + { + resultNum = sum / 2; + resultDenom = denom1; + } + else + { + // Odd sum: keep full precision + resultNum = sum; + resultDenom = denom1 * 2; + } + } + else + { + // General case: different denominators + // mean = (num1*denom2 + num2*denom1) / (2*denom1*denom2) + // Use careful computation to avoid overflow + + // For typical crypto prices with 10^8 denominator, this should be safe + int64_t crossNum1 = num1 * (denom2 / gcd(denom1, denom2)); + int64_t crossNum2 = num2 * (denom1 / gcd(denom1, denom2)); + int64_t lcmDenom = (denom1 / gcd(denom1, denom2)) * denom2; + + resultNum = (crossNum1 + crossNum2) / 2; + resultDenom = lcmDenom; + + // Handle odd sum + if ((crossNum1 + crossNum2) % 2 != 0) + { + resultNum = crossNum1 + crossNum2; + resultDenom = lcmDenom * 2; + } + } +} + +// Factory functions +std::shared_ptr createBinanceMexcProvider( + const std::string& binanceApiKey, + const std::string& mexcApiKey) +{ + return std::make_shared( + "Binance+MEXC", + std::make_shared(binanceApiKey), + std::make_shared(mexcApiKey)); +} + +std::shared_ptr createBinanceGateProvider( + const std::string& binanceApiKey, + const std::string& gateApiKey) +{ + return std::make_shared( + "Binance+Gate", + std::make_shared(binanceApiKey), + std::make_shared(gateApiKey)); +} + +std::shared_ptr createMexcGateProvider( + const std::string& mexcApiKey, + const std::string& gateApiKey) +{ + return std::make_shared( + "MEXC+Gate", + std::make_shared(mexcApiKey), + std::make_shared(gateApiKey)); +} + +} // namespace oracle diff --git a/oracles/price/src/combined_provider.h b/oracles/price/src/combined_provider.h new file mode 100644 index 0000000..b9e459e --- /dev/null +++ b/oracles/price/src/combined_provider.h @@ -0,0 +1,63 @@ +#pragma once + +#include "price_service.h" +#include "exchange_provider.h" + +#include +#include + +namespace oracle +{ + +/** + * Combined price provider that aggregates prices from multiple sources. + * Returns the arithmetic mean of prices from all configured providers. + * Returns error if ANY source is unavailable or returns an error. + */ +class CombinedPriceProvider : public PriceProvider +{ +public: + CombinedPriceProvider( + const std::string& name, + std::shared_ptr provider1, + std::shared_ptr provider2); + + uint16_t getPrice( + const std::string& currency1, + const std::string& currency2, + int64_t& numerator, + int64_t& denominator) override; + + uint16_t getPriceAtTimestamp( + const std::string& currency1, + const std::string& currency2, + int64_t timestampMs, + int64_t& numerator, + int64_t& denominator) override; + +private: + std::vector> _providers; + + /** + * Compute mean of two prices. + */ + static void computeMean( + int64_t num1, int64_t denom1, + int64_t num2, int64_t denom2, + int64_t& resultNum, int64_t& resultDenom); +}; + +// Convenience factory functions for creating combined providers +std::shared_ptr createBinanceMexcProvider( + const std::string& binanceApiKey = "", + const std::string& mexcApiKey = ""); + +std::shared_ptr createBinanceGateProvider( + const std::string& binanceApiKey = "", + const std::string& gateApiKey = ""); + +std::shared_ptr createMexcGateProvider( + const std::string& mexcApiKey = "", + const std::string& gateApiKey = ""); + +} // namespace oracle diff --git a/oracles/price/src/exchange_provider.cpp b/oracles/price/src/exchange_provider.cpp new file mode 100644 index 0000000..30a3fca --- /dev/null +++ b/oracles/price/src/exchange_provider.cpp @@ -0,0 +1,237 @@ +#include "exchange_provider.h" +#include "om_common/logger.h" + +#include +#include +#include +#include +#include +#include + +#include + +namespace oracle +{ + +ExchangePriceProvider::ExchangePriceProvider( + const std::string& name, + const std::string& baseUrl, + const std::string& apiKey, + double rateLimitDelayFree, + double rateLimitDelayPaid, + int cacheTtl) : + PriceProvider(name), + _apiKey(apiKey), + _apiBaseUrl(baseUrl), + _rateLimitDelay(apiKey.empty() ? rateLimitDelayFree : rateLimitDelayPaid), + _cacheTtl(cacheTtl), + _lastRequestTime(std::chrono::steady_clock::now() - std::chrono::seconds(10)) +{ + if (!apiKey.empty()) + { + OM_LOG_INFO() << "[" << _name << "] Using paid API tier"; + } + else + { + OM_LOG_INFO() << "[" << _name << "] Using free tier (rate limited)"; + } +} + +uint16_t ExchangePriceProvider::getPrice( + const std::string& currency1, + const std::string& currency2, + int64_t& numerator, + int64_t& denominator) +{ + // Get current time and calculate the previous finished minute + auto now = std::chrono::system_clock::now(); + auto nowMs = std::chrono::duration_cast( + now.time_since_epoch()).count(); + + // Get the previous finished minute (floor to minute, then go back one minute) + int64_t currentMinuteMs = (nowMs / 60000) * 60000; + int64_t prevMinuteMs = currentMinuteMs - 60000; + + return getPriceAtTimestamp(currency1, currency2, prevMinuteMs, numerator, denominator); +} + +uint16_t ExchangePriceProvider::getPriceAtTimestamp( + const std::string& currency1, + const std::string& currency2, + int64_t timestampMs, + int64_t& numerator, + int64_t& denominator) +{ + // If timestampMs is 0, use current time (previous finished minute) + if (timestampMs == 0) + { + auto now = std::chrono::system_clock::now(); + auto nowMs = std::chrono::duration_cast( + now.time_since_epoch()).count(); + int64_t currentMinuteMs = (nowMs / 60000) * 60000; + timestampMs = currentMinuteMs - 60000; // Previous finished minute + + // Log the actual time being used + time_t t = timestampMs / 1000; + struct tm* tm = gmtime(&t); + OM_LOG_DEBUG() << "[" << _name << "] Using current price timestamp: " + << (tm->tm_year + 1900) << "-" + << std::setw(2) << std::setfill('0') << (tm->tm_mon + 1) << "-" + << std::setw(2) << std::setfill('0') << tm->tm_mday << " " + << std::setw(2) << std::setfill('0') << tm->tm_hour << ":" + << std::setw(2) << std::setfill('0') << tm->tm_min << ":" + << std::setw(2) << std::setfill('0') << tm->tm_sec << " UTC"; + } + + // Convert currencies to uppercase + std::string base = currency1; + std::string quote = currency2; + std::transform(base.begin(), base.end(), base.begin(), ::toupper); + std::transform(quote.begin(), quote.end(), quote.begin(), ::toupper); + + std::string symbol = formatSymbol(base, quote); + + // Calculate the 1-minute candle that closed before this timestamp + // Floor to minute boundary + int64_t candleCloseMs = (timestampMs / 60000) * 60000; + int64_t candleOpenMs = candleCloseMs - 60000; + + std::string cacheKey = symbol + "@" + std::to_string(candleCloseMs); + + // Check cache + { + std::lock_guard lock(_cacheMutex); + auto it = _cache.find(cacheKey); + if (it != _cache.end()) + { + auto elapsed = std::chrono::steady_clock::now() - it->second.fetchTime; + if (std::chrono::duration_cast(elapsed).count() < _cacheTtl) + { + numerator = it->second.numerator; + denominator = it->second.denominator; + OM_LOG_DEBUG() << "[" << _name << "] Cache hit: " << cacheKey; + return RETURN_NO_ERROR; + } + } + } + + // Build URL for kline/candlestick data + std::string url = buildKlineUrl(symbol, candleOpenMs, candleCloseMs, 1); + + OM_LOG_INFO() << "[" << _name << "] Fetching kline: " << url; + + enforceRateLimit(); + + std::string response; + uint16_t httpResult = httpGet(url, response); + if (httpResult != RETURN_NO_ERROR) + { + return httpResult; + } + + std::string closePriceString; + uint16_t parseResult = parseKlineResponse(response, closePriceString); + if (parseResult != RETURN_NO_ERROR) + { + return parseResult; + } + + if (!priceStringToRational(closePriceString, numerator, denominator)) + { + OM_LOG_ERROR() << "Failed to convert price stirng " << closePriceString << " to rational number"; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + // Update cache + { + std::lock_guard lock(_cacheMutex); + CacheEntry entry; + entry.numerator = numerator; + entry.denominator = denominator; + entry.candleTimestampMs = candleCloseMs; + entry.fetchTime = std::chrono::steady_clock::now(); + _cache[cacheKey] = entry; + } + + OM_LOG_DEBUG() << "[" << _name << "] Price: " << closePriceString + << " (" << numerator << "/" << denominator << ")"; + + return RETURN_NO_ERROR; +} + +void ExchangePriceProvider::enforceRateLimit() +{ + std::lock_guard lock(_rateLimitMutex); + + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast( + now - _lastRequestTime).count(); + + int64_t delayMs = static_cast(_rateLimitDelay * 1000); + + if (elapsed < delayMs) + { + int64_t sleepMs = delayMs - elapsed; + OM_LOG_DEBUG() << "[" << _name << "] Rate limit: sleeping " << sleepMs << "ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs)); + } + + _lastRequestTime = std::chrono::steady_clock::now(); +} + +uint16_t ExchangePriceProvider::httpGet(const std::string& url, std::string& response) +{ + CURL* curl = curl_easy_init(); + if (!curl) + { + OM_LOG_ERROR() << "[" << _name << "] Failed to initialize curl"; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, + +[](void* ptr, size_t size, size_t nmemb, void* userdata) -> size_t { + std::string* str = static_cast(userdata); + str->append(static_cast(ptr), size * nmemb); + return size * nmemb; + }); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30L); + + // Add API key header if available (subclasses may override header format) + struct curl_slist* headers = nullptr; + if (!_apiKey.empty()) + { + std::string header = "X-MBX-APIKEY: " + _apiKey; + headers = curl_slist_append(headers, header.c_str()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + } + + CURLcode res = curl_easy_perform(curl); + + if (headers) + { + curl_slist_free_all(headers); + } + + if (res != CURLE_OK) + { + OM_LOG_ERROR() << "[" << _name << "] Curl error: " << curl_easy_strerror(res); + curl_easy_cleanup(curl); + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + long httpCode = 0; + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); + curl_easy_cleanup(curl); + + if (httpCode != 200) + { + OM_LOG_ERROR() << "[" << _name << "] HTTP " << httpCode << ": " << response; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + return RETURN_NO_ERROR; +} + +} // namespace oracle diff --git a/oracles/price/src/exchange_provider.h b/oracles/price/src/exchange_provider.h new file mode 100644 index 0000000..a62cc8e --- /dev/null +++ b/oracles/price/src/exchange_provider.h @@ -0,0 +1,118 @@ +#pragma once + +#include "price_service.h" + +#include +#include +#include +#include + +namespace oracle +{ + +/** + * Abstract base class for exchange-based price providers. + * Provides common HTTP request handling, caching, rate limiting, and timestamp-based price lookup. + */ +class ExchangePriceProvider : public PriceProvider +{ +public: + ExchangePriceProvider( + const std::string& name, + const std::string& baseUrl, + const std::string& apiKey = "", + double rateLimitDelayFree = 0.5, + double rateLimitDelayPaid = 0.1, + int cacheTtl = 60); + + virtual ~ExchangePriceProvider() = default; + + /** + * Get current price (uses latest completed 1-minute candle). + */ + uint16_t getPrice( + const std::string& currency1, + const std::string& currency2, + int64_t& numerator, + int64_t& denominator) override; + + /** + * Get price at a specific timestamp (previous finished minute candle). + * @param currency1 Base currency (e.g., "BTC") + * @param currency2 Quote currency (e.g., "USDT", "USD", "USDC") + * @param timestampMs Unix timestamp in milliseconds + * @param numerator Output: price numerator + * @param denominator Output: price denominator + * @return RETURN_NO_ERROR on success + */ + virtual uint16_t getPriceAtTimestamp( + const std::string& currency1, + const std::string& currency2, + int64_t timestampMs, + int64_t& numerator, + int64_t& denominator); + +protected: + // Must be implemented by each exchange provider + + /** + * Build the kline/candlestick API URL. + * @param symbol Trading symbol in exchange format (e.g., "BTCUSDT" or "BTC_USDT") + * @param startTimeMs Start time in milliseconds + * @param endTimeMs End time in milliseconds + * @param limit Number of candles to fetch + * @return Full API URL + */ + virtual std::string buildKlineUrl( + const std::string& symbol, + int64_t startTimeMs, + int64_t endTimeMs, + int limit = 1) = 0; + + /** + * Format the trading symbol for this exchange. + * @param base Base currency (e.g., "BTC") + * @param quote Quote currency (e.g., "USDT") + * @return Symbol in exchange format (e.g., "BTCUSDT" or "BTC_USDT") + */ + virtual std::string formatSymbol( + const std::string& base, + const std::string& quote) = 0; + + /** + * Parse the kline response and extract the close price. + * @param response Raw API response + * @param closePrice Output: extracted close price + * @return RETURN_NO_ERROR on success + */ + virtual uint16_t parseKlineResponse( + const std::string& response, + std::string& closePriceString) = 0; + + // Common HTTP functionality + uint16_t httpGet(const std::string& url, std::string& response); + + // Rate limiting + void enforceRateLimit(); + + // Cache structure with timestamp support + struct CacheEntry + { + int64_t numerator; + int64_t denominator; + int64_t candleTimestampMs; + std::chrono::steady_clock::time_point fetchTime; + }; + + std::string _apiKey; + std::string _apiBaseUrl; + double _rateLimitDelay; + int _cacheTtl; + + std::map _cache; // key: "BTC/USDT@timestamp" + std::mutex _cacheMutex; + std::mutex _rateLimitMutex; + std::chrono::steady_clock::time_point _lastRequestTime; +}; + +} // namespace oracle diff --git a/oracles/price/src/gate_provider.cpp b/oracles/price/src/gate_provider.cpp new file mode 100644 index 0000000..f036b79 --- /dev/null +++ b/oracles/price/src/gate_provider.cpp @@ -0,0 +1,113 @@ +#include "gate_provider.h" +#include "om_common/logger.h" + +#include +#include + +namespace oracle +{ + +GatePriceProvider::GatePriceProvider(const std::string& apiKey) : + ExchangePriceProvider( + "Gate", + BASE_URL, + apiKey, + RATE_LIMIT_FREE, + RATE_LIMIT_PAID) +{ +} + +std::string GatePriceProvider::buildKlineUrl( + const std::string& symbol, + int64_t startTimeMs, + int64_t endTimeMs, + int limit) +{ + // Gate.io uses seconds, not milliseconds + int64_t startTimeSec = startTimeMs / 1000; + int64_t endTimeSec = endTimeMs / 1000; + + std::ostringstream url; + url << _apiBaseUrl << "/api/v4/spot/candlesticks" + << "?currency_pair=" << symbol + << "&interval=1m" + << "&from=" << startTimeSec + << "&to=" << endTimeSec + << "&limit=" << limit; + return url.str(); +} + +std::string GatePriceProvider::formatSymbol( + const std::string& base, + const std::string& quote) +{ + // Gate.io format: BTC_USDT (uppercase with underscore) + std::string symbol = base + "_" + quote; + std::transform(symbol.begin(), symbol.end(), symbol.begin(), ::toupper); + return symbol; +} + +uint16_t GatePriceProvider::parseKlineResponse( + const std::string& response, + std::string& closePriceString) +{ + // Gate.io candlestick response format: + // [["timestamp", "volume", "close", "high", "low", "open", "is_final"], ...] + // Close price is at index 2 (3rd element) + + // Check for empty response or empty array + if (response.empty() || response == "[]") + { + OM_LOG_ERROR() << "[Gate] Empty response - pair may not be supported"; + return RETURN_ERROR_INVALID_ARG; + } + + // Find the start of the inner array + size_t startPos = response.find('['); + if (startPos == std::string::npos) + { + OM_LOG_ERROR() << "[Gate] Invalid response format: " << response; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + // Skip to inner array + startPos = response.find('[', startPos + 1); + if (startPos == std::string::npos) + { + OM_LOG_ERROR() << "[Gate] No kline data in response: " << response; + return RETURN_ERROR_INVALID_ARG; + } + + // Find close price (3rd comma-separated value, index 2) + int commaCount = 0; + size_t pos = startPos + 1; + while (commaCount < 2 && pos < response.size()) + { + if (response[pos] == ',') + { + commaCount++; + } + pos++; + } + + if (commaCount != 2) + { + OM_LOG_ERROR() << "[Gate] Could not find close price in response"; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + // Extract close price (quoted string) + size_t quoteStart = response.find('"', pos); + size_t quoteEnd = response.find('"', quoteStart + 1); + + if (quoteStart == std::string::npos || quoteEnd == std::string::npos) + { + OM_LOG_ERROR() << "[Gate] Could not parse close price"; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + closePriceString = response.substr(quoteStart + 1, quoteEnd - quoteStart - 1); + return RETURN_NO_ERROR; +} + +} // namespace oracle diff --git a/oracles/price/src/gate_provider.h b/oracles/price/src/gate_provider.h new file mode 100644 index 0000000..16158b0 --- /dev/null +++ b/oracles/price/src/gate_provider.h @@ -0,0 +1,39 @@ +#pragma once + +#include "exchange_provider.h" + +namespace oracle +{ + +/** + * Gate.io exchange price provider. + * API: https://api.gateio.ws/api/v4/spot/candlesticks + * Symbol format: BTC_USDT (uppercase with underscore) + */ +class GatePriceProvider : public ExchangePriceProvider +{ +public: + explicit GatePriceProvider(const std::string& apiKey = ""); + +protected: + std::string buildKlineUrl( + const std::string& symbol, + int64_t startTimeMs, + int64_t endTimeMs, + int limit) override; + + std::string formatSymbol( + const std::string& base, + const std::string& quote) override; + + uint16_t parseKlineResponse( + const std::string& response, + std::string& closePriceString) override; + +private: + static constexpr const char* BASE_URL = "https://api.gateio.ws"; + static constexpr double RATE_LIMIT_FREE = 1.0; // 1 req/sec for free tier + static constexpr double RATE_LIMIT_PAID = 0.2; // 5 req/sec for paid tier +}; + +} // namespace oracle diff --git a/oracles/price/src/mexc_provider.cpp b/oracles/price/src/mexc_provider.cpp new file mode 100644 index 0000000..71a58d8 --- /dev/null +++ b/oracles/price/src/mexc_provider.cpp @@ -0,0 +1,109 @@ +#include "mexc_provider.h" +#include "om_common/logger.h" + +#include +#include + +namespace oracle +{ + +MexcPriceProvider::MexcPriceProvider(const std::string& apiKey) : + ExchangePriceProvider( + "MEXC", + BASE_URL, + apiKey, + RATE_LIMIT_FREE, + RATE_LIMIT_PAID) +{ +} + +std::string MexcPriceProvider::buildKlineUrl( + const std::string& symbol, + int64_t startTimeMs, + int64_t endTimeMs, + int limit) +{ + std::ostringstream url; + url << _apiBaseUrl << "/api/v3/klines" + << "?symbol=" << symbol + << "&interval=1m" + << "&startTime=" << startTimeMs + << "&endTime=" << endTimeMs + << "&limit=" << limit; + return url.str(); +} + +std::string MexcPriceProvider::formatSymbol( + const std::string& base, + const std::string& quote) +{ + // MEXC format: BTCUSDT (uppercase, no separator) - same as Binance + std::string symbol = base + quote; + std::transform(symbol.begin(), symbol.end(), symbol.begin(), ::toupper); + return symbol; +} + +uint16_t MexcPriceProvider::parseKlineResponse( + const std::string& response, + std::string& closePriceString) +{ + // MEXC uses same kline format as Binance + // [[openTime, open, high, low, close, volume, closeTime, ...], ...] + // Close price is at index 4 (5th element) + + // Check for empty response or empty array + if (response.empty() || response == "[]") + { + OM_LOG_ERROR() << "[MEXC] Empty response - pair may not be supported"; + return RETURN_ERROR_INVALID_ARG; + } + + // Find the start of the inner array + size_t startPos = response.find('['); + if (startPos == std::string::npos) + { + OM_LOG_ERROR() << "[MEXC] Invalid response format: " << response; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + // Skip to inner array + startPos = response.find('[', startPos + 1); + if (startPos == std::string::npos) + { + OM_LOG_ERROR() << "[MEXC] No kline data in response: " << response; + return RETURN_ERROR_INVALID_ARG; + } + + // Find close price (5th comma-separated value, index 4) + int commaCount = 0; + size_t pos = startPos + 1; + while (commaCount < 4 && pos < response.size()) + { + if (response[pos] == ',') + { + commaCount++; + } + pos++; + } + + if (commaCount != 4) + { + OM_LOG_ERROR() << "[MEXC] Could not find close price in response"; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + // Extract close price (quoted string) + size_t quoteStart = response.find('"', pos); + size_t quoteEnd = response.find('"', quoteStart + 1); + + if (quoteStart == std::string::npos || quoteEnd == std::string::npos) + { + OM_LOG_ERROR() << "[MEXC] Could not parse close price"; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + + closePriceString = response.substr(quoteStart + 1, quoteEnd - quoteStart - 1); + return RETURN_NO_ERROR; +} + +} // namespace oracle diff --git a/oracles/price/src/mexc_provider.h b/oracles/price/src/mexc_provider.h new file mode 100644 index 0000000..e20c8d3 --- /dev/null +++ b/oracles/price/src/mexc_provider.h @@ -0,0 +1,39 @@ +#pragma once + +#include "exchange_provider.h" + +namespace oracle +{ + +/** + * MEXC exchange price provider. + * API: https://api.mexc.com/api/v3/klines + * Symbol format: BTCUSDT (uppercase, no separator) + */ +class MexcPriceProvider : public ExchangePriceProvider +{ +public: + explicit MexcPriceProvider(const std::string& apiKey = ""); + +protected: + std::string buildKlineUrl( + const std::string& symbol, + int64_t startTimeMs, + int64_t endTimeMs, + int limit) override; + + std::string formatSymbol( + const std::string& base, + const std::string& quote) override; + + uint16_t parseKlineResponse( + const std::string& response, + std::string& closePriceString) override; + +private: + static constexpr const char* BASE_URL = "https://api.mexc.com"; + static constexpr double RATE_LIMIT_FREE = 0.5; // 2 req/sec for free tier + static constexpr double RATE_LIMIT_PAID = 0.1; // 10 req/sec for paid tier +}; + +} // namespace oracle diff --git a/oracles/price/src/price_service.cpp b/oracles/price/src/price_service.cpp index 5bbe745..8ae0b25 100644 --- a/oracles/price/src/price_service.cpp +++ b/oracles/price/src/price_service.cpp @@ -1,13 +1,19 @@ #include "price_service.h" +#include "binance_provider.h" +#include "mexc_provider.h" +#include "gate_provider.h" +#include "combined_provider.h" #include "om_common/logger.h" #include #include #include #include +#include #include #include #include +#include // For HTTP requests (using libcurl) #include @@ -15,19 +21,121 @@ namespace oracle { +bool priceStringToRational(const std::string& priceStr, int64_t& numerator, int64_t& denominator) +{ + // Try to convert string to rational directly + numerator = 0; + denominator = 1; + bool okay = true; + bool seenDot = false; + for (size_t i = 0; i < priceStr.size(); ++i) + { + char c = priceStr[i]; + if (c >= '0' && c <= '9') + { + // regular digit + numerator = numerator * 10 + (c - '0'); + if (seenDot) + denominator *= 10; + } + else if (c == '.' && !seenDot) + { + // decimal point + seenDot = true; + } + else + { + // unexpected character + okay = false; + break; + } + } + OM_LOG_DEBUG() << "priceStringToRational(): priceStr " << priceStr << ", num " << numerator << ", denom " << denominator << ", okay " << okay; + if (okay) + return true; + + // Fallback solution + double price; + try + { + price = std::stod(priceStr); + } + catch (const std::exception& e) + { + return false; + } + + // Use fixed 8 decimal places precision (10^8) + constexpr int64_t PRICE_DENOMINATOR = 100000000; + numerator = static_cast(std::round(price * PRICE_DENOMINATOR)); + denominator = PRICE_DENOMINATOR; + + return true; +} + static std::string getTimeStampString(const QPI::DateAndTime& rQpiDateTime) { std::ostringstream ss; - ss << rQpiDateTime.getYear(); - ss << rQpiDateTime.getMonth(); - ss << rQpiDateTime.getDay(); - ss << rQpiDateTime.getHour(); - ss << rQpiDateTime.getMinute(); - ss << rQpiDateTime.getSecond(); + ss << rQpiDateTime.getYear() << "-" + << std::setfill('0') << std::setw(2) << (int)rQpiDateTime.getMonth() << "-" + << std::setw(2) << (int)rQpiDateTime.getDay() << " " + << std::setw(2) << (int)rQpiDateTime.getHour() << ":" + << std::setw(2) << (int)rQpiDateTime.getMinute() << ":" + << std::setw(2) << (int)rQpiDateTime.getSecond() << " UTC"; return ss.str(); } +// Convert QPI::DateAndTime to Unix timestamp in milliseconds (UTC) +static int64_t dateTimeToUnixMs(const QPI::DateAndTime& dt) +{ + // Build tm struct (UTC) + struct tm tm = {}; + tm.tm_year = dt.getYear() - 1900; // tm_year is years since 1900 + tm.tm_mon = dt.getMonth() - 1; // tm_mon is 0-11 + tm.tm_mday = dt.getDay(); + tm.tm_hour = dt.getHour(); + tm.tm_min = dt.getMinute(); + tm.tm_sec = dt.getSecond(); + tm.tm_isdst = 0; // Not daylight saving + + // Convert to Unix timestamp (UTC) using timegm (POSIX) or _mkgmtime (Windows) +#ifdef _WIN32 + time_t t = _mkgmtime(&tm); +#else + time_t t = timegm(&tm); +#endif + + if (t == -1) + { + return 0; // Invalid date + } + + // Convert to milliseconds and add the millisecond component + return static_cast(t) * 1000 + dt.getMillisec(); +} + +// Normalize oracle ID for combined oracles (e.g., "gate_binance" -> "binance_gate") +// Sorts exchange names alphabetically so order doesn't matter +static std::string normalizeOracleId(const std::string& oracleId) +{ + size_t underscorePos = oracleId.find('_'); + if (underscorePos == std::string::npos) + { + return oracleId; + } + + std::string first = oracleId.substr(0, underscorePos); + std::string second = oracleId.substr(underscorePos + 1); + + if (first > second) + { + std::swap(first, second); + } + + return first + "_" + second; +} + std::string PriceService::bytesToString(const char* data, size_t maxLen) { // Find null terminator or end of buffer @@ -303,25 +411,18 @@ uint16_t CoinGeckoPriceProvider::fetchFromAPI( size_t endPos = response.find_first_of("},", pos); std::string priceStr = response.substr(pos, endPos - pos); - try + if (!priceStringToRational(priceStr, numerator, denominator)) + { + OM_LOG_ERROR() << "[" << _name << "] Failed to parse price: " << priceStr; + return RETURN_ERROR_ORACLE_UNAVAIL; + } + else { - double price = std::stod(priceStr); - - // Convert to rational number (numerator/denominator) - // For simplicity, use 6 decimal places of precision - numerator = static_cast(price * 1000000); - denominator = 1000000; - OM_LOG_DEBUG() << "[" << _name << "] Price fetched: " << currency1 << "/" << currency2 - << " = " << price << " (" << numerator << "/" << denominator << ")"; + << " = " << priceStr << " (" << numerator << "/" << denominator << ")"; return RETURN_NO_ERROR; } - catch (const std::exception& e) - { - OM_LOG_ERROR() << "[" << _name << "] Failed to parse price: " << e.what(); - return RETURN_ERROR_ORACLE_UNAVAIL; - } } // ============================================================================ @@ -338,12 +439,51 @@ PriceService::PriceService(const std::string& rHostname, uint16_t hostPort) : // Register default providers registerProvider("mock", std::make_shared()); - // Register CoinGecko if API key is available - const char* apiKey = std::getenv("COINGECKO_API_KEY"); - const char* apiType = std::getenv("COINGECKO_API_TYPE"); + // Register CoinGecko + const char* coingeckoApiKey = std::getenv("COINGECKO_API_KEY"); + const char* coingeckoApiType = std::getenv("COINGECKO_API_TYPE"); registerProvider( "coingecko", - std::make_shared(apiKey ? apiKey : "", apiType ? apiType : "free")); + std::make_shared( + coingeckoApiKey ? coingeckoApiKey : "", + coingeckoApiType ? coingeckoApiType : "free")); + + // Register exchange providers + const char* binanceApiKey = std::getenv("BINANCE_API_KEY"); + const char* mexcApiKey = std::getenv("MEXC_API_KEY"); + const char* gateApiKey = std::getenv("GATE_API_KEY"); + + // Single source providers + registerProvider( + "binance", + std::make_shared(binanceApiKey ? binanceApiKey : "")); + + registerProvider( + "mexc", + std::make_shared(mexcApiKey ? mexcApiKey : "")); + + registerProvider( + "gate", + std::make_shared(gateApiKey ? gateApiKey : "")); + + // Combined providers (mean of 2 sources) + registerProvider( + "binance_mexc", + createBinanceMexcProvider( + binanceApiKey ? binanceApiKey : "", + mexcApiKey ? mexcApiKey : "")); + + registerProvider( + "binance_gate", + createBinanceGateProvider( + binanceApiKey ? binanceApiKey : "", + gateApiKey ? gateApiKey : "")); + + registerProvider( + "gate_mexc", // Alphabetical order: g < m + createMexcGateProvider( + mexcApiKey ? mexcApiKey : "", + gateApiKey ? gateApiKey : "")); } PriceService::~PriceService() @@ -387,11 +527,25 @@ uint16_t PriceService::processInterfaceQuery( std::string currency1 = bytesToString((const char*)query.currency1.m256i_i8, 32); std::string currency2 = bytesToString((const char*)query.currency2.m256i_i8, 32); - OM_LOG_DEBUG() << " Query: oracle=" << oracleId << ", " << currency1 << "/" << currency2 - << ", timestamp=" << getTimeStampString(query.timestamp); + // Check if timestamp is zero (current price request) + bool isCurrentPrice = (query.timestamp.getYear() == 0 && query.timestamp.getMonth() == 0 && + query.timestamp.getDay() == 0 && query.timestamp.getHour() == 0 && + query.timestamp.getMinute() == 0 && query.timestamp.getSecond() == 0); - // Look for provider + if (isCurrentPrice) + { + OM_LOG_DEBUG() << " Query: oracle=" << oracleId << ", " << currency1 << "/" << currency2 + << ", timestamp=0 (current price)"; + } + else + { + OM_LOG_DEBUG() << " Query: oracle=" << oracleId << ", " << currency1 << "/" << currency2 + << ", timestamp=" << getTimeStampString(query.timestamp); + } + + // Look for provider (normalize to handle gate_binance == binance_gate) std::transform(oracleId.begin(), oracleId.end(), oracleId.begin(), ::tolower); + oracleId = normalizeOracleId(oracleId); std::shared_ptr provider; { @@ -409,11 +563,20 @@ uint16_t PriceService::processInterfaceQuery( return RETURN_ERROR_INVALID_ORACLE; // Provider not found } - // Get price + // Convert query timestamp to Unix milliseconds (UTC) + // A zero timestamp (all fields = 0) means "current price" - keep timestampMs = 0 + int64_t timestampMs = 0; + if (!isCurrentPrice) + { + timestampMs = dateTimeToUnixMs(query.timestamp); + } + + // Get price at the specified timestamp Price::OracleReply reply; int64_t numerator = 0; int64_t denominator = 1; - uint16_t returnValue = provider->getPrice(currency1, currency2, numerator, denominator); + uint16_t returnValue = provider->getPriceAtTimestamp( + currency1, currency2, timestampMs, numerator, denominator); if (returnValue != RETURN_NO_ERROR) { return returnValue; // Price not available diff --git a/oracles/price/src/price_service.h b/oracles/price/src/price_service.h index 7a9998c..b863459 100644 --- a/oracles/price/src/price_service.h +++ b/oracles/price/src/price_service.h @@ -20,7 +20,9 @@ namespace oracle constexpr size_t PRICE_ORACLE_QUERY_SIZE = sizeof(Price::OracleQuery); // 32 + 8 + 32 + 32 constexpr size_t PRICE_ORACLE_REPLY_SIZE = sizeof(Price::OracleReply); // 8 + 8 -static std::string getTimeStampString(const QPI::DateAndTime& rQpiDateTime); +bool priceStringToRational(const std::string& priceStr, int64_t& numerator, int64_t& denominator); + +static std::string getTimeStampString(const QPI::DateAndTime& rQpiDateTime); // Price Provider Interface class PriceProvider @@ -30,7 +32,7 @@ class PriceProvider virtual ~PriceProvider() = default; /** - * Get price for currency1/currency2. + * Get current price for currency1/currency2. * * @param currency1 Base currency (e.g., "BTC") * @param currency2 Quote currency (e.g., "USD") @@ -44,6 +46,34 @@ class PriceProvider int64_t& numerator, int64_t& denominator) = 0; + /** + * Get price for currency1/currency2 at a specific UTC timestamp. + * Default implementation returns current price if timestampMs=0, + * otherwise returns RETURN_ERROR_INVALID_ARG (historical not supported). + * + * @param currency1 Base currency (e.g., "BTC") + * @param currency2 Quote currency (e.g., "USD") + * @param timestampMs UTC timestamp in milliseconds since epoch (0 = current price) + * @param numerator Output: price numerator + * @param denominator Output: price denominator + * @return RETURN_NO_ERROR on success, another value of OracleErrorFlags otherwise. + */ + virtual uint16_t getPriceAtTimestamp( + const std::string& currency1, + const std::string& currency2, + int64_t timestampMs, + int64_t& numerator, + int64_t& denominator) + { + // timestampMs = 0 means get current price + if (timestampMs == 0) + { + return getPrice(currency1, currency2, numerator, denominator); + } + // Default: historical prices not supported + return RETURN_ERROR_INVALID_ARG; + } + const std::string& getName() const { return _name; } protected: diff --git a/submodules/qubic_core b/submodules/qubic_core index 841852e..5b8da83 160000 --- a/submodules/qubic_core +++ b/submodules/qubic_core @@ -1 +1 @@ -Subproject commit 841852e0c32d9541d33c0ac1a17440323a542f27 +Subproject commit 5b8da8339728c487cb63bdd107635e0094bb55f5 diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt new file mode 100644 index 0000000..de657a2 --- /dev/null +++ b/tools/CMakeLists.txt @@ -0,0 +1,2 @@ +# Tools subdirectory +add_subdirectory(qubic_test_node) diff --git a/tools/qubic_test_node/CMakeLists.txt b/tools/qubic_test_node/CMakeLists.txt new file mode 100644 index 0000000..2389b11 --- /dev/null +++ b/tools/qubic_test_node/CMakeLists.txt @@ -0,0 +1,28 @@ +project(qubic_test_node) + +# Qubic core directory +set(QUBIC_CORE_DIR ${CMAKE_SOURCE_DIR}/submodules/qubic_core/src) + +# Create test node executable +add_executable(${PROJECT_NAME} + src/main.cpp +) + +# Include directories +target_include_directories(${PROJECT_NAME} + PRIVATE + ${CMAKE_SOURCE_DIR}/libs/om_common/include + ${QUBIC_CORE_DIR} + ${QUBIC_CORE_DIR}/.. +) + +# Compiler options +if(CMAKE_CXX_COMPILER_ID MATCHES "Clang" OR CMAKE_CXX_COMPILER_ID MATCHES "GNU") + target_compile_options(${PROJECT_NAME} PRIVATE -Wall -Wextra -mrdrnd -mavx2 -fpermissive) +elseif(CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") + target_compile_options(${PROJECT_NAME} PRIVATE /W4 /arch:AVX2) +endif() + +# Install target +install(TARGETS ${PROJECT_NAME} + RUNTIME DESTINATION bin) diff --git a/tools/qubic_test_node/src/main.cpp b/tools/qubic_test_node/src/main.cpp new file mode 100644 index 0000000..40e2a86 --- /dev/null +++ b/tools/qubic_test_node/src/main.cpp @@ -0,0 +1,680 @@ +/** + * Qubic Test Node - Oracle Machine Testing Tool + * + * Simulates a Qubic node to send oracle queries to the Oracle Machine (OM) for testing. + * Uses compatible message structures from qubic_core for protocol compatibility. + * + * Usage: + * qubic_test_node --ip --port --oracle --pair [--pair ...] + * + * Example: + * qubic_test_node --ip 127.0.0.1 --port 31841 --oracle binance --pair BTC/USDT + * qubic_test_node --ip 127.0.0.1 --oracle binance --pair BTC/USDT --pair ETH/USDT --interval 5000 + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Include only the essential network message definitions from qubic_core +#ifndef NO_UEFI +#define NO_UEFI +#endif + +#ifndef NETWORK_MESSAGES_WITHOUT_CORE_DEPENDENCIES +#define NETWORK_MESSAGES_WITHOUT_CORE_DEPENDENCIES +#endif + +#include "network_messages/common_def.h" +#include "network_messages/header.h" +#include "oracle_core/core_om_network_messages.h" + +// Oracle interface indices (from oracle_core/oracle_interfaces_def.h) +constexpr unsigned int PRICE_ORACLE_INTERFACE_INDEX = 0; +constexpr unsigned int MOCK_ORACLE_INTERFACE_INDEX = 1; + +// Error flags (from network_messages/common_def.h and om_common/qpi_adapter.h) +enum OracleErrorFlags : uint16_t +{ + RETURN_NO_ERROR = 0, + RETURN_ERROR_INVALID_ORACLE = ORACLE_FLAG_INVALID_ORACLE, + RETURN_ERROR_ORACLE_UNAVAIL = ORACLE_FLAG_ORACLE_UNAVAIL, + RETURN_ERROR_INVALID_TIME = ORACLE_FLAG_INVALID_TIME, + RETURN_ERROR_INVALID_PLACE = ORACLE_FLAG_INVALID_PLACE, + RETURN_ERROR_INVALID_ARG = ORACLE_FLAG_INVALID_ARG, +}; + +// Price oracle interface structures (compatible with oracle_interfaces/Price.h) +// Layout matches the qubic_core Price interface exactly +#pragma pack(push, 1) + +// id type - 256-bit (32 bytes), compatible with m256i/QPI::id +struct OracleId +{ + uint8_t data[32]; + + void clear() { memset(data, 0, sizeof(data)); } + + void setFromString(const std::string& str) + { + clear(); + size_t len = std::min(str.length(), sizeof(data)); + memcpy(data, str.c_str(), len); + } + + std::string toString() const + { + size_t len = 0; + while (len < sizeof(data) && data[len] != '\0') + { + len++; + } + return std::string(reinterpret_cast(data), len); + } +}; + +// DateAndTime - 64-bit timestamp (compatible with QPI::DateAndTime from contracts/qpi.h) +struct OracleDateTime +{ + uint64_t value; + + // DateAndTime bit layout from qpi.h (all times are UTC): + // Bits 46-63: year (0-65535) + // Bits 42-45: month (1-12) + // Bits 37-41: day (1-31) + // Bits 32-36: hour (0-23) + // Bits 26-31: minute (0-59) + // Bits 20-25: second (0-59) + // Bits 10-19: millisecond (0-999) + // Bits 0-9: microsecond during millisecond (0-999) + + // Set to zero (means "current price" to the oracle) + void setZero() + { + value = 0; + } + + // Set from components (UTC) + void set(uint16_t year, uint8_t month, uint8_t day, uint8_t hour, uint8_t minute, uint8_t second) + { + value = 0; + value |= ((uint64_t)year << 46); + value |= ((uint64_t)month << 42); + value |= ((uint64_t)day << 37); + value |= ((uint64_t)hour << 32); + value |= ((uint64_t)minute << 26); + value |= ((uint64_t)second << 20); + } + + // Set to current UTC time + void setNow() + { + time_t now = time(nullptr); + struct tm* tm = gmtime(&now); // gmtime returns UTC + set(tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec); + } + + uint16_t getYear() const { return (value >> 46) & 0xFFFF; } + uint8_t getMonth() const { return (value >> 42) & 0xF; } + uint8_t getDay() const { return (value >> 37) & 0x1F; } + uint8_t getHour() const { return (value >> 32) & 0x1F; } + uint8_t getMinute() const { return (value >> 26) & 0x3F; } + uint8_t getSecond() const { return (value >> 20) & 0x3F; } +}; + +// Price::OracleQuery - compatible with oracle_interfaces/Price.h +struct PriceOracleQuery +{ + OracleId oracle; // 32 bytes - Oracle source (e.g., "binance") + OracleDateTime timestamp; // 8 bytes - Query timestamp + OracleId currency1; // 32 bytes - First currency + OracleId currency2; // 32 bytes - Second currency +}; + +// Price::OracleReply - compatible with oracle_interfaces/Price.h +struct PriceOracleReply +{ + int64_t numerator; // Exchange rate numerator + int64_t denominator; // Exchange rate denominator +}; + +#pragma pack(pop) + +// Verify structure sizes match qubic_core +static_assert(sizeof(OracleId) == 32, "OracleId must be 32 bytes"); +static_assert(sizeof(OracleDateTime) == 8, "OracleDateTime must be 8 bytes"); +static_assert(sizeof(PriceOracleQuery) == 104, "PriceOracleQuery must be 104 bytes"); +static_assert(sizeof(PriceOracleReply) == 16, "PriceOracleReply must be 16 bytes"); +static_assert(sizeof(RequestResponseHeader) == 8, "RequestResponseHeader must be 8 bytes"); +static_assert(sizeof(OracleMachineQuery) == 16, "OracleMachineQuery must be 16 bytes"); +static_assert(sizeof(OracleMachineReply) == 16, "OracleMachineReply must be 16 bytes"); + +// Currency pair structure +struct CurrencyPair +{ + std::string currency1; + std::string currency2; +}; + +// Global configuration +struct Config +{ + std::string host = "127.0.0.1"; + uint16_t port = 31841; + std::string oracle = "binance"; + std::vector pairs; + uint32_t timeout = 10000; + uint32_t interval = 0; // 0 = single run, >0 = repeat interval in ms + uint32_t pairDelay = 500; // Delay between currency pair queries in ms (avoid rate limits) + std::string timestamp; // Empty = current, "0" = zero timestamp, "yyyymmddhhmmss" = specific UTC time + bool verbose = false; +}; + +// Parse timestamp string in format "yyyymmddhhmmss" (e.g., "20240115143022") +// Returns true on success, fills the OracleDateTime +bool parseTimestamp(const std::string& ts, OracleDateTime& dt) +{ + if (ts == "0") + { + dt.setZero(); + return true; + } + + if (ts.length() != 14) + { + return false; + } + + // Validate all characters are digits + for (char c : ts) + { + if (!isdigit(c)) return false; + } + + uint16_t year = std::stoi(ts.substr(0, 4)); + uint8_t month = std::stoi(ts.substr(4, 2)); + uint8_t day = std::stoi(ts.substr(6, 2)); + uint8_t hour = std::stoi(ts.substr(8, 2)); + uint8_t minute = std::stoi(ts.substr(10, 2)); + uint8_t second = std::stoi(ts.substr(12, 2)); + + // Basic validation + if (month < 1 || month > 12) return false; + if (day < 1 || day > 31) return false; + if (hour > 23) return false; + if (minute > 59) return false; + if (second > 59) return false; + + dt.set(year, month, day, hour, minute, second); + return true; +} + +// Parse single currency pair string (e.g., "BTC/USDT" -> {"BTC", "USDT"}) +bool parseCurrencyPair(const std::string& pairStr, CurrencyPair& pair) +{ + size_t slashPos = pairStr.find('/'); + if (slashPos == std::string::npos || slashPos == 0 || slashPos == pairStr.length() - 1) + { + return false; + } + pair.currency1 = pairStr.substr(0, slashPos); + pair.currency2 = pairStr.substr(slashPos + 1); + return true; +} + +// Parse comma-separated currency pairs (e.g., "BTC/USDT,ETH/USDT,SOL/USDT") +bool parseCurrencyPairs(const std::string& input, std::vector& pairs) +{ + size_t start = 0; + size_t end; + + while ((end = input.find(',', start)) != std::string::npos) + { + std::string pairStr = input.substr(start, end - start); + CurrencyPair pair; + if (!parseCurrencyPair(pairStr, pair)) + { + return false; + } + pairs.push_back(pair); + start = end + 1; + } + + // Parse the last (or only) pair + if (start < input.length()) + { + std::string pairStr = input.substr(start); + CurrencyPair pair; + if (!parseCurrencyPair(pairStr, pair)) + { + return false; + } + pairs.push_back(pair); + } + + return !pairs.empty(); +} + +// Normalize oracle ID for combined oracles (e.g., "gate_binance" -> "binance_gate") +// Sorts exchange names alphabetically so order doesn't matter +std::string normalizeOracleId(const std::string& oracleId) +{ + size_t underscorePos = oracleId.find('_'); + if (underscorePos == std::string::npos) + { + // Single source oracle, no normalization needed + return oracleId; + } + + std::string first = oracleId.substr(0, underscorePos); + std::string second = oracleId.substr(underscorePos + 1); + + // Sort alphabetically + if (first > second) + { + std::swap(first, second); + } + + return first + "_" + second; +} + +void printUsage(const char* progName) +{ + std::cout << "Usage: " << progName << " [options]\n" + << "\nOptions:\n" + << " -i, --ip OM host address (default: 127.0.0.1)\n" + << " -p, --port OM port (default: 31841)\n" + << " -o, --oracle Oracle ID (e.g., binance, mexc, gate, binance_mexc)\n" + << " -c, --pair Currency pair(s), comma-separated (e.g., BTC/USDT,ETH/USDT)\n" + << " -s, --time UTC timestamp: 0 for current price, or yyyymmddhhmmss\n" + << " -n, --interval Repeat interval in milliseconds (0 = single run, default: 0)\n" + << " -d, --pair-delay Delay between currency pair queries in ms (default: 500)\n" + << " -t, --timeout Query timeout in milliseconds (default: 10000)\n" + << " -v, --verbose Verbose output\n" + << " -h, --help Show this help\n" + << "\nTimestamp Format:\n" + << " (empty) Use current time for each query\n" + << " 0 Request current price (timestamp=0)\n" + << " yyyymmddhhmmss Specific UTC time (e.g., 20240115143022 = 2024-01-15 14:30:22 UTC)\n" + << "\nSupported Oracles:\n" + << " Single source: binance, mexc, gate, coingecko, mock\n" + << " Combined: binance_gate, binance_mexc, gate_mexc (order doesn't matter)\n" + << "\nExamples:\n" + << " " << progName << " --oracle binance --pair BTC/USDT\n" + << " " << progName << " --oracle binance -c BTC/USDT,ETH/USDT --time 0\n" + << " " << progName << " --oracle binance -c BTC/USDT --time 20240115143000\n" + << " " << progName << " --oracle binance --pair BTC/USDT,ETH/USDT --interval 5000\n"; +} + +std::string errorFlagsToString(uint16_t flags) +{ + if (flags == RETURN_NO_ERROR) + return "SUCCESS"; + + std::string result; + if (flags & RETURN_ERROR_INVALID_ORACLE) + result += "INVALID_ORACLE "; + if (flags & RETURN_ERROR_ORACLE_UNAVAIL) + result += "ORACLE_UNAVAIL "; + if (flags & RETURN_ERROR_INVALID_TIME) + result += "INVALID_TIME "; + if (flags & RETURN_ERROR_INVALID_PLACE) + result += "INVALID_PLACE "; + if (flags & RETURN_ERROR_INVALID_ARG) + result += "INVALID_ARG "; + return result.empty() ? "UNKNOWN" : result; +} + +bool priceReplyIsValid(const PriceOracleReply& reply) +{ + return reply.numerator > 0 && reply.denominator > 0; +} + +// Send a single query and return success/failure +bool sendQuery(const Config& config, const CurrencyPair& pair, const OracleDateTime& timestamp, + int sock, uint64_t queryId) +{ + // Calculate packet sizes + constexpr size_t HEADER_SIZE = sizeof(RequestResponseHeader); + constexpr size_t QUERY_SIZE = sizeof(OracleMachineQuery); + constexpr size_t PRICE_QUERY_SIZE = sizeof(PriceOracleQuery); + constexpr size_t TOTAL_SIZE = HEADER_SIZE + QUERY_SIZE + PRICE_QUERY_SIZE; + + // Build query packet + std::vector packet(TOTAL_SIZE, 0); + + // Header + auto* header = reinterpret_cast(packet.data()); + header->checkAndSetSize(static_cast(TOTAL_SIZE)); + header->setType(OracleMachineQuery::type()); + header->setDejavu(static_cast(time(nullptr))); + + // OracleMachineQuery + auto* query = reinterpret_cast(packet.data() + HEADER_SIZE); + query->oracleQueryId = queryId; + query->oracleInterfaceIndex = PRICE_ORACLE_INTERFACE_INDEX; + query->timeoutInMilliseconds = config.timeout; + + // Price::OracleQuery + auto* priceQuery = reinterpret_cast(packet.data() + HEADER_SIZE + QUERY_SIZE); + priceQuery->oracle.setFromString(config.oracle); + priceQuery->timestamp = timestamp; + priceQuery->currency1.setFromString(pair.currency1); + priceQuery->currency2.setFromString(pair.currency2); + + if (config.verbose) + { + std::cout << " Query ID: " << query->oracleQueryId << "\n"; + std::cout << " Timestamp: " << priceQuery->timestamp.getYear() << "-" + << std::setw(2) << std::setfill('0') << (int)priceQuery->timestamp.getMonth() << "-" + << std::setw(2) << std::setfill('0') << (int)priceQuery->timestamp.getDay() << " " + << std::setw(2) << std::setfill('0') << (int)priceQuery->timestamp.getHour() << ":" + << std::setw(2) << std::setfill('0') << (int)priceQuery->timestamp.getMinute() << ":" + << std::setw(2) << std::setfill('0') << (int)priceQuery->timestamp.getSecond() << " UTC\n"; + } + + // Send query + ssize_t sent = send(sock, packet.data(), packet.size(), 0); + if (sent != static_cast(packet.size())) + { + std::cerr << " Error: Failed to send query\n"; + return false; + } + + // Receive response header + RequestResponseHeader respHeader; + ssize_t received = recv(sock, &respHeader, sizeof(respHeader), MSG_WAITALL); + if (received != sizeof(respHeader)) + { + std::cerr << " Error: Failed to receive response header\n"; + return false; + } + + if (respHeader.type() != OracleMachineReply::type()) + { + std::cerr << " Error: Unexpected response type: " << (int)respHeader.type() << "\n"; + return false; + } + + // Receive response payload + size_t payloadSize = respHeader.size() - sizeof(RequestResponseHeader); + std::vector respPayload(payloadSize); + received = recv(sock, respPayload.data(), payloadSize, MSG_WAITALL); + if (received != static_cast(payloadSize)) + { + std::cerr << " Error: Failed to receive response payload\n"; + return false; + } + + // Parse response + auto* reply = reinterpret_cast(respPayload.data()); + auto* priceReply = reinterpret_cast( + respPayload.data() + sizeof(OracleMachineReply)); + + std::cout << " " << pair.currency1 << "/" << pair.currency2 << ": "; + + if (reply->oracleMachineErrorFlags == RETURN_NO_ERROR) + { + if (priceReply->denominator != 0) + { + double price = static_cast(priceReply->numerator) / priceReply->denominator; + std::cout << std::fixed << std::setprecision(8) << price; + if (config.verbose) + { + std::cout << " (num=" << priceReply->numerator << ", denom=" << priceReply->denominator << ")"; + } + std::cout << "\n"; + } + else + { + std::cout << "ERROR (zero denominator)\n"; + return false; + } + } + else + { + std::cout << "ERROR: " << errorFlagsToString(reply->oracleMachineErrorFlags) << "\n"; + return false; + } + + return true; +} + +int main(int argc, char* argv[]) +{ + Config config; + + static struct option longOptions[] = { + {"ip", required_argument, nullptr, 'i'}, + {"port", required_argument, nullptr, 'p'}, + {"oracle", required_argument, nullptr, 'o'}, + {"pair", required_argument, nullptr, 'c'}, + {"time", required_argument, nullptr, 's'}, + {"interval", required_argument, nullptr, 'n'}, + {"pair-delay", required_argument, nullptr, 'd'}, + {"timeout", required_argument, nullptr, 't'}, + {"verbose", no_argument, nullptr, 'v'}, + {"help", no_argument, nullptr, 'h'}, + {nullptr, 0, nullptr, 0}}; + + int opt; + while ((opt = getopt_long(argc, argv, "i:p:o:c:s:n:d:t:vh", longOptions, nullptr)) != -1) + { + switch (opt) + { + case 'i': + config.host = optarg; + break; + case 'p': + config.port = static_cast(std::stoi(optarg)); + break; + case 'o': + config.oracle = normalizeOracleId(optarg); + break; + case 'c': + { + if (!parseCurrencyPairs(optarg, config.pairs)) + { + std::cerr << "Error: Invalid currency pair format: " << optarg << "\n"; + std::cerr << "Expected format: C1/C2 or C1/C2,C3/C4,... (e.g., BTC/USDT,ETH/USDT)\n"; + return 1; + } + break; + } + case 's': + config.timestamp = optarg; + break; + case 'n': + config.interval = static_cast(std::stoi(optarg)); + break; + case 'd': + config.pairDelay = static_cast(std::stoi(optarg)); + break; + case 't': + config.timeout = static_cast(std::stoi(optarg)); + break; + case 'v': + config.verbose = true; + break; + case 'h': + printUsage(argv[0]); + return 0; + default: + printUsage(argv[0]); + return 1; + } + } + + // Default pair if none specified + if (config.pairs.empty()) + { + config.pairs.push_back({"BTC", "USDT"}); + } + + // Parse and validate timestamp if specified + OracleDateTime fixedTimestamp; + bool useFixedTimestamp = false; + if (!config.timestamp.empty()) + { + if (!parseTimestamp(config.timestamp, fixedTimestamp)) + { + std::cerr << "Error: Invalid timestamp format: " << config.timestamp << "\n"; + std::cerr << "Expected: 0 or yyyymmddhhmmss (e.g., 20240115143022)\n"; + return 1; + } + useFixedTimestamp = true; + } + + std::cout << "=== Qubic Test Node - Oracle Query ===\n"; + std::cout << "Target: " << config.host << ":" << config.port << "\n"; + std::cout << "Oracle: " << config.oracle << "\n"; + std::cout << "Pairs: "; + for (size_t i = 0; i < config.pairs.size(); ++i) + { + if (i > 0) std::cout << ", "; + std::cout << config.pairs[i].currency1 << "/" << config.pairs[i].currency2; + } + std::cout << "\n"; + if (useFixedTimestamp) + { + if (fixedTimestamp.value == 0) + { + std::cout << "Time: 0 (current price)\n"; + } + else + { + std::cout << "Time: " << fixedTimestamp.getYear() << "-" + << std::setw(2) << std::setfill('0') << (int)fixedTimestamp.getMonth() << "-" + << std::setw(2) << std::setfill('0') << (int)fixedTimestamp.getDay() << " " + << std::setw(2) << std::setfill('0') << (int)fixedTimestamp.getHour() << ":" + << std::setw(2) << std::setfill('0') << (int)fixedTimestamp.getMinute() << ":" + << std::setw(2) << std::setfill('0') << (int)fixedTimestamp.getSecond() << " UTC\n"; + } + } + if (config.interval > 0) + { + std::cout << "Interval: " << config.interval << " ms\n"; + } + std::cout << "\n"; + + if (config.verbose) + { + std::cout << "Packet structure sizes (from qubic_core):\n"; + std::cout << " RequestResponseHeader: " << sizeof(RequestResponseHeader) << " bytes\n"; + std::cout << " OracleMachineQuery: " << sizeof(OracleMachineQuery) << " bytes\n"; + std::cout << " Price::OracleQuery: " << sizeof(PriceOracleQuery) << " bytes\n\n"; + } + + // generate random queryId to prevent wrong OM cache hits in subsequent calls of this program + srand(time(NULL)); + uint64_t queryId = rand(); + int exitCode = 0; + + // Main query loop + do + { + // Create socket + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) + { + std::cerr << "Error: Failed to create socket\n"; + return 1; + } + + // Set socket timeout + struct timeval tv; + tv.tv_sec = config.timeout / 1000; + tv.tv_usec = (config.timeout % 1000) * 1000; + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + + // Connect to OM + struct sockaddr_in serverAddr; + memset(&serverAddr, 0, sizeof(serverAddr)); + serverAddr.sin_family = AF_INET; + serverAddr.sin_port = htons(config.port); + + if (inet_pton(AF_INET, config.host.c_str(), &serverAddr.sin_addr) <= 0) + { + std::cerr << "Error: Invalid address\n"; + close(sock); + return 1; + } + + if (connect(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) + { + std::cerr << "Error: Connection failed\n"; + close(sock); + if (config.interval > 0) + { + std::cout << "Retrying in " << config.interval << " ms...\n"; + usleep(config.interval * 1000); + continue; + } + return 1; + } + + // Prepare timestamp for this query round + OracleDateTime queryTimestamp; + if (useFixedTimestamp) + { + queryTimestamp = fixedTimestamp; + } + else + { + queryTimestamp.setNow(); + } + + // Print timestamp for this query round + std::cout << "["; + if (queryTimestamp.value == 0) + { + std::cout << "current"; + } + else + { + std::cout << queryTimestamp.getYear() << "-" + << std::setw(2) << std::setfill('0') << (int)queryTimestamp.getMonth() << "-" + << std::setw(2) << std::setfill('0') << (int)queryTimestamp.getDay() << " " + << std::setw(2) << std::setfill('0') << (int)queryTimestamp.getHour() << ":" + << std::setw(2) << std::setfill('0') << (int)queryTimestamp.getMinute() << ":" + << std::setw(2) << std::setfill('0') << (int)queryTimestamp.getSecond() << " UTC"; + } + std::cout << "] " << config.oracle << "\n"; + + // Query each currency pair + for (size_t i = 0; i < config.pairs.size(); ++i) + { + const auto& pair = config.pairs[i]; + if (!sendQuery(config, pair, queryTimestamp, sock, queryId++)) + { + exitCode = 1; + } + + // Delay between pairs to avoid rate limiting (skip delay after last pair) + if (config.pairDelay > 0 && i + 1 < config.pairs.size()) + { + usleep(config.pairDelay * 1000); + } + } + + close(sock); + + // If interval is set, wait and repeat + if (config.interval > 0) + { + std::cout << "\n"; + usleep(config.interval * 1000); + } + + } while (config.interval > 0); + + return exitCode; +}