diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d25be00ac..ec02876298 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ All notable changes to this project are documented in this file. - Implement low-pass filter for estimated friction torques in `JointTorqueControlDevice` (https://github.com/ami-iit/bipedal-locomotion-framework/pull/892) - Add `blf-motor-current-tracking.py` application (https://github.com/ami-iit/bipedal-locomotion-framework/pull/894) - Add the possibility to initialize the base position and the feet pose in the `unicycleTrajectoryGenerator` (https://github.com/ami-iit/bipedal-locomotion-framework/pull/887) +- Add a `Periodic Thread Class` and add options to reduce `YarpLoggerDevice` latency (https://github.com/ami-iit/bipedal-locomotion-framework/pull/889) ### Changed diff --git a/devices/YarpRobotLoggerDevice/README.md b/devices/YarpRobotLoggerDevice/README.md index 3b6fe796f4..c3ff265767 100644 --- a/devices/YarpRobotLoggerDevice/README.md +++ b/devices/YarpRobotLoggerDevice/README.md @@ -138,3 +138,28 @@ robot-log-visualizer Then, you can open the mat file generated by the logger and explore the logged data as in the following video: [robot-log-visualizer.webm](https://github.com/ami-iit/robot-log-visualizer/assets/16744101/3fd5c516-da17-4efa-b83b-392b5ce1383b) + +## How to reduce latency +If you are experiencing some latency in logged data, you can try to enable different real time scheduling strategies through the `real_time_scheduling_strategy` in the [yarp-robot-logger.xml](https://github.com/ami-iit/bipedal-locomotion-framework/blob/master/devices/YarpRobotLoggerDevice/app/robots/ergoCubSN000/yarp-robot-logger.xml). For example: + +```xml + + + ergocub + 0.001 + early_wakeup_and_fifo +``` + +The available strategies are `none` (which is the default and means no strategy will be applied), `early_wakeup`, `fifo`, and `early_wakeup_and_fifo`. +The `early_wakeup` strategy makes the YarpLoggerDevice run thread wake up earlier and then busy wait until it is time to resume. +The `fifo` strategy increases the YarpLoggerDevice run thread priority and changes its scheduling policy to SCHED-FIFO. +The `early_wakeup_and_fifo` combines the `early_wakeup` and `fifo` strategies. + +Note that the `fifo` and `early_wakeup_and_fifo` strategies are only available for Linux. Moreover, you should run the `YarpLoggerDevice` with elevated privileges when deploying them. +For example: + +```console +sudo -E /yarprobotinterface --config launch-yarp-robot-logger.xml +``` + +with `` being the directory containing `yarprobotinterface`, that you can determine with the terminal command `which yarprobotinterface`. \ No newline at end of file diff --git a/devices/YarpRobotLoggerDevice/include/BipedalLocomotion/YarpRobotLoggerDevice.h b/devices/YarpRobotLoggerDevice/include/BipedalLocomotion/YarpRobotLoggerDevice.h index 63f09af1db..d762df46f6 100644 --- a/devices/YarpRobotLoggerDevice/include/BipedalLocomotion/YarpRobotLoggerDevice.h +++ b/devices/YarpRobotLoggerDevice/include/BipedalLocomotion/YarpRobotLoggerDevice.h @@ -21,13 +21,13 @@ #include #include #include -#include #include #include #include #include +#include #include #include #include @@ -37,20 +37,17 @@ namespace BipedalLocomotion class YarpRobotLoggerDevice : public yarp::dev::DeviceDriver, public yarp::dev::IMultipleWrapper, - public yarp::os::PeriodicThread + public BipedalLocomotion::System::PeriodicThread { public: - YarpRobotLoggerDevice(double period, - yarp::os::ShouldUseSystemClock useSystemClock - = yarp::os::ShouldUseSystemClock::No); - YarpRobotLoggerDevice(); + YarpRobotLoggerDevice(double period = 0.01); ~YarpRobotLoggerDevice(); virtual bool open(yarp::os::Searchable& config) final; virtual bool close() final; virtual bool attachAll(const yarp::dev::PolyDriverList& poly) final; virtual bool detachAll() final; - virtual void run() final; + virtual bool run() final; private: std::chrono::nanoseconds m_previousTimestamp; @@ -177,6 +174,15 @@ class YarpRobotLoggerDevice : public yarp::dev::DeviceDriver, std::mutex m_bufferManagerMutex; robometry::BufferManager m_bufferManager; + enum class RealTimeSchedulingStrategy + { + None, + EarlyWakeUp, + FIFO, + EarlyWakeUpAndFIFO, + }; + RealTimeSchedulingStrategy m_RealTimeSchedulingStrategy{RealTimeSchedulingStrategy::None}; + void lookForNewLogs(); void lookForExogenousSignals(); diff --git a/devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp b/devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp index ad60885224..f453f0957b 100644 --- a/devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp +++ b/devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp @@ -15,6 +15,9 @@ #include #include #include +#ifdef __linux__ +#include +#endif #include #include @@ -93,23 +96,9 @@ void YarpRobotLoggerDevice::VectorsCollectionSignal::disconnect() } } -YarpRobotLoggerDevice::YarpRobotLoggerDevice(double period, - yarp::os::ShouldUseSystemClock useSystemClock) - : yarp::os::PeriodicThread(period, useSystemClock) -{ - // Use the yarp clock in blf - BipedalLocomotion::System::ClockBuilder::setFactory( - std::make_shared()); - - // the logging message are streamed using yarp - BipedalLocomotion::TextLogging::LoggerBuilder::setFactory( - std::make_shared()); - - m_sendDataRT = false; -} - -YarpRobotLoggerDevice::YarpRobotLoggerDevice() - : yarp::os::PeriodicThread(0.01, yarp::os::ShouldUseSystemClock::No) +YarpRobotLoggerDevice::YarpRobotLoggerDevice(double period) + : BipedalLocomotion::System::PeriodicThread( + std::chrono::nanoseconds(static_cast(period * 1e9))) { // Use the yarp clock in blf BipedalLocomotion::System::ClockBuilder::setFactory( @@ -149,7 +138,79 @@ bool YarpRobotLoggerDevice::open(yarp::os::Searchable& config) double devicePeriod{0.01}; if (params->getParameter("sampling_period_in_s", devicePeriod)) { - this->setPeriod(devicePeriod); + this->setPeriod(std::chrono::nanoseconds(static_cast(devicePeriod))); + } + + std::string realTimeSchedulingStrategy{"none"}; + if (!params->getParameter("real_time_scheduling_strategy", realTimeSchedulingStrategy)) + { + log()->info("{} The 'real_time_scheduling_strategy' parameter is not found. " + "YarpLoggerDevice will run without any real time strategy.", + logPrefix); + } + if (realTimeSchedulingStrategy == "none") + { + m_RealTimeSchedulingStrategy = RealTimeSchedulingStrategy::None; + } else if (realTimeSchedulingStrategy == "early_wakeup") + { + m_RealTimeSchedulingStrategy = RealTimeSchedulingStrategy::EarlyWakeUp; + } else if (realTimeSchedulingStrategy == "fifo") + { + m_RealTimeSchedulingStrategy = RealTimeSchedulingStrategy::FIFO; + } else if (realTimeSchedulingStrategy == "early_wakeup_and_fifo") + { + m_RealTimeSchedulingStrategy = RealTimeSchedulingStrategy::EarlyWakeUpAndFIFO; + } else + { + log()->error("{} The 'real_time_scheduling_strategy' parameter is not valid. Available " + "options are 'none', 'early_wakeup', 'fifo', 'early_wakeup_and_fifo'.", + logPrefix); + return false; + } + + switch (m_RealTimeSchedulingStrategy) + { + + case RealTimeSchedulingStrategy::None: + break; + + case RealTimeSchedulingStrategy::EarlyWakeUp: + if (!this->enableEarlyWakeUp()) + { + log()->error("{} Failed to enable the early wake up.", logPrefix); + return false; + } + break; +#ifdef __linux__ + case RealTimeSchedulingStrategy::FIFO: + if (!this->setPolicy(SCHED_FIFO, 80)) + { + log()->error("{} Failed to set the policy to SCHED_FIFO.", logPrefix); + return false; + } + log()->info("{} The FIFO scheduling policy is set.", logPrefix); + break; + case RealTimeSchedulingStrategy::EarlyWakeUpAndFIFO: + if (!this->enableEarlyWakeUp()) + { + log()->error("{} Failed to enable the early wake up.", logPrefix); + return false; + } + if (!this->setPolicy(SCHED_FIFO, 80)) + { + log()->error("{} Failed to set the policy to SCHED_FIFO.", logPrefix); + return false; + } + break; +#else + case RealTimeSchedulingStrategy::FIFO: + log()->error("{} The FIFO scheduling policy is not supported on this OS.", logPrefix); + return false; + case RealTimeSchedulingStrategy::EarlyWakeUpAndFIFO: + log()->error("{} The EarlyWakeUpAndFIFO scheduling policy is not supported on this OS.", + logPrefix); + return false; +#endif } if (!params->getParameter("text_logging_subnames", m_textLoggingSubnames)) @@ -1189,22 +1250,21 @@ void YarpRobotLoggerDevice::lookForExogenousSignals() continue; } - log()->info("[YarpRobotLoggerDevice::lookForExogenousSignals] Attempt to get the " - "metadata for the vectors collection signal named: {}", + log()->info("[YarpRobotLoggerDevice::lookForExogenousSignals] Attempt to get " + "the metadata for the vectors collection signal named: {}", name); if (!signal.client.getMetadata(signal.metadata)) { - log()->warn("[YarpRobotLoggerDevice::lookForExogenousSignals] Unable to get " - "the metadata for the signal named: {}. The exogenous signal will " - "not contain the metadata.", + log()->warn("[YarpRobotLoggerDevice::lookForExogenousSignals] Unable to " + "get the metadata for the signal named: {}. The exogenous " + "signal will not contain the metadata.", name); } } } signal.connected = connectionDone; - } }; @@ -1429,7 +1489,7 @@ void YarpRobotLoggerDevice::recordVideo(const std::string& cameraName, VideoWrit } } -void YarpRobotLoggerDevice::run() +bool YarpRobotLoggerDevice::run() { auto logData = [this](const std::string& name, const auto& data, const double time) { m_bufferManager.push_back(data, time, name); @@ -1453,7 +1513,7 @@ void YarpRobotLoggerDevice::run() std::chrono::duration(m_previousTimestamp), std::chrono::duration(t), std::chrono::duration(t - m_previousTimestamp)); - return; + return true; } } @@ -1718,12 +1778,11 @@ void YarpRobotLoggerDevice::run() yarp::os::Bottle* b = m_textLoggingPort.read(false); if (b != nullptr) { - msg = BipedalLocomotion::TextLoggingEntry::deserializeMessage(*b, - std::to_string(time)); + msg = BipedalLocomotion::TextLoggingEntry::deserializeMessage(*b, std::to_string(time)); if (msg.isValid) { signalFullName = msg.portSystem + "::" + msg.portPrefix + "::" + msg.processName - + "::p" + msg.processPID; + + "::p" + msg.processPID; // matlab does not support the character - as a key of a struct findAndReplaceAll(signalFullName, "-", "_"); @@ -1736,7 +1795,7 @@ void YarpRobotLoggerDevice::run() m_bufferManager.addChannel({signalFullName, {1, 1}}); m_textLogsStoredInManager.insert(signalFullName); } - //Not using logData here because we don't want to stream the data to RT + // Not using logData here because we don't want to stream the data to RT m_bufferManager.push_back(msg, time, signalFullName); } bufferportSize = m_textLoggingPort.getPendingReads(); @@ -1753,6 +1812,8 @@ void YarpRobotLoggerDevice::run() m_previousTimestamp = t; m_firstRun = false; + + return true; } bool YarpRobotLoggerDevice::saveCallback(const std::string& fileName, diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index a45244e810..90550ac318 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(UnicycleTrajectoryGenerator) +add_subdirectory(PeriodicThread) diff --git a/examples/PeriodicThread/CMakeLists.txt b/examples/PeriodicThread/CMakeLists.txt new file mode 100644 index 0000000000..0fca178f5b --- /dev/null +++ b/examples/PeriodicThread/CMakeLists.txt @@ -0,0 +1,5 @@ +add_executable(PeriodicThreadExample main.cpp) + +target_link_libraries(PeriodicThreadExample +BipedalLocomotion::System +BipedalLocomotion::TextLogging) \ No newline at end of file diff --git a/examples/PeriodicThread/main.cpp b/examples/PeriodicThread/main.cpp new file mode 100644 index 0000000000..292124ed14 --- /dev/null +++ b/examples/PeriodicThread/main.cpp @@ -0,0 +1,71 @@ +#include +#include +#include + +#include +#include +#include +#include + +class Thread : public BipedalLocomotion::System::PeriodicThread +{ +public: + std::string name = "Thread"; + Thread(std::string name); + ~Thread(); + bool run() override; + + bool threadInit() override; +}; + +bool Thread::run() +{ + BipedalLocomotion::log()->info("[Thread::run] {} is running.", name); + return true; +} + +Thread::Thread(std::string name) + : name(name) + , BipedalLocomotion::System::PeriodicThread(std::chrono::milliseconds(1000)){}; + +Thread::~Thread() +{ + BipedalLocomotion::log()->info("[Thread::~Thread] {} is destroyed.", name); +}; + +bool Thread::threadInit() +{ + BipedalLocomotion::log()->info("[Thread::threadInit] {} is initialized.", name); + return true; +} + +int main() +{ + + auto barrier = BipedalLocomotion::System::Barrier::create(2); + + // Thread thread; + auto thread1 = Thread("Thread 1"); + thread1.start(barrier); + + BipedalLocomotion::clock().sleepFor(std::chrono::milliseconds(2000)); + + auto thread2 = Thread("Thread 2"); + thread2.setPeriod(std::chrono::seconds(2)); + thread2.start(barrier); + + while (thread1.isRunning() || thread2.isRunning()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(4000)); + thread1.stop(); + BipedalLocomotion::log()->info("[main] Thread 1 is asked to stop."); + + if (!thread1.isRunning()) + { + thread2.stop(); + BipedalLocomotion::log()->info("[main] Thread 2 is asked to stop."); + } + } + BipedalLocomotion::log()->info("[main] About to exit the application."); + return EXIT_SUCCESS; +} diff --git a/src/System/CMakeLists.txt b/src/System/CMakeLists.txt index dd30345c6f..a71f15117a 100644 --- a/src/System/CMakeLists.txt +++ b/src/System/CMakeLists.txt @@ -19,9 +19,10 @@ if(FRAMEWORK_COMPILE_System) ${H_PREFIX}/QuitHandler.h ${H_PREFIX}/Barrier.h ${H_PREFIX}/TimeProfiler.h ${H_PREFIX}/WeightProvider.h ${H_PREFIX}/ConstantWeightProvider.h + ${H_PREFIX}/PeriodicThread.h SOURCES src/VariablesHandler.cpp src/LinearTask.cpp src/StdClock.cpp src/Clock.cpp src/QuitHandler.cpp src/Barrier.cpp - src/ConstantWeightProvider.cpp src/TimeProfiler.cpp + src/ConstantWeightProvider.cpp src/TimeProfiler.cpp src/PeriodicThread.cpp PUBLIC_LINK_LIBRARIES BipedalLocomotion::ParametersHandler Eigen3::Eigen SUBDIRECTORIES tests YarpImplementation RosImplementation ) diff --git a/src/System/include/BipedalLocomotion/System/PeriodicThread.h b/src/System/include/BipedalLocomotion/System/PeriodicThread.h new file mode 100644 index 0000000000..7538f75110 --- /dev/null +++ b/src/System/include/BipedalLocomotion/System/PeriodicThread.h @@ -0,0 +1,258 @@ +/** + * @file PeriodicThread.h + * @authors Lorenzo Moretti + * @copyright 2024 Istituto Italiano di Tecnologia (IIT). This software may be modified and + * distributed under the terms of the BSD-3-Clause license. + */ + +#ifndef BIPEDAL_LOCOMOTION_SYSTEM_PERIODIC_THREAD_H +#define BIPEDAL_LOCOMOTION_SYSTEM_PERIODIC_THREAD_H + +#include +#include +#include +#include +#include +#include + +#include + +namespace BipedalLocomotion +{ +namespace System +{ + +/** + * @brief Enum class which defines the state of the periodic thread state machine. + */ +enum class PeriodicThreadState +{ + INACTIVE, /**< The thread is not yet active. */ + STARTED, /**< The thread has been started. */ + INITIALIZED, /**< The thread has been initialized.*/ + EXECUTING, /**< The thread is executing the task. */ + IDLE, /**< The thread is idling. */ + STOPPED /**< The thread is stopped. */ +}; + +/** + * @brief The PeriodicThread class is designed to implement a periodic thread that performs a + * specific task at regular intervals. The task is defined by overriding the virtual run method. The + * class provides various methods to control the thread's lifecycle, including starting, stopping, + * suspending, and resuming the thread. It also allows configuring thread-specific parameters like + * priority and scheduling policy (on Linux systems). Additionally, it supports synchronization with + * other threads using barriers and can handle early wake-up scenarios to minimize latency. + * Finally, the class provides a mechanism to monitor the number of deadline misses, which can be + * useful for real-time applications. + */ +class PeriodicThread +{ +public: +#ifdef __linux__ + // Default constructor + + PeriodicThread(std::chrono::nanoseconds period = std::chrono::nanoseconds(100000), + int maximumNumberOfAcceptedDeadlineMiss = -1, + int priority = 0, + int policy = SCHED_OTHER, + bool earlyWakeUp = false); +#else + // Default constructor + PeriodicThread(std::chrono::nanoseconds period = std::chrono::nanoseconds(100000), + int maximumNumberOfAcceptedDeadlineMiss = -1, + bool earlyWakeUp = false); +#endif + + // Destructor + virtual ~PeriodicThread(); + + // Copy constructor + PeriodicThread(const PeriodicThread&) = delete; + + // Copy assignment operator + PeriodicThread& operator=(const PeriodicThread&) = delete; + + /** + * @brief Start the thread. + * @param barrier barrier to synchronize the thread. If nullptr, the thread will start + * immediately, without waiting for other threads to reach the barrier. Instead, if for example + * the barrier is creted as std::make_shared(2), the thread + * will wait for another thread to reach the barrier before starting. + * @return true if the thread was correctly started, false otherwise. + */ + bool start(std::shared_ptr barrier = nullptr); + + /** + * @brief Call this method to stop the thread. + */ + void stop(); + + /** + * @brief Suspend the thread. + */ + bool suspend(); + + /** + * @brief Resume the thread. + */ + bool resume(); + + /** + * @brief Check if the thread is running. This means that the thread has started and not yet + * stopped. + * @return true if the thread is running, false otherwise. + */ + bool isRunning() const; + + /** + * @brief Check if the thread is initialized. + * @return true if the thread is initialized, false otherwise. + */ + bool isInitialized() const; + + /** + * @brief Set the period of the thread. + * @param period period of the thread. + * @return true if the period was correctly set, false otherwise. + */ + bool setPeriod(std::chrono::nanoseconds period); + + /** + * @brief Get the period of the thread. + * @return period of the thread. + */ + std::chrono::nanoseconds getPeriod() const; + +#ifdef __linux__ + /** + * @brief Set the policy and priority of the thread before starting it. When starting, the + * thread will try to use this policy and priority. + * @param policy policy of the thread. + * @param priority priority of the thread. + * @return true if the policy and priority are correctly set, false otherwise. + */ + bool setPolicy(int policy, int priority = 0); + + /** + * @brief Get the policy and prority of the thread. + * @return policy of the thread. + */ + void getPolicy(int& policy, int& priority) const; + +#endif + + /** + * @brief Set the maximum number of accepted deadline miss. + * @param maximumNumberOfAcceptedDeadlineMiss maximum number of accepted deadline miss. + */ + bool setMaximumNumberOfAcceptedDeadlineMiss(int maximumNumberOfAcceptedDeadlineMiss); + + /** + * @brief Get the maximum number of accepted deadline miss. + * @return maximum number of accepted deadline miss. + */ + int getMaximumNumberOfAcceptedDeadlineMiss() const; + + /** + * @brief Get the number of deadline miss. + * @return number of deadline miss. + */ + int getNumberOfDeadlineMiss() const; + + /** + * @brief Enable the early wake up. The thread will be awaken before and busy wait until the + * actual wake up time. + * @return true if the early wake up was correctly set, false otherwise. + */ + bool enableEarlyWakeUp(); + + /** + * @brief Check if the early wake up is enabled. + * @return true if the early wake up is enabled, false otherwise. + */ + bool isEarlyWakeUpEnabled() const; + +protected: + /** + * @brief This method is called at each iteration of the thread. + * Override this method to implement the task to be performed from the thread. + * @return true if the thread has to continue, false otherwise. + */ + virtual bool run(); + + /** + * @brief This method is called at the beginning of the thread. + * @return true if the initialization was successful, false otherwise. + */ + virtual bool threadInit(); + +private: + /** + * @brief This is the function being executed by the std::thread. + */ + void threadFunction(); + + /** + * @brief Advance the thread of one step, calling the virtual run function once. + */ + void advance(); + + /** + * @brief Synchronize the thread with other threads with a barrier. + */ + void synchronize(); + + /** + * @brief Set the policy and priority of the thread. + * @return true if the policy and priority were correctly set, false otherwise. + */ + bool setPolicy(); + + std::atomic m_period = std::chrono::nanoseconds(100000); /**< Period + * of the + * thread. + */ + + std::atomic m_maximumNumberOfAcceptedDeadlineMiss = -1; /**< Maximum number of accepted + * deadline miss. + */ + + std::atomic m_deadlineMiss = 0; /**< Number of deadline miss. */ + + std::chrono::nanoseconds m_wakeUpTime = std::chrono::nanoseconds(0); /**< Wake up time of the + * thread. + */ +#ifdef __linux__ + std::atomic m_priority = 0; /**< Priority of the thread. */ + + std::atomic m_policy = SCHED_OTHER; /**< Policy of the thread. */ +#endif + + std::thread m_thread; /**< Thread object. */ + + std::atomic m_state = PeriodicThreadState::INACTIVE; /**< State of the + * thread. + */ + + std::shared_ptr m_barrier; /**< Barrier to synchronize the + * thread. + */ + const std::chrono::nanoseconds m_schedulerLatency + = std::chrono::microseconds(100); /**< Scheduler latency when waking up a thread. Indeed, it + varies depending on the OS. For now we fix it to a + constant value. Note that in real-time OS it might be + smaller. */ + std::atomic m_earlyWakeUp = false; /**< If true, the thread will be awaken before and busy + wait until the actual wake up time. */ + + std::condition_variable m_cv; /**< Condition variable to check for initialization.*/ + std::mutex m_cvMutex; /**< Mutex to protect the condition variable. */ + std::atomic m_ready = false; /**< Flag to signal that the inizialization tasks of the + thread are completed. */ + std::atomic m_initializationSuccessful = false; /**< Flag to signal the result of + initialization */ +}; +} // namespace System +} // namespace BipedalLocomotion + +#endif // BIPEDAL_LOCOMOTION_SYSTEM_PERIODIC_THREAD_H \ No newline at end of file diff --git a/src/System/src/PeriodicThread.cpp b/src/System/src/PeriodicThread.cpp new file mode 100644 index 0000000000..3507c5cc2d --- /dev/null +++ b/src/System/src/PeriodicThread.cpp @@ -0,0 +1,389 @@ +#include +#include +#include +#include +#include + +#ifdef __linux__ +#include +#include +#endif + +#include +#include +#include + +namespace BipedalLocomotion +{ +namespace System +{ + +#ifdef __linux__ + +PeriodicThread::PeriodicThread(std::chrono::nanoseconds period, + int maximumNumberOfAcceptedDeadlineMiss, + int priority, + int policy, + bool earlyWakeUp) + : m_period(period) + , m_maximumNumberOfAcceptedDeadlineMiss(maximumNumberOfAcceptedDeadlineMiss) + , m_priority(priority) + , m_policy(policy) + , m_deadlineMiss(0) + , m_wakeUpTime(std::chrono::nanoseconds(0)) + , m_earlyWakeUp(earlyWakeUp) + , m_state(PeriodicThreadState::INACTIVE){}; + +#else + +PeriodicThread::PeriodicThread(std::chrono::nanoseconds period, + int maximumNumberOfAcceptedDeadlineMiss, + bool earlyWakeUp) + : m_period(period) + , m_maximumNumberOfAcceptedDeadlineMiss(maximumNumberOfAcceptedDeadlineMiss) + , m_deadlineMiss(0) + , m_wakeUpTime(std::chrono::nanoseconds(0)) + , m_earlyWakeUp(earlyWakeUp) + , m_state(PeriodicThreadState::INACTIVE){}; + +#endif + +PeriodicThread::~PeriodicThread() +{ + // force stop of the thread + m_state.store(PeriodicThreadState::STOPPED); + + // join the thread + if (m_thread.joinable()) + { + m_thread.join(); + m_thread = std::thread(); + } +}; + +void PeriodicThread::threadFunction() +{ + constexpr auto logPrefix = "[PeriodicThread::threadFunction]"; + + m_initializationSuccessful.store(true); + + // thread initialization + if (!threadInit()) + { + log()->error("{} The thread initialization failed.", logPrefix); + m_initializationSuccessful.store(false); + } + + // set the policy + if (!setPolicy()) + { + log()->error("{} The policy and priority cannot be set.", logPrefix); + m_initializationSuccessful.store(false); + } + + // Notify that initialization is completed (successful or not) + { + // https://en.cppreference.com/w/cpp/thread/condition_variable: + // Even if the shared variable is atomic, it must be modified while owning the mutex to + // correctly publish the modification to the waiting thread. + std::lock_guard lock(m_cvMutex); + m_ready.store(true); + } + m_cv.notify_one(); + + // cannot continue if the initialization was not successful + if (!m_initializationSuccessful.load()) + { + m_state.store(PeriodicThreadState::STOPPED); + return; + } + + m_state.store(PeriodicThreadState::INITIALIZED); + + // synchronize the thread + synchronize(); + + // run loop + m_state.store(PeriodicThreadState::EXECUTING); + + while (m_state == PeriodicThreadState::EXECUTING || m_state == PeriodicThreadState::IDLE) + { + this->advance(); + } + + m_state.store(PeriodicThreadState::STOPPED); + + return; +}; + +bool PeriodicThread::setPolicy() +{ + constexpr auto logPrefix = "[PeriodicThread::setPolicy]"; + +#ifdef __linux__ + // get the current thread native handle + pthread_t nativeHandle = pthread_self(); + // get the current thread parameters + sched_param params; + params.sched_priority = m_priority.load(); + // Set the scheduling policy to SCHED_FIFO and priority + int ret = pthread_setschedparam(nativeHandle, m_policy.load(), ¶ms); + if (ret != 0) + { + log()->error("{} Failed to set scheduling policy, with error: {}", logPrefix, ret); + if (ret == EPERM) + { + log()->error("{} The calling thread does not have the appropriate privileges to set " + "the requested scheduling policy and parameters. Try to run the " + "application with 'sudo -E'.", + logPrefix); + } + return false; + } else + { + log()->debug("{} Scheduling policy set to {} with priority {}", + logPrefix, + m_policy.load(), + params.sched_priority); + return true; + } +#else + return true; +#endif +}; + +#ifdef __linux__ +bool PeriodicThread::setPolicy(int policy, int priority) +{ + if (m_state.load() != PeriodicThreadState::INACTIVE) + { + BipedalLocomotion::log()->error("[PeriodicThread::setPolicy] The thread has already " + "started. The policy and priority cannot be changed."); + return false; + } + m_policy.store(policy); + m_priority.store(priority); + return true; +}; + +void PeriodicThread::getPolicy(int& policy, int& priority) const +{ + policy = m_policy.load(); + priority = m_priority.load(); +}; +#endif + +bool PeriodicThread::setPeriod(std::chrono::nanoseconds period) +{ + if (m_state.load() != PeriodicThreadState::INACTIVE) + { + BipedalLocomotion::log()->error("[PeriodicThread::setPeriod] The thread has already " + "started. The period cannot be changed."); + return false; + } + m_period = period; + return true; +} + +std::chrono::nanoseconds PeriodicThread::getPeriod() const +{ + return m_period.load(); +} + +bool PeriodicThread::setMaximumNumberOfAcceptedDeadlineMiss(int maximumNumberOfAcceptedDeadlineMiss) +{ + if (m_state.load() != PeriodicThreadState::INACTIVE) + { + BipedalLocomotion::log()->error("[PeriodicThread::setMaximumNumberOfAcceptedDeadlineMiss] " + "The thread has already started. The maximum number of " + "accepted deadline miss cannot be changed."); + return false; + } + m_maximumNumberOfAcceptedDeadlineMiss.store(maximumNumberOfAcceptedDeadlineMiss); + return true; +} + +int PeriodicThread::getMaximumNumberOfAcceptedDeadlineMiss() const +{ + return m_maximumNumberOfAcceptedDeadlineMiss.load(); +} + +int PeriodicThread::getNumberOfDeadlineMiss() const +{ + return m_deadlineMiss.load(); +} + +bool PeriodicThread::enableEarlyWakeUp() +{ + if (m_state.load() != PeriodicThreadState::INACTIVE) + { + BipedalLocomotion::log()->error("[PeriodicThread::enableEarlyWakeUp] The thread has " + "already started. The early wake up cannot be changed."); + return false; + } + m_earlyWakeUp.store(true); + return true; +} + +bool PeriodicThread::threadInit() +{ + return true; +}; + +bool PeriodicThread::run() +{ + return false; +}; + +void PeriodicThread::synchronize() +{ + if (!(m_barrier == nullptr)) + { + BipedalLocomotion::log()->debug("[PeriodicThread::synchronize] This thread is waiting for " + "the other threads."); + m_barrier->wait(); + } +}; + +void PeriodicThread::stop() +{ + // stop the thread only if it is executing or idling + if ((m_state.load() == PeriodicThreadState::EXECUTING + || m_state.load() == PeriodicThreadState::IDLE)) + { + m_state.store(PeriodicThreadState::STOPPED); + BipedalLocomotion::log()->debug("[PeriodicThread::stop] Thread is stopped."); + } +}; + +bool PeriodicThread::suspend() +{ + if (m_state.load() == PeriodicThreadState::EXECUTING) + { + m_state.store(PeriodicThreadState::IDLE); + BipedalLocomotion::log()->debug("[PeriodicThread::suspend] Thread is suspended."); + return true; + } + return false; +}; + +bool PeriodicThread::resume() +{ + if (m_state.load() == PeriodicThreadState::IDLE) + { + m_state.store(PeriodicThreadState::EXECUTING); + BipedalLocomotion::log()->debug("[PeriodicThread::resume] Thread is resumed."); + return true; + } + return false; +}; + +bool PeriodicThread::start(std::shared_ptr barrier) +{ + + std::unique_lock lock(m_cvMutex); // lock the mutex for the condition variable + + // only an inactive thread can be started + if (m_state.load() != PeriodicThreadState::INACTIVE) + { + return false; + } else + { + m_state.store(PeriodicThreadState::STARTED); + } + + // store the barrier + m_barrier = barrier; + + // lambda wrapper for the thread function + auto threadFunctionLambda = [this]() { this->threadFunction(); }; + + // start the thread + m_thread = std::thread(threadFunctionLambda); + + // check if the thread is joinable + if (!m_thread.joinable()) + { + return false; + } + + // check if initialization was successful and thread started + m_cv.wait(lock, [this] { return m_ready.load(); }); + + return m_initializationSuccessful.load(); +}; + +bool PeriodicThread::isRunning() const +{ + return ((m_state.load() != PeriodicThreadState::INACTIVE) + && (m_state.load() != PeriodicThreadState::STOPPED)); +} + +bool PeriodicThread::isInitialized() const +{ + return (m_state.load() == PeriodicThreadState::INITIALIZED); +} + +bool PeriodicThread::isEarlyWakeUpEnabled() const +{ + return (m_earlyWakeUp.load()); +} + +void PeriodicThread::advance() +{ + // get the current time + auto now = BipedalLocomotion::clock().now(); + + // busy wait until wake up time + if (isEarlyWakeUpEnabled()) + { + while (now < m_wakeUpTime) + { + now = BipedalLocomotion::clock().now(); + } + } + + // get the next wake up time + m_wakeUpTime = now + m_period.load(); + + // run user overridden function, when not idling + if (m_state.load() != PeriodicThreadState::IDLE) + { + if (!run()) + { + // an error occurred, stop the thread + m_state.store(PeriodicThreadState::STOPPED); + return; + } + } + + // check if the deadline is missed + if (BipedalLocomotion::clock().now() > m_wakeUpTime) + { + m_deadlineMiss.fetch_add(1); // increment the number of deadline miss + if (m_maximumNumberOfAcceptedDeadlineMiss.load() > 0) + { + if (m_deadlineMiss.load() > m_maximumNumberOfAcceptedDeadlineMiss.load()) + { + // we have to close the runner + m_state.store(PeriodicThreadState::STOPPED); + return; + } + } + } + + // yield the CPU + BipedalLocomotion::clock().yield(); + + // wait until the next deadline + if (isEarlyWakeUpEnabled()) + { + BipedalLocomotion::clock().sleepUntil(m_wakeUpTime - m_schedulerLatency); + } else + { + BipedalLocomotion::clock().sleepUntil(m_wakeUpTime); + } +}; + +} // namespace System +} // namespace BipedalLocomotion \ No newline at end of file diff --git a/src/System/tests/CMakeLists.txt b/src/System/tests/CMakeLists.txt index 7cdcbdd15e..96969487e1 100644 --- a/src/System/tests/CMakeLists.txt +++ b/src/System/tests/CMakeLists.txt @@ -11,3 +11,8 @@ add_bipedal_test( NAME AdvanceableRunner SOURCES AdvanceableRunnerTest.cpp LINKS BipedalLocomotion::System BipedalLocomotion::TextLogging BipedalLocomotion::ParametersHandler) + +add_bipedal_test( + NAME PeriodicThread + SOURCES PeriodicThreadTest.cpp + LINKS BipedalLocomotion::System BipedalLocomotion::TextLogging) diff --git a/src/System/tests/PeriodicThreadTest.cpp b/src/System/tests/PeriodicThreadTest.cpp new file mode 100644 index 0000000000..52e3f9b390 --- /dev/null +++ b/src/System/tests/PeriodicThreadTest.cpp @@ -0,0 +1,213 @@ +/** + * @file PeriodicThreadTest.cpp + * @authors Lorenzo Moretti + * @copyright This software may be modified and distributed under the terms of the GNU Lesser + * General Public License v2.1 or any later version. + */ + +#include +#include +#include + +// Catch2 +#include + +#include +#include +#include + +using namespace BipedalLocomotion::System; +using namespace std::chrono_literals; + +class Thread : public PeriodicThread +{ +protected: + bool threadInit() override + { + m_counter.store(0); + return true; + } + bool run() override + { + m_counter++; + return true; + } + +private: + std::atomic m_counter{0}; + +public: + int getCounter() + { + return m_counter.load(); + } +}; + +class ThreadWithDeadlineMiss : public PeriodicThread +{ +protected: + bool threadInit() override + { + return true; + } + bool run() override + { + BipedalLocomotion::clock().sleepFor(waitTime); + return true; + } + +private: + std::chrono::nanoseconds waitTime; + +public: + void setWaitTime(std::chrono::nanoseconds time) + { + waitTime = time; + } +}; + +TEST_CASE("Test Periodic Thread", "[PeriodicThread]") +{ + BipedalLocomotion::System::ClockBuilder::setFactory( + std::make_shared()); + + auto period = 50ms; + + // create + auto thread = Thread(); + + // set the period + REQUIRE(thread.setPeriod(period)); + +#ifdef __linux__ + // set the policy + REQUIRE(thread.setPolicy(SCHED_OTHER, 0)); +#endif + + // start the thread + REQUIRE(thread.start()); + + BipedalLocomotion::clock().sleepFor(period); + + // check if the thread is running + REQUIRE(thread.isRunning()); + + // suspend the thread + REQUIRE(thread.suspend()); + BipedalLocomotion::clock().sleepFor(period); + + // check if the thread is suspended + int counter = thread.getCounter(); + BipedalLocomotion::clock().sleepFor(period); + REQUIRE(counter == thread.getCounter()); + + // resume the thread + REQUIRE(thread.resume()); + BipedalLocomotion::clock().sleepFor(period); + + // stop the thread + thread.stop(); + BipedalLocomotion::clock().sleepFor(period); + + // check if the thread is stopped + REQUIRE(!thread.isRunning()); +} + +TEST_CASE("Test Periodic Thread", "[PeriodicThreadSynchronization]") +{ + auto period = 100ms; + + auto barrier = BipedalLocomotion::System::Barrier::create(2); + + // create two threads + auto thread1 = Thread(); + thread1.setPeriod(period); + + auto thread2 = Thread(); + thread2.setPeriod(period); + + // start thread 1 + REQUIRE(thread1.start(barrier)); + BipedalLocomotion::clock().sleepFor(2 * period); + + // check that the thread 1 is waiting for thread 2 + // (i.e. threadInit has been called, but run has not been called yet) + REQUIRE(thread1.isInitialized()); + REQUIRE(!thread2.isRunning()); + + // start thread 2 + REQUIRE(thread2.start(barrier)); + BipedalLocomotion::clock().sleepFor(period); + + // check that the thread 2 is running + REQUIRE(thread2.isRunning()); + + // stop the threads + thread1.stop(); + thread2.stop(); +} + +TEST_CASE("Test Periodic Thread", "[PeriodicThreadNotAllowed]") +{ + + auto period = 50ms; + + // create + auto thread = Thread(); + + // start the thread + REQUIRE(thread.start()); + BipedalLocomotion::clock().sleepFor(period); + + // try to set the period + REQUIRE(!thread.setPeriod(period)); + +#ifdef __linux__ + // try to set the policy + REQUIRE(!thread.setPolicy(SCHED_OTHER, 0)); +#endif + + // try to set the maximum number of accepted deadline miss + REQUIRE(!thread.setMaximumNumberOfAcceptedDeadlineMiss(0)); + + // try to enable early wake up + REQUIRE(!thread.enableEarlyWakeUp()); + + // check that early wake up has not enabled + REQUIRE(!thread.isEarlyWakeUpEnabled()); + + // stop the thread + thread.stop(); + BipedalLocomotion::clock().sleepFor(period); + + // try to resume the thread + REQUIRE(!thread.resume()); +} + +TEST_CASE("Test Periodic Thread", "[PeriodicThreadDeadlineMiss]") +{ + auto period = 50ms; + + // create + auto thread = ThreadWithDeadlineMiss(); + + // set the period + REQUIRE(thread.setPeriod(period)); + + // set the wait time in the run function to trigger a deadline miss + thread.setWaitTime(2 * period); + + // set the maximum number of accepted deadline miss + REQUIRE(thread.setMaximumNumberOfAcceptedDeadlineMiss(2)); + + // start the thread + REQUIRE(thread.start()); + + BipedalLocomotion::clock().sleepFor(10 * period); + + // check if the thread is stopped + REQUIRE(!thread.isRunning()); + + // check the number of deadline miss + REQUIRE(thread.getNumberOfDeadlineMiss() == 3); +} \ No newline at end of file