Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin)
add_subdirectory(libs/om_common)
add_subdirectory(libs/om_network)
add_subdirectory(node)
add_subdirectory(oracles)
add_subdirectory(oracles)
add_subdirectory(tools)
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ This is a middleware system that bridges Qubic nodes with external oracle servic
┌────────┴────────┐
▼ ▼
┌─────────┐ ┌──────────┐
│ Price │ │ Weather
│ Price │ │ Mock
│ Service │ │ Service │
│ :31842 │ │ :31843 │
└────┬────┘ └──────────┘
Expand Down Expand Up @@ -146,7 +146,7 @@ Offset | Size | Field | Type | Description
Offset | Size | Field | Type | Description
-------|------|-----------------------|--------|---------------------------
0 | 8 | oracleQueryId | uint64 | Query identifier for reply correlation
8 | 4 | oracleInterfaceIndex | uint32 | Interface type (0=Price, 1=Weather)
8 | 4 | oracleInterfaceIndex | uint32 | Interface type (0=Price, 1=Mock)
12 | 4 | timeoutInMilliseconds | uint32 | Query timeout
```

Expand Down Expand Up @@ -271,8 +271,8 @@ cd docker
| `QUBIC_NODES` | 10.29.1.22,192.168.1.2 | Comma-separated whitelist of Qubic node IPs |
| `PRICE_SERVICE_HOST` | 0.0.0.0 | Price service host |
| `PRICE_SERVICE_PORT` | 9001 | Price service port |
| `WEATHER_SERVICE_HOST` | 0.0.0.0 | Weather service host |
| `WEATHER_SERVICE_PORT` | 9002 | Weather service port |
| `MOCK_SERVICE_HOST` | 0.0.0.0 | Mock service host |
| `MOCK_SERVICE_PORT` | 9002 | Mock service port |

## .env File Setup

Expand All @@ -292,8 +292,8 @@ QUBIC_NODES=10.29.1.22,127.0.0.1
PRICE_SERVICE_HOST=127.0.0.1
PRICE_SERVICE_PORT=31842

WEATHER_SERVICE_HOST=127.0.0.1
WEATHER_SERVICE_PORT=31843
MOCK_SERVICE_HOST=127.0.0.1
MOCK_SERVICE_PORT=31843
```

## Docker Compose Environment
Expand Down
13 changes: 13 additions & 0 deletions example_env
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,19 @@ export PRICE_SERVICE_PORT=31842
export MOCK_SERVICE_HOST=192.168.1.5
export MOCK_SERVICE_PORT=31843

# -----------------------------------------------------------------------------
# Exchange API Keys (optional - for higher rate limits)
# If not set, free tier with rate limiting will be used.

# Binance API Key
# export BINANCE_API_KEY=your_binance_api_key

# MEXC API Key
# export MEXC_API_KEY=your_mexc_api_key

# Gate.io API Key
# export GATE_API_KEY=your_gate_api_key

# -----------------------------------------------------------------------------
# Docker Settings (only used when running with docker compose)

Expand Down
5 changes: 5 additions & 0 deletions libs/om_common/include/om_common/qpi_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,8 @@ QPI::id QPI::AssetIssuanceIterator::issuer() const { return QPI::id::zero(); }
void QPI::AssetOwnershipIterator::begin(const QPI::Asset&, const QPI::AssetOwnershipSelect&) {}
void QPI::AssetPossessionIterator::begin(const QPI::Asset&, const QPI::AssetOwnershipSelect&, const QPI::AssetPossessionSelect&) {}
template <typename T1, typename T2> void QPI::copyMemory(T1&, const T2&) {}
void* __acquireScratchpad(unsigned long long size, bool initZero) { return nullptr; }
void __releaseScratchpad(void* ptr) {}
#ifndef NDEBUG
static void addDebugMessageAssert(const char* message, const char* file, const unsigned int lineNumber) {}
#endif
6 changes: 5 additions & 1 deletion libs/om_network/include/om_network/tcp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
#include <sys/socket.h>
#endif

#ifdef TRY_AGAIN
#undef TRY_AGAIN
#endif

namespace oracle
{
class Session;
Expand Down Expand Up @@ -52,7 +56,7 @@ class TcpServer

private:
void acceptLoop();
void clientThread(int client_fd, const std::string& client_ip);
void clientThread(int client_fd, std::string client_ip);
void removeClientFD(int client_fd);
void cleanupFinishedThreads();

Expand Down
78 changes: 54 additions & 24 deletions libs/om_network/src/session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <unistd.h>
#endif

#include <cerrno>
#include <cstring>
#include <iostream>

Expand Down Expand Up @@ -147,42 +148,71 @@ bool Session::sendData(const uint8_t* data, int size)
return true;
}

int Session::receiveExact(uint8_t* buffer, int sz)
// Single recv call. Returns: >0 bytes received, 0 timeout, -1 error/close.
int Session::receive(uint8_t* buffer, int sz)
{
if (!_active || _socketFD < 0)
return 0;
return -1;

int total_received = 0;
while (sz > 0)
int received = recv(_socketFD, (char*)buffer, sz, 0);
if (received > 0)
{
_bytesReceived += received;
return received;
}
else if (received == 0)
{
// Graceful close by peer
OM_LOG_DEBUG() << "receive: graceful close by peer (IP: " << _remoteIP << ")";
_active.store(false);
return -1;
}
else // received < 0
{
int received = recv(_socketFD, (char*)buffer + total_received, sz, 0);
if (received <= 0)
#ifdef _MSC_VER
int err = WSAGetLastError();
if (err == WSAETIMEDOUT || err == WSAEWOULDBLOCK)
#else
if (errno == EAGAIN || errno == EWOULDBLOCK)
#endif
{
// 0 = Graceful close, -1 = Error (or timeout)
_active.store(false);
break;
// Socket timeout - connection still alive
OM_LOG_DEBUG() << "receive: timeout, no data (IP: " << _remoteIP << ")";
return 0;
}
total_received += received;
_bytesReceived += received;
sz -= received;
// Real error
#ifdef _MSC_VER
OM_LOG_ERROR() << "receive: recv error=" << err << " (IP: " << _remoteIP << ")";
#else
OM_LOG_ERROR() << "receive: recv errno=" << errno << " (IP: " << _remoteIP << ")";
#endif
_active.store(false);
return -1;
}
return total_received;
}

int Session::receive(uint8_t* buffer, int sz)
// Loop until exactly sz bytes received. Returns: sz on success, 0 timeout, -1 error/close.
int Session::receiveExact(uint8_t* buffer, int sz)
{
if (!_active || _socketFD < 0)
return 0;

int received = recv(_socketFD, (char*)buffer, sz, 0);
if (received <= 0)
int total_received = 0;
while (total_received < sz)
{
_active.store(false);
return 0;
int received = receive(buffer + total_received, sz - total_received);
if (received > 0)
{
total_received += received;
}
else if (received == 0)
{
// Timeout - return what we have so far
return total_received;
}
else // -1: error or close
{
return (total_received > 0) ? total_received : -1;
}
}

_bytesReceived += received;
return received;
return total_received;
}

void Session::forceShutdown()
Expand Down
17 changes: 9 additions & 8 deletions libs/om_network/src/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,19 @@ void TcpServer::cleanupFinishedThreads()
std::vector<std::thread> threadsToJoin;
{
std::lock_guard<std::mutex> lock(_threadsMutex);
auto it = _clientThreads.begin();
while (it != _clientThreads.end())
auto clientIt = _clientThreads.begin();
while (clientIt != _clientThreads.end())
{
if (_finishedThreadIds.count(it->get_id()) > 0)
auto finishedIt = _finishedThreadIds.find(clientIt->get_id());
if (finishedIt != _finishedThreadIds.end())
{
threadsToJoin.push_back(std::move(*it));
_finishedThreadIds.erase(it->get_id());
it = _clientThreads.erase(it);
_finishedThreadIds.erase(finishedIt);
threadsToJoin.push_back(std::move(*clientIt));
clientIt = _clientThreads.erase(clientIt);
}
else
{
++it;
++clientIt;
}
}
}
Expand Down Expand Up @@ -320,7 +321,7 @@ void TcpServer::acceptLoop()
}
}

void TcpServer::clientThread(int clientFd, const std::string& clientIP)
void TcpServer::clientThread(int clientFd, std::string clientIP)
{
auto startTime = std::chrono::steady_clock::now();

Expand Down
104 changes: 62 additions & 42 deletions node/src/interface_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,68 +336,88 @@ void InterfaceClient::workerThread()
OM_LOG_DEBUG() << "InterfaceClient[" << _interfaceIndex << "] Worker thread exiting";
}

bool InterfaceClient::processQueryRequest(const QueryRequest& request, InterfaceQueryResult& result)
int InterfaceClient::sendAndReceive(const QueryRequest& request, InterfaceQueryResult& result)
{
auto startTime = std::chrono::steady_clock::now();
std::lock_guard<std::mutex> lock(_connectionMutex);

_totalQueries.fetch_add(1);
if (!_session || !_session->isActive())
{
return 0; // retryable: connection was lost, reconnect and try again
}

// Ensure connected
if (!connect())
// Send query data
if (!_session->sendData(request.queryData.data(), request.queryData.size()))
{
OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #"
<< request.requestID << " failed: cannot connect";
return false;
_session->close();
return 0; // retryable: likely stale connection
}

// Send query and receive reply
// Receive reply header
RequestResponseHeader headerBuffer;
int headerReceived =
_session->receiveExact((uint8_t*)&headerBuffer, sizeof(RequestResponseHeader));
if (headerReceived != sizeof(RequestResponseHeader))
{
std::lock_guard<std::mutex> lock(_connectionMutex);
OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #"
<< request.requestID << " failed: invalid reply header";
return -1;
}

if (!_session || !_session->isActive())
{
OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #"
<< request.requestID << " failed: session not active";
return false;
}
// Receive reply payload
result.replyData.resize(headerBuffer.size());
int received = _session->receive(
result.replyData.data() + sizeof(RequestResponseHeader), headerBuffer.getPayloadSize());

// Send query data
if (!_session->sendData(request.queryData.data(), request.queryData.size()))
if (received != (int)headerBuffer.getPayloadSize())
{
if (_session->isActive())
{
OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #"
<< request.requestID << " failed: cannot send";
_session->close();
return false;
<< request.requestID << " failed: invalid reply size " << received;
}
_session->close();
return -1;
}

// Receive reply
// Header first
RequestResponseHeader headerBuffer;
int headerReceived =
_session->receiveExact((uint8_t*)&headerBuffer, sizeof(RequestResponseHeader));
if (headerReceived != sizeof(RequestResponseHeader))
result.valid = true;
return 1; // success
}

bool InterfaceClient::processQueryRequest(const QueryRequest& request, InterfaceQueryResult& result)
{
static constexpr int MAX_SEND_RETRIES = 8;

auto startTime = std::chrono::steady_clock::now();

_totalQueries.fetch_add(1);

for (int attempt = 0; attempt <= MAX_SEND_RETRIES; ++attempt)
{
// Ensure connected (reconnects if previous attempt closed the session)
if (!connect())
{
OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #"
<< request.requestID << " failed: invalid reply header";
<< request.requestID << " failed: cannot connect";
return false;
}

result.replyData.resize(headerBuffer.size());
int received = _session->receive(
result.replyData.data() + sizeof(RequestResponseHeader), headerBuffer.getPayloadSize());
int rc = sendAndReceive(request, result);
if (rc == 1)
break; // success
if (rc < 0)
return false; // non-retryable error

if (received != (int)headerBuffer.getPayloadSize())
{
if (_session->isActive())
{
OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #"
<< request.requestID << " failed: invalid reply size " << received;
}
_session->close();
return false;
}
// rc == 0: send failed (stale connection), retry
OM_LOG_WARNING() << "InterfaceClient[" << _interfaceIndex << "] Request #"
<< request.requestID << " send failed, reconnecting... (attempt "
<< (attempt + 1) << "/" << MAX_SEND_RETRIES << ")";
}

result.valid = true;
if (!result.valid)
{
OM_LOG_ERROR() << "InterfaceClient[" << _interfaceIndex << "] Request #"
<< request.requestID << " failed after " << MAX_SEND_RETRIES << " retries";
return false;
}

auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
Expand Down
8 changes: 6 additions & 2 deletions node/src/interface_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class InterfaceClient
/**
* Constructor
*
* @param interfaceIndex Interface type (0 = Price, 1 = Weather, etc.)
* @param interfaceIndex Interface type (0 = Price, 1 = Mock, etc.)
* @param host Host of the aggregator service
* @param port Port of the aggregator service
*/
Expand Down Expand Up @@ -131,9 +131,13 @@ class InterfaceClient
// Worker thread function
void workerThread();

// Process a single query request
// Process a single query request (with retry on stale connection)
bool processQueryRequest(const QueryRequest& request, InterfaceQueryResult& result);

// Single attempt to send query and receive reply. Returns:
// 1 = success, 0 = send failed (retryable), -1 = non-retryable error
int sendAndReceive(const QueryRequest& request, InterfaceQueryResult& result);

// Interface info
uint32_t _interfaceIndex;
std::string _host;
Expand Down
Loading