Skip to content
This repository was archived by the owner on Feb 2, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,22 @@ bool LoadConfig(const std::string& path, AppConfig& out, std::string& error) {
out.kafka.enabled = kafka["enabled"].asBool();
}

if (kafka.isMember("logs-enabled")) {
if (!kafka["logs-enabled"].isBool()) {
error = "Invalid type: boolean required for key 'kafka.logs-enabled'";
return false;
}
out.kafka.logs_enabled = kafka["logs-enabled"].asBool();
}

if (kafka.isMember("txs-enabled")) {
if (!kafka["txs-enabled"].isBool()) {
error = "Invalid type: boolean required for key 'kafka.txs-enabled'";
return false;
}
out.kafka.txs_enabled = kafka["txs-enabled"].asBool();
}

if (kafka.isMember("brokers")) {
if (!kafka["brokers"].isString()) {
error = "Invalid type: string required for key 'kafka.brokers'";
Expand Down
2 changes: 2 additions & 0 deletions Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ enum class TxStorageMode {
// Kafka configuration (optional feature)
struct KafkaConfig {
bool enabled = false;
bool logs_enabled = true; // enable sending logs to Kafka (when kafka enabled)
bool txs_enabled = true; // enable sending transactions to Kafka (when kafka enabled)
std::string brokers = "localhost:9092";
std::string logs_topic = "qubic-logs";
std::string txs_topic = "qubic-txs";
Expand Down
14 changes: 10 additions & 4 deletions KafkaProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ KafkaProducer::~KafkaProducer() {
}

bool KafkaProducer::init(const KafkaConfig& config) {
Logger::get()->info("Kafka config: enabled={}, logs_enabled={}, txs_enabled={}, brokers={}, logs_topic={}, txs_topic={}, compression={}, batch_size={}, linger_ms={}",
config.enabled, config.logs_enabled, config.txs_enabled, config.brokers, config.logs_topic, config.txs_topic,
config.compression, config.batch_size, config.linger_ms);

if (!config.enabled) {
Logger::get()->info("Kafka producer is disabled");
return true;
Expand Down Expand Up @@ -135,8 +139,10 @@ bool KafkaProducer::init(const KafkaConfig& config) {
}

enabled_ = true;
Logger::get()->info("Kafka producer initialized: brokers={}, logs_topic={}, txs_topic={}",
config.brokers, config.logs_topic, config.txs_topic);
logsEnabled_ = config.logs_enabled;
txsEnabled_ = config.txs_enabled;
Logger::get()->info("Kafka producer initialized: brokers={}, logs_topic={}, txs_topic={}, logs_enabled={}, txs_enabled={}",
config.brokers, config.logs_topic, config.txs_topic, config.logs_enabled, config.txs_enabled);
return true;
}

Expand Down Expand Up @@ -176,7 +182,7 @@ void KafkaProducer::shutdown() {
}

void KafkaProducer::sendLog(const std::string& parsedJson, const std::string& logKey, uint64_t timestamp) {
if (!enabled_.load() || !producer_ || !logsTopic_) return;
if (!logsEnabled_.load() || !producer_ || !logsTopic_) return;
(void)timestamp; // Reserved for future use

// Use logKey (epoch:logId) as the message key for partitioning and deduplication
Expand Down Expand Up @@ -220,7 +226,7 @@ void KafkaProducer::sendLog(const std::string& parsedJson, const std::string& lo
}

void KafkaProducer::sendTransaction(const std::string& txJson, const std::string& txHash) {
if (!enabled_.load() || !producer_ || !txsTopic_) return;
if (!txsEnabled_.load() || !producer_ || !txsTopic_) return;

// Use txHash as the message key for partitioning and deduplication
int result = rd_kafka_produce(
Expand Down
8 changes: 8 additions & 0 deletions KafkaProducer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ class KafkaProducer {
// Check if the Kafka producer is enabled and initialized
bool isEnabled() const { return enabled_.load(); }

// Check if logs sending is enabled
bool isLogsEnabled() const { return logsEnabled_.load(); }

// Check if transactions sending is enabled
bool isTxsEnabled() const { return txsEnabled_.load(); }

// Poll for events (call periodically to handle callbacks)
void poll(int timeout_ms = 0);

Expand All @@ -46,6 +52,8 @@ class KafkaProducer {
rd_kafka_topic_t* logsTopic_ = nullptr;
rd_kafka_topic_t* txsTopic_ = nullptr;
std::atomic<bool> enabled_{false};
std::atomic<bool> logsEnabled_{false};
std::atomic<bool> txsEnabled_{false};
std::atomic<uint64_t> messagesSent_{0};
std::atomic<uint64_t> messagesDelivered_{0};
std::atomic<uint64_t> messagesFailed_{0};
Expand Down
11 changes: 10 additions & 1 deletion LoggingEventProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include <future>
#include "RESTAPI/LogSubscriptionManager.h"
#include "RESTAPI/QubicSubscriptionManager.h"
#ifdef KAFKA_ENABLED
#include "KafkaProducer.h"
#endif

using namespace std::chrono_literals;
extern "C" {
Expand Down Expand Up @@ -773,12 +776,18 @@ void verifyLoggingEvent(std::atomic_bool& stopFlag)
}

// Push verified logs to WebSocket subscribers (for logs/transfers subscriptions)
// and/or to Kafka if enabled
// Note: tickStream subscriptions are notified from QubicIndexer after indexing
// Wrapped in try-catch to ensure log verification continues even if notification fails
bool hasLogSubClients = LogSubscriptionManager::instance().getClientCount() > 0;
bool hasQubicSubClients = QubicSubscriptionManager::instance().getClientCount() > 0;
#ifdef KAFKA_ENABLED
bool kafkaLogsEnabled = KafkaProducer::instance().isLogsEnabled();
#else
bool kafkaLogsEnabled = false;
#endif

if (hasLogSubClients || hasQubicSubClients) {
if (hasLogSubClients || hasQubicSubClients || kafkaLogsEnabled) {
try {
if (!vle.empty()) {
// Group logs by tick for proper ordering
Expand Down
11 changes: 9 additions & 2 deletions README.md
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we separate the kafka components?
normal users will not compile with kafka.

Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
Install dependencies and necessary tools to operate bob:
```
sudo apt-get update;
apt install vim net-tools tmux cmake git libjsoncpp-dev build-essential cmake uuid-dev libhiredis-dev zlib1g-dev unzip -y;
apt install vim net-tools tmux cmake git libjsoncpp-dev build-essential uuid-dev libhiredis-dev zlib1g-dev unzip pkg-config -y;
```

### Optional: Kafka Support
To enable streaming logs and transactions to Kafka, install librdkafka:
```
apt install librdkafka-dev -y;
```
**Note:** `pkg-config` is required for CMake to detect librdkafka; without it, the build will silently disable Kafka support.

### BUILD

On Linux, make sure `cmake` and `make` commands are installed and then run:
Expand Down Expand Up @@ -68,7 +75,7 @@ For the trusted-node field, the expected format is `NODE_IP:NODE_PORT:PASSCODE_L
All in one batch file for the lazy:
```
apt update && apt upgrade -y;
apt install vim net-tools tmux cmake git libjsoncpp-dev build-essential cmake uuid-dev libhiredis-dev zlib1g-dev unzip -y;
apt install vim net-tools tmux cmake git libjsoncpp-dev build-essential uuid-dev libhiredis-dev zlib1g-dev unzip pkg-config librdkafka-dev -y;
git clone https://github.com/krypdkat/qubicbob.git;
cd qubicbob;
mkdir build;
Expand Down
1 change: 1 addition & 0 deletions RESTAPI/LogSubscriptionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ void LogSubscriptionManager::pushVerifiedLogs(uint32_t tick, uint16_t epoch, con
}
// Poll to handle delivery reports
KafkaProducer::instance().poll(0);
Logger::get()->info("Kafka: sent {} logs for tick {}", logs.size(), tick);
}
#endif

Expand Down
8 changes: 8 additions & 0 deletions bob.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,22 @@ int runBob(int argc, char *argv[])
}
startRESTServer();

// Log Kafka compile-time and runtime status
#ifdef KAFKA_ENABLED
Logger::get()->info("Kafka support: COMPILED IN (KAFKA_ENABLED=1)");
Logger::get()->info("Kafka config enabled: {}", cfg.kafka.enabled);
// Initialize Kafka producer if enabled
if (cfg.kafka.enabled) {
if (!KafkaProducer::instance().init(cfg.kafka)) {
Logger::get()->error("Failed to initialize Kafka producer");
// Non-fatal - continue without Kafka
}
} else {
Logger::get()->info("Kafka producer disabled in config");
}
#else
Logger::get()->info("Kafka support: NOT COMPILED (librdkafka not found during build)");
(void)cfg.kafka; // Silence unused warning
#endif

if (gTickStorageMode == TickStorageMode::Kvrocks)
Expand Down
2 changes: 2 additions & 0 deletions default_config_bob.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"spam-qu-threshold": 100,
"kafka": {
"enabled": false,
"logs-enabled": true,
"txs-enabled": true,
"brokers": "localhost:9092",
"logs-topic": "qubic-logs",
"txs-topic": "qubic-txs",
Expand Down