diff --git a/Config.cpp b/Config.cpp index 6655796..1004184 100644 --- a/Config.cpp +++ b/Config.cpp @@ -362,6 +362,22 @@ bool LoadConfig(const std::string& path, AppConfig& out, std::string& error) { out.kafka.enabled = kafka["enabled"].asBool(); } + if (kafka.isMember("logs-enabled")) { + if (!kafka["logs-enabled"].isBool()) { + error = "Invalid type: boolean required for key 'kafka.logs-enabled'"; + return false; + } + out.kafka.logs_enabled = kafka["logs-enabled"].asBool(); + } + + if (kafka.isMember("txs-enabled")) { + if (!kafka["txs-enabled"].isBool()) { + error = "Invalid type: boolean required for key 'kafka.txs-enabled'"; + return false; + } + out.kafka.txs_enabled = kafka["txs-enabled"].asBool(); + } + if (kafka.isMember("brokers")) { if (!kafka["brokers"].isString()) { error = "Invalid type: string required for key 'kafka.brokers'"; diff --git a/Config.h b/Config.h index 18c77a1..0546ca5 100644 --- a/Config.h +++ b/Config.h @@ -20,6 +20,8 @@ enum class TxStorageMode { // Kafka configuration (optional feature) struct KafkaConfig { bool enabled = false; + bool logs_enabled = true; // enable sending logs to Kafka (when kafka enabled) + bool txs_enabled = true; // enable sending transactions to Kafka (when kafka enabled) std::string brokers = "localhost:9092"; std::string logs_topic = "qubic-logs"; std::string txs_topic = "qubic-txs"; diff --git a/KafkaProducer.cpp b/KafkaProducer.cpp index a9fabac..045f9b5 100644 --- a/KafkaProducer.cpp +++ b/KafkaProducer.cpp @@ -58,6 +58,10 @@ KafkaProducer::~KafkaProducer() { } bool KafkaProducer::init(const KafkaConfig& config) { + Logger::get()->info("Kafka config: enabled={}, logs_enabled={}, txs_enabled={}, brokers={}, logs_topic={}, txs_topic={}, compression={}, batch_size={}, linger_ms={}", + config.enabled, config.logs_enabled, config.txs_enabled, config.brokers, config.logs_topic, config.txs_topic, + config.compression, config.batch_size, config.linger_ms); + if (!config.enabled) { Logger::get()->info("Kafka producer is disabled"); return true; @@ -135,8 +139,10 @@ bool KafkaProducer::init(const KafkaConfig& config) { } enabled_ = true; - Logger::get()->info("Kafka producer initialized: brokers={}, logs_topic={}, txs_topic={}", - config.brokers, config.logs_topic, config.txs_topic); + logsEnabled_ = config.logs_enabled; + txsEnabled_ = config.txs_enabled; + Logger::get()->info("Kafka producer initialized: brokers={}, logs_topic={}, txs_topic={}, logs_enabled={}, txs_enabled={}", + config.brokers, config.logs_topic, config.txs_topic, config.logs_enabled, config.txs_enabled); return true; } @@ -176,7 +182,7 @@ void KafkaProducer::shutdown() { } void KafkaProducer::sendLog(const std::string& parsedJson, const std::string& logKey, uint64_t timestamp) { - if (!enabled_.load() || !producer_ || !logsTopic_) return; + if (!logsEnabled_.load() || !producer_ || !logsTopic_) return; (void)timestamp; // Reserved for future use // Use logKey (epoch:logId) as the message key for partitioning and deduplication @@ -220,7 +226,7 @@ void KafkaProducer::sendLog(const std::string& parsedJson, const std::string& lo } void KafkaProducer::sendTransaction(const std::string& txJson, const std::string& txHash) { - if (!enabled_.load() || !producer_ || !txsTopic_) return; + if (!txsEnabled_.load() || !producer_ || !txsTopic_) return; // Use txHash as the message key for partitioning and deduplication int result = rd_kafka_produce( diff --git a/KafkaProducer.h b/KafkaProducer.h index c54bef3..ef571b0 100644 --- a/KafkaProducer.h +++ b/KafkaProducer.h @@ -33,6 +33,12 @@ class KafkaProducer { // Check if the Kafka producer is enabled and initialized bool isEnabled() const { return enabled_.load(); } + // Check if logs sending is enabled + bool isLogsEnabled() const { return logsEnabled_.load(); } + + // Check if transactions sending is enabled + bool isTxsEnabled() const { return txsEnabled_.load(); } + // Poll for events (call periodically to handle callbacks) void poll(int timeout_ms = 0); @@ -46,6 +52,8 @@ class KafkaProducer { rd_kafka_topic_t* logsTopic_ = nullptr; rd_kafka_topic_t* txsTopic_ = nullptr; std::atomic enabled_{false}; + std::atomic logsEnabled_{false}; + std::atomic txsEnabled_{false}; std::atomic messagesSent_{0}; std::atomic messagesDelivered_{0}; std::atomic messagesFailed_{0}; diff --git a/LoggingEventProcessor.cpp b/LoggingEventProcessor.cpp index 51703ed..94757a6 100644 --- a/LoggingEventProcessor.cpp +++ b/LoggingEventProcessor.cpp @@ -21,6 +21,9 @@ #include #include "RESTAPI/LogSubscriptionManager.h" #include "RESTAPI/QubicSubscriptionManager.h" +#ifdef KAFKA_ENABLED +#include "KafkaProducer.h" +#endif using namespace std::chrono_literals; extern "C" { @@ -773,12 +776,18 @@ void verifyLoggingEvent(std::atomic_bool& stopFlag) } // Push verified logs to WebSocket subscribers (for logs/transfers subscriptions) + // and/or to Kafka if enabled // Note: tickStream subscriptions are notified from QubicIndexer after indexing // Wrapped in try-catch to ensure log verification continues even if notification fails bool hasLogSubClients = LogSubscriptionManager::instance().getClientCount() > 0; bool hasQubicSubClients = QubicSubscriptionManager::instance().getClientCount() > 0; +#ifdef KAFKA_ENABLED + bool kafkaLogsEnabled = KafkaProducer::instance().isLogsEnabled(); +#else + bool kafkaLogsEnabled = false; +#endif - if (hasLogSubClients || hasQubicSubClients) { + if (hasLogSubClients || hasQubicSubClients || kafkaLogsEnabled) { try { if (!vle.empty()) { // Group logs by tick for proper ordering diff --git a/README.md b/README.md index b179086..515279d 100644 --- a/README.md +++ b/README.md @@ -9,9 +9,16 @@ Install dependencies and necessary tools to operate bob: ``` sudo apt-get update; -apt install vim net-tools tmux cmake git libjsoncpp-dev build-essential cmake uuid-dev libhiredis-dev zlib1g-dev unzip -y; +apt install vim net-tools tmux cmake git libjsoncpp-dev build-essential cmake uuid-dev libhiredis-dev zlib1g-dev unzip pkg-config -y; ``` +### Optional: Kafka Support +To enable streaming logs and transactions to Kafka, install librdkafka: +``` +apt install librdkafka-dev -y; +``` +**Note:** `pkg-config` is required for cmake to detect librdkafka. Without it, Kafka support will be silently disabled. + ### BUILD On Linux, make sure `cmake` and `make` commands are installed and then run: @@ -68,7 +75,7 @@ For the trusted-node field, the expected format is `NODE_IP:NODE_PORT:PASSCODE_L All in one batch file for the lazy: ``` apt update && apt upgrade -y; -apt install vim net-tools tmux cmake git libjsoncpp-dev build-essential cmake uuid-dev libhiredis-dev zlib1g-dev unzip -y; +apt install vim net-tools tmux cmake git libjsoncpp-dev build-essential cmake uuid-dev libhiredis-dev zlib1g-dev unzip pkg-config librdkafka-dev -y; git clone https://github.com/krypdkat/qubicbob.git; cd qubicbob; mkdir build; diff --git a/RESTAPI/LogSubscriptionManager.cpp b/RESTAPI/LogSubscriptionManager.cpp index 1a3c1bd..f2c36f5 100644 --- a/RESTAPI/LogSubscriptionManager.cpp +++ b/RESTAPI/LogSubscriptionManager.cpp @@ -268,6 +268,7 @@ void LogSubscriptionManager::pushVerifiedLogs(uint32_t tick, uint16_t epoch, con } // Poll to handle delivery reports KafkaProducer::instance().poll(0); + Logger::get()->info("Kafka: sent {} logs for tick {}", logs.size(), tick); } #endif diff --git a/bob.cpp b/bob.cpp index 51fa3e2..da0e101 100644 --- a/bob.cpp +++ b/bob.cpp @@ -132,14 +132,22 @@ int runBob(int argc, char *argv[]) } startRESTServer(); + // Log Kafka compile-time and runtime status #ifdef KAFKA_ENABLED + Logger::get()->info("Kafka support: COMPILED IN (KAFKA_ENABLED=1)"); + Logger::get()->info("Kafka config enabled: {}", cfg.kafka.enabled); // Initialize Kafka producer if enabled if (cfg.kafka.enabled) { if (!KafkaProducer::instance().init(cfg.kafka)) { Logger::get()->error("Failed to initialize Kafka producer"); // Non-fatal - continue without Kafka } + } else { + Logger::get()->info("Kafka producer disabled in config"); } +#else + Logger::get()->info("Kafka support: NOT COMPILED (librdkafka not found during build)"); + (void)cfg.kafka; // Silence unused warning #endif if (gTickStorageMode == TickStorageMode::Kvrocks) diff --git a/default_config_bob.json b/default_config_bob.json index 49c592b..a69fd23 100644 --- a/default_config_bob.json +++ b/default_config_bob.json @@ -15,6 +15,8 @@ "spam-qu-threshold": 100, "kafka": { "enabled": false, + "logs-enabled": true, + "txs-enabled": true, "brokers": "localhost:9092", "logs-topic": "qubic-logs", "txs-topic": "qubic-txs",