diff --git a/devices/YarpRobotLoggerDevice/include/BipedalLocomotion/YarpRobotLoggerDevice.h b/devices/YarpRobotLoggerDevice/include/BipedalLocomotion/YarpRobotLoggerDevice.h index ce23c03fa0..18c14ce42b 100644 --- a/devices/YarpRobotLoggerDevice/include/BipedalLocomotion/YarpRobotLoggerDevice.h +++ b/devices/YarpRobotLoggerDevice/include/BipedalLocomotion/YarpRobotLoggerDevice.h @@ -7,6 +7,8 @@ #define BIPEDAL_LOCOMOTION_FRAMEWORK_YARP_ROBOT_LOGGER_DEVICE_H #include +#include +#include #include #include #include @@ -145,17 +147,36 @@ class YarpRobotLoggerDevice : public yarp::dev::DeviceDriver, std::filesystem::path framesPath; }; + struct FrameBundle + { + unsigned int frameIndex{0}; + double timestamp{0.0}; + cv::Mat rgbFrame; + cv::Mat depthFrame; + bool hasRgb{false}; + bool hasDepth{false}; + }; + std::shared_ptr rgb; std::shared_ptr depth; int depthScale{1}; std::thread videoThread; + std::thread flushThread; std::atomic recordVideoIsRunning{false}; int fps{-1}; std::atomic frameIndex{0}; std::atomic resetIndex{false}; std::atomic paused{false}; std::atomic requestPause{false}; + + std::condition_variable frameQueueCv; + std::mutex frameQueueMutex; + std::deque frameQueue; + std::size_t maxBufferedBundles{0}; + std::atomic flushThreadShouldRun{false}; + std::atomic flushPaused{false}; + std::atomic queueOverflowNotified{false}; }; std::string m_videoCodecCode{"mp4v"}; @@ -237,6 +258,7 @@ class YarpRobotLoggerDevice : public yarp::dev::DeviceDriver, bool hasSubstring(const std::string& str, const std::vector& substrings) const; void recordVideo(const std::string& cameraName, VideoWriter& writer); + void flushVideoQueue(const std::string& cameraName, VideoWriter& writer); void saveExogenousImages(const std::string& signalName, VideoWriter& writer, ExogenousSignal>& signal); diff --git a/devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp b/devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp index f23101cdf5..664d31a988 100644 --- a/devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp +++ b/devices/YarpRobotLoggerDevice/src/YarpRobotLoggerDevice.cpp @@ -3,6 +3,7 @@ * distributed under the terms of the BSD-3-Clause license. */ +#include #include #include #include @@ -1458,11 +1459,29 @@ bool BipedalLocomotion::YarpRobotLoggerDevice::prepareCameraLogging() // https://stackoverflow.com/questions/46114214/lambda-implicit-capture-fails-with-variable-declared-from-structured-binding // Note if one day we will support c++20 we can use structured binding see // https://en.cppreference.com/w/cpp/language/structured_binding + constexpr std::size_t queueBufferSeconds = 3; + constexpr std::size_t minQueueDepth = 10; + constexpr std::size_t fallbackFps = 30; + for (auto iter = m_videoWriters.begin(); iter != m_videoWriters.end(); ++iter) { - // start a separate the thread for each camera - iter->second.videoThread - = std::thread([this, iter] { this->recordVideo(iter->first, iter->second); }); + auto& writer = iter->second; + const std::string cameraName = iter->first; + const std::size_t fpsValue = writer.fps > 0 ? static_cast(writer.fps) + : fallbackFps; + + writer.maxBufferedBundles + = std::max(minQueueDepth, fpsValue * queueBufferSeconds); + writer.flushThreadShouldRun = true; + writer.flushPaused = false; + writer.queueOverflowNotified = false; + + writer.flushThread = std::thread( + [this, cameraName, &writer] { this->flushVideoQueue(cameraName, writer); }); + + // start a separate thread for each camera that captures frames + writer.videoThread + = std::thread([this, cameraName, &writer] { this->recordVideo(cameraName, writer); }); } return true; @@ -1816,6 +1835,12 @@ void YarpRobotLoggerDevice::recordVideo(const std::string& cameraName, VideoWrit break; } + const double timeSeconds = std::chrono::duration(time).count(); + VideoWriter::FrameBundle bundle; + bundle.frameIndex = writer.frameIndex.load(); + bundle.timestamp = timeSeconds; + bool hasFrame = false; + // get the frame from the camera if (writer.rgb != nullptr) { @@ -1829,29 +1854,11 @@ void YarpRobotLoggerDevice::recordVideo(const std::string& cameraName, VideoWrit cameraName); } - std::lock_guard lock(writer.rgb->mutex); - - // save the frame in the video writer - if (writer.rgb->saveMode == VideoWriter::SaveMode::Video) + if (!writer.rgb->frame.empty()) { - writer.rgb->writer->write(writer.rgb->frame); - } else - { - assert(writer.rgb->saveMode == VideoWriter::SaveMode::Frame); - - unsigned int frameIndex = writer.frameIndex.load(); - const std::filesystem::path imgPath - = writer.rgb->framesPath / ("img_" + std::to_string(frameIndex) + ".png"); - - cv::imwrite(imgPath.string(), writer.rgb->frame); - - // lock the the buffered manager mutex - std::lock_guard lock(m_bufferManagerMutex); - - // TODO here we may save the frame itself - m_bufferManager.push_back(frameIndex, - std::chrono::duration(time).count(), - "camera::" + cameraName + "::rgb"); + bundle.rgbFrame = writer.rgb->frame.clone(); + bundle.hasRgb = true; + hasFrame = true; } } @@ -1868,41 +1875,41 @@ void YarpRobotLoggerDevice::recordVideo(const std::string& cameraName, VideoWrit } else { - // If a new frame arrived the we should scale it + // If a new frame arrived then we should scale it writer.depth->frame = writer.depth->frame * writer.depthScale; } - std::lock_guard lock(writer.depth->mutex); - - if (writer.depth->saveMode == VideoWriter::SaveMode::Video) + if (!writer.depth->frame.empty()) { - // we need to convert the image to 8bit this is required by the video writer - cv::Mat image8Bit; - writer.depth->frame.convertTo(image8Bit, CV_8UC1); + bundle.depthFrame = writer.depth->frame.clone(); + bundle.hasDepth = true; + hasFrame = true; + } + } - // save the frame in the video writer - writer.depth->writer->write(image8Bit); - } else + if (hasFrame) + { + std::unique_lock queueLock(writer.frameQueueMutex); + if (writer.maxBufferedBundles == 0) { - assert(writer.depth->saveMode == VideoWriter::SaveMode::Frame); - - unsigned int frameIndex = writer.frameIndex.load(); - const std::filesystem::path imgPath - = writer.depth->framesPath / ("img_" + std::to_string(frameIndex) + ".png"); - - // convert the image into 16bit grayscale image - cv::Mat image16Bit; - writer.depth->frame.convertTo(image16Bit, CV_16UC1); - cv::imwrite(imgPath.string(), image16Bit); - - // lock the the buffered manager mutex - std::lock_guard lock(m_bufferManagerMutex); - - // TODO here we may save the frame itself - m_bufferManager.push_back(frameIndex, - std::chrono::duration(time).count(), - "camera::" + cameraName + "::depth"); + writer.maxBufferedBundles = 1; } + if (writer.frameQueue.size() >= writer.maxBufferedBundles) + { + writer.frameQueue.pop_front(); + if (!writer.queueOverflowNotified.exchange(true)) + { + log()->warn("{} Dropping camera frame for {} because the flush queue reached " + "{} elements.", + logPrefix, + cameraName, + writer.maxBufferedBundles); + } + } + + writer.frameQueue.push_back(std::move(bundle)); + queueLock.unlock(); + writer.frameQueueCv.notify_one(); } // increase the index @@ -1927,6 +1934,114 @@ void YarpRobotLoggerDevice::recordVideo(const std::string& cameraName, VideoWrit } } +void YarpRobotLoggerDevice::flushVideoQueue(const std::string& cameraName, VideoWriter& writer) +{ + const std::string rgbChannel = "camera::" + cameraName + "::rgb"; + const std::string depthChannel = "camera::" + cameraName + "::depth"; + + auto queueIsOverflowing = [&writer]() -> bool { + if (writer.maxBufferedBundles == 0) + { + return false; + } + + const std::size_t threshold = std::max(1, writer.maxBufferedBundles / 2); + return writer.frameQueue.size() >= threshold; + }; + + while (true) + { + VideoWriter::FrameBundle bundle; + + { + std::unique_lock lock(writer.frameQueueMutex); + writer.frameQueueCv.wait(lock, [&writer]() { + return !writer.frameQueue.empty() || !writer.flushThreadShouldRun.load(); + }); + + if (writer.frameQueue.empty()) + { + if (!writer.flushThreadShouldRun.load()) + { + writer.flushPaused = true; + break; + } + + if (writer.requestPause) + { + writer.flushPaused = true; + } + + continue; + } + + bundle = std::move(writer.frameQueue.front()); + writer.frameQueue.pop_front(); + + if (writer.queueOverflowNotified.load() && !queueIsOverflowing()) + { + writer.queueOverflowNotified = false; + } + + writer.flushPaused = false; + } + + if (bundle.hasRgb && writer.rgb != nullptr) + { + if (writer.rgb->saveMode == VideoWriter::SaveMode::Video) + { + std::lock_guard lock(writer.rgb->mutex); + if (writer.rgb->writer != nullptr) + { + writer.rgb->writer->write(bundle.rgbFrame); + } + } else + { + const std::filesystem::path imgPath + = writer.rgb->framesPath + / ("img_" + std::to_string(bundle.frameIndex) + ".png"); + cv::imwrite(imgPath.string(), bundle.rgbFrame); + + std::lock_guard bufferLock(m_bufferManagerMutex); + m_bufferManager.push_back(bundle.frameIndex, bundle.timestamp, rgbChannel); + } + } + + if (bundle.hasDepth && writer.depth != nullptr) + { + if (writer.depth->saveMode == VideoWriter::SaveMode::Video) + { + cv::Mat image8Bit; + bundle.depthFrame.convertTo(image8Bit, CV_8UC1); + std::lock_guard lock(writer.depth->mutex); + if (writer.depth->writer != nullptr) + { + writer.depth->writer->write(image8Bit); + } + } else + { + const std::filesystem::path imgPath + = writer.depth->framesPath + / ("img_" + std::to_string(bundle.frameIndex) + ".png"); + cv::Mat image16Bit; + bundle.depthFrame.convertTo(image16Bit, CV_16UC1); + cv::imwrite(imgPath.string(), image16Bit); + + std::lock_guard bufferLock(m_bufferManagerMutex); + m_bufferManager.push_back(bundle.frameIndex, bundle.timestamp, depthChannel); + } + } + + { + std::lock_guard lock(writer.frameQueueMutex); + if (writer.requestPause && writer.frameQueue.empty()) + { + writer.flushPaused = true; + } + } + } +} + void YarpRobotLoggerDevice::saveExogenousImages( const std::string& signalName, VideoWriter& writer, @@ -1938,7 +2053,7 @@ void YarpRobotLoggerDevice::saveExogenousImages( while (writer.recordVideoIsRunning) { - //Notify the other threads that we are somehow blocked + // Notify the other threads that we are somehow blocked writer.paused = true; std::lock_guard lock(signal.mutex); @@ -1980,9 +2095,7 @@ void YarpRobotLoggerDevice::saveExogenousImages( unsigned int frameIndex = writer.frameIndex.load(); // Save the frame const std::filesystem::path imgPath - = writer.rgb->framesPath - / ("img_" + std::to_string(frameIndex) - + ".png"); + = writer.rgb->framesPath / ("img_" + std::to_string(frameIndex) + ".png"); cv::imwrite(imgPath.string(), colorImg); // lock the the buffered manager mutex @@ -2544,7 +2657,7 @@ bool YarpRobotLoggerDevice::saveCallback(const std::string& fileName, if (std::filesystem::exists(temp)) { log()->error("{} Attempted to rename {} to {}, but it already exists. " - "Please choose a different name.", + "Please choose a different name.", logPrefix, oldName, temp); @@ -2797,6 +2910,21 @@ bool YarpRobotLoggerDevice::close() } } + for (auto& [cameraName, writer] : m_videoWriters) + { + writer.flushThreadShouldRun = false; + writer.frameQueueCv.notify_all(); + } + + for (auto& [cameraName, writer] : m_videoWriters) + { + if (writer.flushThread.joinable()) + { + writer.flushThread.join(); + writer.flushThread = std::thread(); + } + } + // close all the thread associated to the exogenous image logging for (auto& [name, signal] : m_imageSignals) { @@ -2831,7 +2959,8 @@ bool YarpRobotLoggerDevice::close() void BipedalLocomotion::YarpRobotLoggerDevice::waitForAcquisitionThreadsToPause() { // First we request all acquisition threads to pause and we wait for them to be paused - log()->info("[YarpRobotLoggerDevice::waitForAcquisitionThreadsToPause] Pausing acquisition threads..."); + log()->info("[YarpRobotLoggerDevice::waitForAcquisitionThreadsToPause] Pausing acquisition " + "threads..."); for (auto& [cameraName, writer] : m_videoWriters) { writer.requestPause = true; @@ -2849,7 +2978,19 @@ void BipedalLocomotion::YarpRobotLoggerDevice::waitForAcquisitionThreadsToPause( allPaused = true; for (auto& [cameraName, writer] : m_videoWriters) { - if (writer.recordVideoIsRunning && !writer.paused) + const bool captureActive = writer.recordVideoIsRunning && !writer.paused; + + bool queueEmpty = true; + if (writer.flushThreadShouldRun) + { + std::lock_guard lock(writer.frameQueueMutex); + queueEmpty = writer.frameQueue.empty(); + } + + const bool flushActive = writer.flushThreadShouldRun + && (!writer.flushPaused || !queueEmpty); + + if (captureActive || flushActive) { allPaused = false; break; @@ -2870,7 +3011,8 @@ void BipedalLocomotion::YarpRobotLoggerDevice::waitForAcquisitionThreadsToPause( using namespace std::chrono_literals; BipedalLocomotion::clock().sleepFor(1ms); } - log()->info("[YarpRobotLoggerDevice::waitForAcquisitionThreadsToPause] All acquisition threads paused."); + log()->info("[YarpRobotLoggerDevice::waitForAcquisitionThreadsToPause] All acquisition threads " + "paused."); } void BipedalLocomotion::YarpRobotLoggerDevice::resumeAcquisitionThreads() @@ -2881,6 +3023,8 @@ void BipedalLocomotion::YarpRobotLoggerDevice::resumeAcquisitionThreads() { writer.requestPause = false; writer.paused = false; + writer.flushPaused = false; + writer.frameQueueCv.notify_all(); } } for (auto& [cameraName, writer] : m_exogenousImageWriters)