diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index 1a83f0e7eed..0ffdb0f0f21 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -59,7 +59,7 @@ #ifdef FASTDDS_STATISTICS #include #include -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS using namespace eprosima::fastdds; using namespace eprosima::fastdds::rtps; @@ -438,12 +438,7 @@ ReturnCode_t DataWriterImpl::enable() // In case it has been loaded from the persistence DB, rebuild instances on history history_->rebuild_instances(); - deadline_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - qos_.deadline().period.to_ns() * 1e-6); + configure_deadline_timer_(); lifespan_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(), [&]() -> bool @@ -686,8 +681,8 @@ ReturnCode_t DataWriterImpl::check_write_preconditions( type_.get()->compute_key(data, instance_handle, is_key_protected); } - //Check if the Handle is different from the special value HANDLE_NIL and - //does not correspond with the instance referred by the data + // Check if the Handle is different from the special value HANDLE_NIL and + // does not correspond with the instance referred by the data if (handle.isDefined() && handle != instance_handle) { return RETCODE_PRECONDITION_NOT_MET; @@ -1039,6 +1034,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( } } + // create_change seeds the next per-instance deadline and reschedules the timer for the next sample CacheChange_t* ch = history_->create_change(change_kind, handle); if (ch != nullptr) { @@ -1071,7 +1067,8 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( return RETCODE_TIMEOUT; } - if (qos_.deadline().period != dds::c_TimeInfinite) + if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && + deadline_missed_status_.total_count < std::numeric_limits::max()) { if (!history_->set_next_deadline( handle, @@ -1182,7 +1179,7 @@ void DataWriterImpl::publisher_qos_updated() { if (writer_ != nullptr) { - //NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED + // NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos()); publisher_->rtps_participant()->update_writer(writer_, wqos); } @@ -1217,6 +1214,9 @@ ReturnCode_t DataWriterImpl::set_qos( return RETCODE_IMMUTABLE_POLICY; } + // Take a snapshot of the current QoS before mutating it + const DataWriterQos old_qos = qos_; + set_qos(qos_, qos_to_set, !enabled); if (enabled) @@ -1232,32 +1232,29 @@ ReturnCode_t DataWriterImpl::set_qos( writer_->update_attributes(w_att); } - //Notify the participant that a Writer has changed its QOS + // Notify the participant that a Writer has changed its QOS WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos()); publisher_->rtps_participant()->update_writer(writer_, wqos); - // Deadline - if (qos_.deadline().period != dds::c_TimeInfinite) + // If the deadline period actually changed, (re)configure the timer. + if (old_qos.deadline().period != qos_.deadline().period) { - deadline_duration_us_ = - duration>(qos_.deadline().period.to_ns() * 1e-3); - deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); - } - else - { - deadline_timer_->cancel_timer(); + configure_deadline_timer_(); } // Lifespan - if (qos_.lifespan().duration != dds::c_TimeInfinite) + if (old_qos.lifespan().duration != qos_.lifespan().duration) { - lifespan_duration_us_ = - duration>(qos_.lifespan().duration.to_ns() * 1e-3); - lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); - } - else - { - lifespan_timer_->cancel_timer(); + if (qos_.lifespan().duration != dds::c_TimeInfinite) + { + lifespan_duration_us_ = + duration>(qos_.lifespan().duration.to_ns() * 1e-3); + lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); + } + else + { + lifespan_timer_->cancel_timer(); + } } } @@ -1329,7 +1326,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::INCOMPATIBLE_QOS); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1368,7 +1365,7 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::LIVELINESS_LOST); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1412,7 +1409,7 @@ void DataWriterImpl::InnerDataWriterListener::notify_status_observer( } } -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS ReturnCode_t DataWriterImpl::wait_for_acknowledgments( const dds::Duration_t& max_wait) @@ -1548,10 +1545,12 @@ ReturnCode_t DataWriterImpl::set_related_datareader( bool DataWriterImpl::deadline_timer_reschedule() { - assert(qos_.deadline().period != dds::c_TimeInfinite); - std::unique_lock lock(writer_->getMutex()); + assert(qos_.deadline().period != dds::c_TimeInfinite); + assert(deadline_timer_ != nullptr); + assert(deadline_missed_status_.total_count < std::numeric_limits::max()); + steady_clock::time_point next_deadline_us; if (!history_->get_next_deadline(timer_owner_, next_deadline_us)) { @@ -1564,18 +1563,57 @@ bool DataWriterImpl::deadline_timer_reschedule() return true; } -bool DataWriterImpl::deadline_missed() +void DataWriterImpl::configure_deadline_timer_() { - assert(qos_.deadline().period != dds::c_TimeInfinite); - std::unique_lock lock(writer_->getMutex()); - deadline_missed_status_.total_count++; - deadline_missed_status_.total_count_change++; - deadline_missed_status_.last_instance_handle = timer_owner_; + // Create the timer once + if (deadline_timer_ == nullptr) + { + deadline_timer_ = new TimedEvent( + publisher_->rtps_participant()->get_resource_event(), + [this]() -> bool + { + return deadline_missed(); + }, + // Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly + std::numeric_limits::max() + ); + } + + // Handle "infinite" and "zero" outside the callback + if (qos_.deadline().period == dds::c_TimeInfinite) + { + deadline_duration_us_ = std::chrono::duration::max(); + deadline_timer_->cancel_timer(); + return; + } + + deadline_duration_us_ = + std::chrono::duration>(qos_.deadline().period.to_ns() * 1e-3); + + if (qos_.deadline().period.to_ns() == 0) + { + deadline_timer_->cancel_timer(); + + deadline_missed_status_.total_count = std::numeric_limits::max(); + deadline_missed_status_.total_count_change = std::numeric_limits::max(); + EPROSIMA_LOG_WARNING( + DATA_WRITER, + "Deadline period is 0, it will be ignored from now on."); + + // Bump once and notify listener exactly once. + notify_deadline_missed_nts_(); + return; + } + + deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); +} + +void DataWriterImpl::notify_deadline_missed_nts_() +{ StatusMask notify_status = StatusMask::offered_deadline_missed(); - auto listener = get_listener_for(notify_status); - if (nullptr != listener) + if (auto* listener = get_listener_for(notify_status)) { listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_); deadline_missed_status_.total_count_change = 0; @@ -1583,9 +1621,31 @@ bool DataWriterImpl::deadline_missed() #ifdef FASTDDS_STATISTICS writer_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +bool DataWriterImpl::deadline_missed() +{ + std::unique_lock lock(writer_->getMutex()); + + assert(qos_.deadline().period != dds::c_TimeInfinite); + + deadline_missed_status_.total_count++; + deadline_missed_status_.total_count_change++; + deadline_missed_status_.last_instance_handle = timer_owner_; + + notify_deadline_missed_nts_(); + + // If we just reached the max -> log ONCE, stop timer, and bail. + if (deadline_missed_status_.total_count == std::numeric_limits::max()) + { + EPROSIMA_LOG_WARNING(DATA_WRITER, + "Maximum number of deadline missed messages reached. Stopping deadline timer."); + deadline_timer_->cancel_timer(); + return false; // do not reschedule + } if (!history_->set_next_deadline( timer_owner_, diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/fastdds/publisher/DataWriterImpl.hpp index 13bd408662f..af062382c4f 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.hpp @@ -20,6 +20,7 @@ #define _FASTDDS_DATAWRITERIMPL_HPP_ #include +#include #include #include @@ -643,6 +644,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter /** * @brief A method to reschedule the deadline timer + * @return true if deadline rescheduling succeeded, false otherwise */ bool deadline_timer_reschedule(); @@ -771,6 +773,18 @@ class DataWriterImpl : protected rtps::IReaderDataFilter private: + /** + * (Re)configures the deadline timer: + * In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and + * for non-infinite positive values store period. + */ + void configure_deadline_timer_(); + + /** + * Notifies listeners that a deadline has been missed. + */ + void notify_deadline_missed_nts_(); + void create_history( const std::shared_ptr& payload_pool, const std::shared_ptr& change_pool); diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 935dfb74f8b..9bb7ba3fde8 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -61,7 +61,7 @@ #include #else #include -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS using eprosima::fastdds::RecursiveTimedMutex; using eprosima::fastdds::dds::c_TimeInfinite; @@ -265,12 +265,7 @@ ReturnCode_t DataReaderImpl::enable() reader_ = reader; - deadline_timer_ = new TimedEvent(subscriber_->rtps_participant()->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - qos_.deadline().period.to_ns() * 1e-6); + configure_deadline_timer_(); lifespan_timer_ = new TimedEvent(subscriber_->rtps_participant()->get_resource_event(), [&]() -> bool @@ -288,7 +283,7 @@ ReturnCode_t DataReaderImpl::enable() SubscriptionBuiltinTopicData subscription_data; if (get_subscription_builtin_topic_data(subscription_data) != RETCODE_OK) { - EPROSIMA_LOG_ERROR(DATA_WRITER, "Error getting subscription data. RTPS Writer not enabled."); + EPROSIMA_LOG_ERROR(DATA_READER, "Error getting subscription data. RTPS Reader not enabled."); return RETCODE_ERROR; } @@ -343,7 +338,7 @@ void DataReaderImpl::stop() DataReaderImpl::~DataReaderImpl() { - // assert there are no pending conditions + // Assert there are no pending conditions assert(read_conditions_.empty()); // Disable the datareader to prevent receiving data in the middle of deleting it @@ -394,7 +389,7 @@ void DataReaderImpl::set_read_communication_status( { auto user_reader = user_datareader_; - //First check if we can handle with on_data_on_readers + // First check if we can handle with on_data_on_readers SubscriberListener* subscriber_listener = subscriber_->get_listener_for(StatusMask::data_on_readers()); if (subscriber_listener != nullptr) @@ -887,6 +882,9 @@ ReturnCode_t DataReaderImpl::set_qos( return RETCODE_IMMUTABLE_POLICY; } + // Take a snapshot of the current QoS before mutating it + const DataReaderQos old_qos = qos_; + set_qos(qos_, qos_to_set, !enabled); if (enabled) @@ -894,27 +892,25 @@ ReturnCode_t DataReaderImpl::set_qos( // NOTIFY THE BUILTIN PROTOCOLS THAT THE READER HAS CHANGED update_rtps_reader_qos(); - // Deadline - if (qos_.deadline().period != dds::c_TimeInfinite) + // If the deadline period actually changed, (re)configure the timer. + if (old_qos.deadline().period != qos_.deadline().period) { - deadline_duration_us_ = duration>(qos_.deadline().period.to_ns() * 1e-3); - deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); - } - else - { - deadline_timer_->cancel_timer(); + configure_deadline_timer_(); } // Lifespan - if (qos_.lifespan().duration != dds::c_TimeInfinite) + if (old_qos.lifespan().duration != qos_.lifespan().duration) { - lifespan_duration_us_ = - std::chrono::duration>(qos_.lifespan().duration.to_ns() * 1e-3); - lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); - } - else - { - lifespan_timer_->cancel_timer(); + if (qos_.lifespan().duration != dds::c_TimeInfinite) + { + lifespan_duration_us_ = + std::chrono::duration>(qos_.lifespan().duration.to_ns() * 1e-3); + lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); + } + else + { + lifespan_timer_->cancel_timer(); + } } } @@ -978,7 +974,7 @@ void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::LIVELINESS_CHANGED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1001,7 +997,7 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::INCOMPATIBLE_QOS); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1024,7 +1020,7 @@ void DataReaderImpl::InnerDataReaderListener::on_sample_lost( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::StatusKind::SAMPLE_LOST); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1058,12 +1054,12 @@ void DataReaderImpl::InnerDataReaderListener::notify_status_observer( { if (!statistics_pp_impl->get_status_observer()->on_local_entity_status_change(data_reader_->guid(), status_id)) { - EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set entity status"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set entity status"); } } } -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS bool DataReaderImpl::on_data_available( const fastdds::rtps::GUID_t& writer_guid, @@ -1102,13 +1098,14 @@ bool DataReaderImpl::on_new_cache_change_added( return false; } - if (qos_.deadline().period != dds::c_TimeInfinite) + if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && + deadline_missed_status_.total_count < std::numeric_limits::max()) { if (!history_.set_next_deadline( change->instanceHandle, steady_clock::now() + duration_cast(deadline_duration_us_))) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set next deadline in the history"); } else if (timer_owner_ == change->instanceHandle || timer_owner_ == InstanceHandle_t()) { @@ -1150,7 +1147,7 @@ bool DataReaderImpl::on_new_cache_change_added( } else { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved"); + EPROSIMA_LOG_ERROR(DATA_READER, "A change was added to history that could not be retrieved"); } // Update and restart the timer @@ -1248,14 +1245,16 @@ ReturnCode_t DataReaderImpl::get_matched_publications( bool DataReaderImpl::deadline_timer_reschedule() { - assert(qos_.deadline().period != dds::c_TimeInfinite); - std::unique_lock lock(reader_->getMutex()); + assert(qos_.deadline().period != dds::c_TimeInfinite); + assert(deadline_timer_ != nullptr); + assert(deadline_missed_status_.total_count < std::numeric_limits::max()); + steady_clock::time_point next_deadline_us; if (!history_.get_next_deadline(timer_owner_, next_deadline_us)) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not get the next deadline from the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not get the next deadline from the history"); return false; } auto interval_ms = duration_cast(next_deadline_us - steady_clock::now()); @@ -1264,15 +1263,55 @@ bool DataReaderImpl::deadline_timer_reschedule() return true; } -bool DataReaderImpl::deadline_missed() +void DataReaderImpl::configure_deadline_timer_() { - assert(qos_.deadline().period != dds::c_TimeInfinite); - std::unique_lock lock(reader_->getMutex()); - deadline_missed_status_.total_count++; - deadline_missed_status_.total_count_change++; - deadline_missed_status_.last_instance_handle = timer_owner_; + // Create the timer once + if (deadline_timer_ == nullptr) + { + deadline_timer_ = new TimedEvent( + subscriber_->rtps_participant()->get_resource_event(), + [this]() -> bool + { + return deadline_missed(); + }, + // Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly + std::numeric_limits::max() + ); + } + + // Handle "infinite" and "zero" outside the callback + if (qos_.deadline().period == dds::c_TimeInfinite) + { + deadline_duration_us_ = std::chrono::duration::max(); + deadline_timer_->cancel_timer(); + return; + } + + deadline_duration_us_ = + std::chrono::duration>(qos_.deadline().period.to_ns() * 1e-3); + + if (qos_.deadline().period.to_ns() == 0) + { + deadline_timer_->cancel_timer(); + + deadline_missed_status_.total_count = std::numeric_limits::max(); + deadline_missed_status_.total_count_change = std::numeric_limits::max(); + EPROSIMA_LOG_WARNING( + DATA_READER, + "Deadline period is 0, it will be ignored from now on."); + + // Bump once and notify listener exactly once. + notify_deadline_missed_nts_(); + return; + } + + deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); +} + +void DataReaderImpl::notify_deadline_missed_nts_() +{ StatusMask notify_status = StatusMask::requested_deadline_missed(); auto listener = get_listener_for(notify_status); if (nullptr != listener) @@ -1283,15 +1322,37 @@ bool DataReaderImpl::deadline_missed() #ifdef FASTDDS_STATISTICS reader_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +bool DataReaderImpl::deadline_missed() +{ + std::unique_lock lock(reader_->getMutex()); + + assert(qos_.deadline().period != dds::c_TimeInfinite); + + deadline_missed_status_.total_count++; + deadline_missed_status_.total_count_change++; + deadline_missed_status_.last_instance_handle = timer_owner_; + + notify_deadline_missed_nts_(); + + // If we just reached the max -> log ONCE, stop timer, and bail + if (deadline_missed_status_.total_count == std::numeric_limits::max()) + { + EPROSIMA_LOG_WARNING(DATA_READER, + "Maximum number of deadline missed messages reached. Stopping deadline timer."); + deadline_timer_->cancel_timer(); + return false; // do not reschedule + } if (!history_.set_next_deadline( timer_owner_, steady_clock::now() + duration_cast(deadline_duration_us_), true)) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set next deadline in the history"); return false; } return deadline_timer_reschedule(); @@ -1925,7 +1986,7 @@ ReturnCode_t DataReaderImpl::check_datasharing_compatible( return RETCODE_OK; break; default: - EPROSIMA_LOG_ERROR(DATA_WRITER, "Unknown data sharing kind."); + EPROSIMA_LOG_ERROR(DATA_READER, "Unknown data sharing kind."); return RETCODE_BAD_PARAMETER; } } @@ -1957,14 +2018,14 @@ ReturnCode_t DataReaderImpl::delete_contained_entities() // Check pending ReadConditions for (detail::ReadConditionImpl* impl : read_conditions_) { - // should be alive + // Should be alive auto keep_alive = impl->shared_from_this(); assert((bool)keep_alive); - // free ReadConditions + // Free ReadConditions impl->detach_all_conditions(); } - // release the colection + // Release the collection read_conditions_.clear(); return RETCODE_OK; @@ -2074,12 +2135,12 @@ ReadCondition* DataReaderImpl::create_readcondition( if (it != read_conditions_.end()) { - // already there + // Already there impl = (*it)->shared_from_this(); } else { - // create a new one + // Create a new one impl = std::make_shared(*this, key); impl->set_trigger_value(current_mask); // Add the implementation object to the collection @@ -2090,7 +2151,7 @@ ReadCondition* DataReaderImpl::create_readcondition( ReadCondition* cond = new ReadCondition(); auto ret_code = impl->attach_condition(cond); - // attach cannot fail in this scenario + // Attach cannot fail in this scenario assert(RETCODE_OK == ret_code); (void)ret_code; @@ -2126,7 +2187,7 @@ ReturnCode_t DataReaderImpl::delete_readcondition( # ifdef __cpp_lib_enable_shared_from_this std::weak_ptr wp = impl->weak_from_this(); # else - // remove when C++17 is enforced + // Remove when C++17 is enforced auto wp = std::weak_ptr(impl->shared_from_this()); # endif // ifdef __cpp_lib_enable_shared_from_this @@ -2135,10 +2196,10 @@ ReturnCode_t DataReaderImpl::delete_readcondition( if (RETCODE_OK == ret_code) { - // delete the condition + // Delete the condition delete a_condition; - // check if we must remove the implementation object + // Check if we must remove the implementation object if (wp.expired()) { read_conditions_.erase(it); @@ -2183,7 +2244,7 @@ void DataReaderImpl::try_notify_read_conditions() noexcept last_mask_state_.instance_states & ~old_mask.instance_states; } - // traverse the conditions notifying + // Traverse the conditions notifying std::lock_guard _(get_conditions_mutex()); for (detail::ReadConditionImpl* impl : read_conditions_) { @@ -2203,7 +2264,7 @@ ReturnCode_t DataReaderImpl::get_subscription_builtin_topic_data( return RETCODE_NOT_ENABLED; } - // sanity checks + // Sanity checks assert(nullptr != subscriber_); assert(nullptr != topic_); assert(nullptr != subscriber_->get_participant()); diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index 791335f881d..52116b4d6bf 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -697,6 +697,17 @@ class DataReaderImpl private: + /** + * (Re)configures the deadline timer: + * In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and + * for non-infinite positive values store period. + */ + void configure_deadline_timer_(); + + /** + * Notifies listeners that a deadline has been missed. + */ + void notify_deadline_missed_nts_(); void update_rtps_reader_qos(); DataReaderQos get_datareader_qos_from_settings( diff --git a/test/blackbox/CMakeLists.txt b/test/blackbox/CMakeLists.txt index ef8eae59d3d..7021dd73365 100644 --- a/test/blackbox/CMakeLists.txt +++ b/test/blackbox/CMakeLists.txt @@ -218,7 +218,12 @@ if(FASTDDS_PIM_API_TESTS) ) target_include_directories(BlackboxTests_DDS_PIM PRIVATE ${Asio_INCLUDE_DIR} +<<<<<<< HEAD api/dds-pim) +======= + ${CMAKE_CURRENT_SOURCE_DIR}/api/dds-pim + ${PROJECT_SOURCE_DIR}/test/utils) +>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016)) target_link_libraries(BlackboxTests_DDS_PIM fastdds fastcdr diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 9ea595d74be..7f1bc7f1d1f 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -177,7 +177,6 @@ class PubSubReader Listener( PubSubReader& reader) : reader_(reader) - , times_deadline_missed_(0) { } @@ -227,8 +226,8 @@ class PubSubReader const eprosima::fastdds::dds::RequestedDeadlineMissedStatus& status) override { (void)datareader; - - times_deadline_missed_ = status.total_count; + std::lock_guard lk(mutex_); + requested_deadline_status_ = status; } void on_requested_incompatible_qos( @@ -279,7 +278,14 @@ class PubSubReader unsigned int missed_deadlines() const { - return times_deadline_missed_; + std::lock_guard lk(mutex_); + return requested_deadline_status_.total_count; + } + + unsigned int missed_deadlines_change() const + { + std::lock_guard lk(mutex_); + return requested_deadline_status_.total_count_change; } private: @@ -288,6 +294,9 @@ class PubSubReader const Listener&) = delete; PubSubReader& reader_; + mutable std::mutex mutex_; + + eprosima::fastdds::dds::RequestedDeadlineMissedStatus requested_deadline_status_{}; //! Number of times deadline was missed unsigned int times_deadline_missed_; @@ -1815,6 +1824,11 @@ class PubSubReader return listener_.missed_deadlines(); } + unsigned int missed_deadlines_change() const + { + return listener_.missed_deadlines_change(); + } + void liveliness_lost() { std::unique_lock lock(liveliness_mutex_); @@ -1893,6 +1907,22 @@ class PubSubReader return status; } + bool set_qos() + { + return (eprosima::fastdds::dds::RETCODE_OK == datareader_->set_qos(datareader_qos_)); + } + + bool set_qos( + const eprosima::fastdds::dds::DataReaderQos& att) + { + return (eprosima::fastdds::dds::RETCODE_OK == datareader_->set_qos(att)); + } + + eprosima::fastdds::dds::DataReaderQos get_qos() + { + return (datareader_->get_qos()); + } + bool is_matched() const { return matched_ > 0; diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index ce81e693ddc..da0c495e58c 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -183,7 +183,6 @@ class PubSubWriter Listener( PubSubWriter& writer) : writer_(writer) - , times_deadline_missed_(0) , times_liveliness_lost_(0) , times_unack_sample_removed_(0) { @@ -214,7 +213,8 @@ class PubSubWriter const eprosima::fastdds::dds::OfferedDeadlineMissedStatus& status) override { static_cast(datawriter); - times_deadline_missed_ = status.total_count; + std::lock_guard lk(mutex_); + offered_deadline_status_ = status; } void on_offered_incompatible_qos( @@ -245,7 +245,14 @@ class PubSubWriter unsigned int missed_deadlines() const { - return times_deadline_missed_; + std::lock_guard lk(mutex_); + return offered_deadline_status_.total_count; + } + + unsigned int missed_deadlines_change() const + { + std::lock_guard lk(mutex_); + return offered_deadline_status_.total_count_change; } unsigned int times_liveliness_lost() const @@ -269,9 +276,10 @@ class PubSubWriter const Listener&) = delete; PubSubWriter& writer_; + mutable std::mutex mutex_; + + eprosima::fastdds::dds::OfferedDeadlineMissedStatus offered_deadline_status_{}; - //! The number of times deadline was missed - unsigned int times_deadline_missed_; //! The number of times liveliness was lost unsigned int times_liveliness_lost_; //! The number of times a sample has been removed unacknowledged @@ -1747,6 +1755,11 @@ class PubSubWriter return listener_.missed_deadlines(); } + unsigned int missed_deadlines_change() const + { + return listener_.missed_deadlines_change(); + } + unsigned int times_liveliness_lost() const { return listener_.times_liveliness_lost(); diff --git a/test/blackbox/common/BlackboxTestsDeadlineQos.cpp b/test/blackbox/common/BlackboxTestsDeadlineQos.cpp index 4ec14da1ac3..fda219c2cc7 100644 --- a/test/blackbox/common/BlackboxTestsDeadlineQos.cpp +++ b/test/blackbox/common/BlackboxTestsDeadlineQos.cpp @@ -22,8 +22,11 @@ #include "PubSubReader.hpp" #include "PubSubWriter.hpp" +#include + using namespace eprosima::fastdds; using namespace eprosima::fastdds::rtps; +using fastlog = eprosima::fastdds::dds::Log; enum communication_type { @@ -32,7 +35,7 @@ enum communication_type DATASHARING }; -class DeadlineQos : public testing::TestWithParam +class DeadlineQos : public ::testing::TestWithParam { public: @@ -309,6 +312,179 @@ TEST_P(DeadlineQos, KeyedTopicBestEffortReaderVolatileWriterSetDeadline) EXPECT_GE(writer.missed_deadlines(), 1u); } +/** + * Testing Redmine issue #23289. + * Writer-side version of ZeroDeadlinePeriod. + * Regression test for the zero-deadline period bug. + * Creating a DataWriter with a deadline of 0. + * Checking if a warning is logged exactly once, the timer is cancelled without missed deadline + * messages and a total count and count change set to max integer. + * Checking warnings, total count and count change when changing the deadline. + */ +TEST_P(DeadlineQos, ZeroDeadlinePeriodWriter) +{ + auto observer = std::make_shared(/*store=*/ false); + auto consumer = std::make_unique(observer); + + fastlog::ClearConsumers(); + fastlog::RegisterConsumer(std::move(consumer)); + fastlog::SetVerbosity(fastlog::Kind::Warning); + + observer->set_global_needle("Deadline period is 0"); + + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS); + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + + writer.deadline_period(0.0).init(); + reader.init(); + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyedhelloworld_data_generator(1); + writer.send_sample(data.front()); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + // Writer offered-deadline counters should be saturated + EXPECT_EQ(writer.missed_deadlines(), + std::numeric_limits::max()) << "Expected the max value after a zero-deadline warning."; + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + + const auto prev = observer->matched_global(); + EXPECT_EQ(prev, 1u) << "Expected exactly one 'deadline=0' warning\n"; + + const auto pre_total = writer.missed_deadlines(); + const auto pre_change = writer.missed_deadlines_change(); + + // Wait for a period long enough to expect a new miss if the timer were still active + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + const auto post_total = writer.missed_deadlines(); + const auto post_change = writer.missed_deadlines_change(); + + EXPECT_EQ(pre_total, post_total) << "The total count should not change, as the timer was canceled."; + EXPECT_EQ(pre_change, post_change) << "The total_count_change should not change, as the timer was canceled."; + EXPECT_EQ(observer->matched_global(), prev) << "No extra warnings after cancel."; + + auto q = writer.get_qos(); + q.deadline().period = Duration_t(0.1); + + ASSERT_TRUE(writer.set_qos(q)); // Update 0 -> finite + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(writer.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev) << "No new warning when moving reader from 0 -> finite"; + + q.deadline().period = Duration_t(0.0); + + ASSERT_TRUE(writer.set_qos(q)); // Update finite -> 0 + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(writer.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev + 1) << "Exactly one new warning."; + + fastlog::ClearConsumers(); +} + +/** + * Testing Redmine issue #23289. + * Regression test for the zero-deadline period bug. + * Reader-side version of ZeroDeadlinePeriod. + * Creating a DataReader with a deadline of 0. + * Checking if a warning is logged exactly once, the timer is cancelled without missed deadline + * messages and a total count and count change set to max integer. + * Checking warnings, total count and count change when changing the deadline. + */ +TEST_P(DeadlineQos, ZeroDeadlinePeriodReader) +{ + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS); + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + + // Writer deadline must also be 0 to satisfy the matching rule and ensure discovery + writer.deadline_period(0.0).init(); + ASSERT_TRUE(writer.isInitialized()); + + auto observer = std::make_shared(/*store=*/ false); + auto consumer = std::make_unique(observer); + + fastlog::ClearConsumers(); + fastlog::RegisterConsumer(std::move(consumer)); + fastlog::SetVerbosity(fastlog::Kind::Warning); + + observer->set_global_needle("Deadline period is 0"); + + // Zero deadline on the READER + reader.deadline_period(0.0).init(); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyedhelloworld_data_generator(1); + writer.send_sample(data.front()); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + // Reader requested-deadline counters should be saturated + EXPECT_EQ(reader.missed_deadlines(), + std::numeric_limits::max()) << "Expected the max value after a zero-deadline warning."; + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + + const auto prev = observer->matched_global(); + EXPECT_EQ(prev, 1u) << "Expected exactly one 'deadline=0' warning\n"; + + const auto pre_total = reader.missed_deadlines(); + const auto pre_change = reader.missed_deadlines_change(); + + // Wait for a period long enough to expect a new miss if the timer were still active + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + const auto post_total = reader.missed_deadlines(); + const auto post_change = reader.missed_deadlines_change(); + + EXPECT_EQ(pre_total, post_total) << "Timer canceled on reader; total must not change."; + EXPECT_EQ(pre_change, post_change) << "Timer canceled on reader; total_count_change must not change."; + EXPECT_EQ(observer->matched_global(), prev) << "No extra warnings after cancel."; + + // Now change reader's deadline from 0 -> finite; still no additional warning; counters remain saturated + auto q = reader.get_qos(); + q.deadline().period = Duration_t(0.1); + ASSERT_TRUE(reader.set_qos(q)); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(reader.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev) << "No new warning when moving reader from 0 -> finite"; + + q.deadline().period = Duration_t(0.0); + + ASSERT_TRUE(reader.set_qos(q)); // Update finite -> 0 + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(reader.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev + 1) << "Exactly one new warning."; + + fastlog::ClearConsumers(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else @@ -317,8 +493,8 @@ TEST_P(DeadlineQos, KeyedTopicBestEffortReaderVolatileWriterSetDeadline) GTEST_INSTANTIATE_TEST_MACRO(DeadlineQos, DeadlineQos, - testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING), - [](const testing::TestParamInfo& info) + ::testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING), + [](const ::testing::TestParamInfo& info) { switch (info.param) { diff --git a/test/utils/LogCounter.hpp b/test/utils/LogCounter.hpp new file mode 100644 index 00000000000..a75556e626f --- /dev/null +++ b/test/utils/LogCounter.hpp @@ -0,0 +1,149 @@ +// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file LogCounter.hpp + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace testing { + +/** + * This class holds all counting/state logic: + * - Counts per Log::Kind (Warning, Error, Info, ...) + * - Optional storage of full entries (disabled by default) + * - "Needle" matcher for exact message occurrences + * Intended to be used behind a LogCounterConsumer + */ +class LogCounterObserver +{ +public: + + using Log = eprosima::fastdds::dds::Log; + using Kind = Log::Kind; + + explicit LogCounterObserver( + bool store_logs = false) + : store_(store_logs) + { + matched_.store(0, std::memory_order_relaxed); + } + + // Set / reset the message substring to match and count + void set_global_needle( + std::string s) + { + std::lock_guard lk(m_); + needle_ = std::move(s); + matched_.store(0, std::memory_order_relaxed); + } + + // Number of messages that matched the current needle (substring match) + size_t matched_global() const + { + return matched_.load(std::memory_order_relaxed); + } + + // Count of logs for a specific kind (Warning, Error, ...) + size_t count( + Kind k) const + { + std::lock_guard lk(m_); + auto it = counts_.find(k); + return (it == counts_.end()) ? 0 : it->second; + } + + // Get stored entries (only if constructed with store_logs=true) + const std::vector& entries() const + { + return entries_; + } + + // Called by the consumer + void on_log( + const Log::Entry& e) + { + std::string local_needle; + { + std::lock_guard lk(m_); + ++counts_[e.kind]; + local_needle = needle_; + if (store_) + { + entries_.push_back(e); + } + } + if (!local_needle.empty() && e.message.find(local_needle) != std::string::npos) + { + matched_.fetch_add(1, std::memory_order_relaxed); + } + } + +private: + + std::map counts_; + std::atomic matched_{0}; + + bool store_; + mutable std::mutex m_; + std::string needle_; + std::vector entries_; +}; + +/** + * Class holding a shared_ptr that ensures the observer + * outlives asynchronous logging, so tests can safely read counters even if a + * failure occurs mid-test. + */ +class LogCounterConsumer : public eprosima::fastdds::dds::LogConsumer +{ +public: + + using Log = eprosima::fastdds::dds::Log; + + explicit LogCounterConsumer( + std::shared_ptr obs) + : observer_(std::move(obs)) + { + } + + void Consume( + const Log::Entry& e) override + { + if (observer_) + { + observer_->on_log(e); + } + } + +private: + + std::shared_ptr observer_; +}; + +} // namespace testing +} // namespace fastdds +} // namespace eprosima