diff --git a/ecal/benchmarks/pubsub/benchmark_pubsub.cpp b/ecal/benchmarks/pubsub/benchmark_pubsub.cpp index 7f1a6bcf50..e019ec9a6f 100644 --- a/ecal/benchmarks/pubsub/benchmark_pubsub.cpp +++ b/ecal/benchmarks/pubsub/benchmark_pubsub.cpp @@ -5,9 +5,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,196 +18,297 @@ */ #include + +// Per-publisher / per-subscriber configuration (transport layers etc.) +#include +#include + #include +#include +#include +#include #include #include -#include -#include +#include +// ------------------------------------------------------------------------ +// Benchmark parameters +// ------------------------------------------------------------------------ constexpr int registration_delay_ms = 2000; -constexpr int range_multiplier = 1 << 6; +constexpr int range_multiplier = 1 << 3; constexpr int range_start = 1; -constexpr int range_limit = 1 << 24; +constexpr int range_limit = 1 << 21; + +// 2nd benchmark argument: transport layer +enum class TransportLayer : int +{ + Shm = 0, + Udp = 1, + Tcp = 2, +}; + +inline const char* TransportLayerName(TransportLayer layer) +{ + switch (layer) + { + case TransportLayer::Shm: return "shm"; + case TransportLayer::Udp: return "udp"; + case TransportLayer::Tcp: return "tcp"; + } + return "unknown"; +} +// Generate (payload_size, transport_layer) argument tuples. +static void TransportAndSizeArgs(benchmark::internal::Benchmark* b) +{ + for (int layer = static_cast(TransportLayer::Shm); + layer <= static_cast(TransportLayer::Tcp); + ++layer) + { + for (int size = range_start; size <= range_limit; size *= range_multiplier) + { + b->Args({ size, layer }); + } + } +} + +// ------------------------------------------------------------------------ +// Payload generator (deterministic for reproducibility) +// ------------------------------------------------------------------------ -// Random byte generator -char gen() { - static std::random_device rd; - static std::mt19937 engine(rd()); - static std::uniform_int_distribution<> distr(0,255); - return static_cast(distr(engine)); +char gen() +{ + // Fixed seed -> deterministic content + static std::mt19937 engine(0x12345678u); + static std::uniform_int_distribution distr(0, 255); + return static_cast(distr(engine)); } +// ------------------------------------------------------------------------ +// Helper: Create fixed eCAL process configuration +// ------------------------------------------------------------------------ +// Uses a local eCAL::Configuration object, so the run does not depend on +// any external ecal.yaml files / environment. +inline eCAL::Configuration MakeFixedEcalConfiguration() +{ + eCAL::Configuration config; + config.registration.local.transport_type = eCAL::Registration::Local::eTransportType::shm; + // NOTE: We intentionally *do not* call config.InitFromConfig() + // so we stay with eCAL's compiled-in defaults only. + return config; +} -/* - * - * Benchmarking the eCAL send process - * -*/ -namespace Send { - // Benchmark function - void BM_eCAL_Send(benchmark::State& state) { - // Create payload to send, size depends on current argument - const size_t payload_size = state.range(0); - std::vector content_vector(payload_size); - std::generate(content_vector.begin(), content_vector.end(), gen); - const char* content_addr = content_vector.data(); - - // Initialize eCAL and create sender - eCAL::Initialize("Benchmark"); - eCAL::CPublisher publisher("benchmark_topic"); - - // Create receiver in a different thread - std::thread receiver_thread([]() { - eCAL::CSubscriber subscriber("benchmark_topic"); - while(eCAL::Ok()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - } ); - - // Wait for eCAL synchronization - std::this_thread::sleep_for(std::chrono::milliseconds(registration_delay_ms)); - - // This is the benchmarked section: Sending the payload - for (auto _ : state) { - publisher.Send(content_addr, payload_size); +// ------------------------------------------------------------------------ +// Helper: Create per-publisher / per-subscriber transport config +// ------------------------------------------------------------------------ + +struct PubSubConfig +{ + eCAL::Publisher::Configuration pub_cfg; + eCAL::Subscriber::Configuration sub_cfg; +}; + +inline PubSubConfig MakeTransportConfig(TransportLayer layer) +{ + PubSubConfig cfg; + + // Start with all layers disabled and selectively enable one. + cfg.pub_cfg.layer.shm.enable = false; + cfg.pub_cfg.layer.udp.enable = false; + cfg.pub_cfg.layer.tcp.enable = false; + + cfg.sub_cfg.layer.shm.enable = false; + cfg.sub_cfg.layer.udp.enable = false; + cfg.sub_cfg.layer.tcp.enable = false; + + switch (layer) + { + case TransportLayer::Shm: + cfg.pub_cfg.layer.shm.enable = true; + cfg.sub_cfg.layer.shm.enable = true; + break; + case TransportLayer::Udp: + cfg.pub_cfg.layer.udp.enable = true; + cfg.sub_cfg.layer.udp.enable = true; + break; + case TransportLayer::Tcp: + cfg.pub_cfg.layer.tcp.enable = true; + cfg.sub_cfg.layer.tcp.enable = true; + break; } - // Finalize eCAL and wait for receiver thread to finish - eCAL::Finalize(); - receiver_thread.join(); - } - // Register the benchmark function - BENCHMARK(BM_eCAL_Send)->RangeMultiplier(range_multiplier)->Range(range_start, range_limit)->UseRealTime(); + return cfg; } +// ------------------------------------------------------------------------ +// SEND-ONLY BENCHMARK +// ------------------------------------------------------------------------ +namespace Send { -/* - * - * Benchmarking the eCAL send and receive process - * -*/ -namespace Send_and_Receive { - // Define mutex and condition variable - std::mutex mtx; - std::condition_variable convar; - bool msg_received = false; - - // Define callback function to register incoming message - void callback(){ - std::lock_guard lock(mtx); - msg_received = true; - convar.notify_one(); - }; - - // Benchmark function - void BM_eCAL_Send_and_Receive(benchmark::State& state) { - // Create payload to send, size depends on current argument - const size_t payload_size = state.range(0); - std::vector content_vector(payload_size); - std::generate(content_vector.begin(), content_vector.end(), gen); - const char* content_addr = content_vector.data(); - - // Initialize eCAL, create sender - eCAL::Initialize("Benchmark"); - eCAL::CPublisher publisher("benchmark_topic"); - - // Create receiver in a different thread and register callback function - std::thread receiver_thread([](){ - eCAL::CSubscriber subscriber("benchmark_topic"); - subscriber.SetReceiveCallback(std::bind(&callback)); - while(eCAL::Ok()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - }); - - // Wait for eCAL synchronization - std::this_thread::sleep_for(std::chrono::milliseconds(registration_delay_ms)); - - // This is the benchmarked section: Sending the payload and waiting for the receive callback - for (auto _ : state) { - msg_received = false; - publisher.Send(content_addr, payload_size); - std::unique_lock lock(mtx); - convar.wait(lock, [] {return msg_received;}); + void BM_eCAL_Send(benchmark::State& state) + { + const size_t payload_size = static_cast(state.range(0)); + const auto layer = static_cast(state.range(1)); + + // Label row in benchmark output with transport + state.SetLabel(TransportLayerName(layer)); + + // Create payload + std::vector content_vector(payload_size); + std::generate(content_vector.begin(), content_vector.end(), gen); + const char* content_addr = content_vector.data(); + + // Fixed process configuration (no external ecal.yaml) + auto process_config = MakeFixedEcalConfiguration(); + eCAL::Initialize(process_config, "Benchmark_Send", eCAL::Init::Default); + + // Per-publisher / per-subscriber configuration for this transport + auto pubsub_cfg = MakeTransportConfig(layer); + + // Publisher + eCAL::CPublisher publisher("benchmark_topic", {}, pubsub_cfg.pub_cfg); + + // Receiver thread (dummy subscriber that just keeps the connection alive) + std::atomic stop_receiver{ false }; + std::thread receiver_thread([&]() { + eCAL::CSubscriber subscriber("benchmark_topic", {}, pubsub_cfg.sub_cfg); + + subscriber.SetReceiveCallback( + [](const eCAL::STopicId&, + const eCAL::SDataTypeInformation&, + const eCAL::SReceiveCallbackData&) { + // Intentionally empty: we only need a subscription + }); + + while (!stop_receiver.load(std::memory_order_relaxed)) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + }); + + // Wait for registration / matching + std::this_thread::sleep_for(std::chrono::milliseconds(registration_delay_ms)); + while (!publisher.GetSubscriberCount() > 0) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + // Benchmarked section: Send data + for (auto _ : state) + { + publisher.Send(content_addr, payload_size); + } + + // Report bytes processed (for bytes/second metrics) + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(payload_size)); + + // Shutdown subscriber thread & eCAL + stop_receiver.store(true, std::memory_order_relaxed); + receiver_thread.join(); + + eCAL::Finalize(); } - // Finalize eCAL and wait for receiver thread to finish - eCAL::Finalize(); - receiver_thread.join(); - } - // Register the benchmark function - BENCHMARK(BM_eCAL_Send_and_Receive)->RangeMultiplier(range_multiplier)->Range(range_start, range_limit)->UseRealTime(); -} + // Register benchmark + BENCHMARK(BM_eCAL_Send) + ->Apply(TransportAndSizeArgs) + ->UseRealTime(); +} // namespace Send -/* - * - * Benchmarking the eCAL receive latency (with manual timing) - * -*/ -namespace Receive_Latency { - // Define mutex and condition variable - std::mutex mtx; - std::condition_variable convar; - bool msg_received = false; - - // Define variables for manual timing - std::chrono::high_resolution_clock::time_point time_start; - std::chrono::high_resolution_clock::time_point time_end; - - // Define callback function to register incoming message - void callback_timed(){ - time_end = std::chrono::high_resolution_clock::now(); - - std::lock_guard lock(mtx); - msg_received = true; - convar.notify_one(); - }; - - // Benchmark function - void BM_eCAL_Receive_Latency(benchmark::State& state) { - // Create payload to send, size depends on current argument - const size_t payload_size = state.range(0); - std::vector content_vector(payload_size); - std::generate(content_vector.begin(), content_vector.end(), gen); - const char* content_addr = content_vector.data(); - - // Initialize eCAL, create sender and receiver and register callback function - eCAL::Initialize("Benchmark"); - eCAL::CPublisher publisher("benchmark_topic"); - - // Create receiver in a different thread and register callback function - std::thread receiver_thread([](){ - eCAL::CSubscriber subscriber("benchmark_topic"); - subscriber.SetReceiveCallback(std::bind(&callback_timed)); - while(eCAL::Ok()) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - }); - - // Wait for eCAL synchronization - std::this_thread::sleep_for(std::chrono::milliseconds(registration_delay_ms)); - - // This is the benchmarked section: Sending the payload (untimed) and waiting for the receive callback - for (auto _ : state) { - msg_received = false; - publisher.Send(content_addr, payload_size); - std::unique_lock lock(mtx); - - time_start = std::chrono::high_resolution_clock::now(); - convar.wait(lock, [&]() {return msg_received;}); - - // Calculate time difference between message sent and message received - auto time_elapsed = std::chrono::duration_cast> (time_end - time_start); - state.SetIterationTime(time_elapsed.count()); +// ------------------------------------------------------------------------ +// SEND + RECEIVE BENCHMARK +// ------------------------------------------------------------------------ +namespace Send_and_Receive { + + std::mutex mtx; + std::condition_variable convar; + bool msg_received = false; + + void BM_eCAL_Send_and_Receive(benchmark::State& state) + { + const size_t payload_size = static_cast(state.range(0)); + const auto layer = static_cast(state.range(1)); + + state.SetLabel(TransportLayerName(layer)); + + std::vector content_vector(payload_size); + std::generate(content_vector.begin(), content_vector.end(), gen); + const char* content_addr = content_vector.data(); + + auto process_config = MakeFixedEcalConfiguration(); + eCAL::Initialize(process_config, "Benchmark_SendRecv", eCAL::Init::Default); + + auto pubsub_cfg = MakeTransportConfig(layer); + + eCAL::CPublisher publisher("benchmark_topic", {}, pubsub_cfg.pub_cfg); + + std::atomic stop_receiver{ false }; + std::thread receiver_thread([&stop_receiver, &pubsub_cfg]() { + eCAL::CSubscriber subscriber("benchmark_topic", {}, pubsub_cfg.sub_cfg); + + subscriber.SetReceiveCallback( + [](const eCAL::STopicId&, + const eCAL::SDataTypeInformation&, + const eCAL::SReceiveCallbackData&) { + { + std::lock_guard lock(mtx); + msg_received = true; + } + convar.notify_one(); + }); + + while (!stop_receiver.load(std::memory_order_relaxed)) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(registration_delay_ms)); + while (!publisher.GetSubscriberCount() > 0) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + auto timeout = std::chrono::seconds(2); + for (auto _ : state) + { + { + std::lock_guard lock(mtx); + msg_received = false; + } + + publisher.Send(content_addr, payload_size); + + std::unique_lock lock(mtx); + if (!convar.wait_for(lock, timeout, [] { return msg_received; })) + { + state.SkipWithError("Timeout waiting for message"); + break; + } + } + + state.SetBytesProcessed(static_cast(state.iterations()) * + static_cast(payload_size)); + + stop_receiver.store(true, std::memory_order_relaxed); + receiver_thread.join(); + + eCAL::Finalize(); } - // Finalize eCAL and wait for receiver thread to finish - eCAL::Finalize(); - receiver_thread.join(); - } - // Register the benchmark function - BENCHMARK(BM_eCAL_Receive_Latency)->RangeMultiplier(range_multiplier)->Range(range_start, range_limit)->UseManualTime(); -} + BENCHMARK(BM_eCAL_Send_and_Receive) + ->Apply(TransportAndSizeArgs) + ->UseRealTime(); +} // namespace Send_and_Receive +// ------------------------------------------------------------------------ // Benchmark execution -BENCHMARK_MAIN(); \ No newline at end of file +// ------------------------------------------------------------------------ + +BENCHMARK_MAIN();