Skip to content

Commit ccfb5d5

Browse files
committed
Handle maximum deadline misses case (#6016)
* Refs #23289. Handle maximum deadline misses case. Data writer implementation. Signed-off-by: zesk1999 <[email protected]> * Refs #23289. Handle maximum deadline misses case. Data reader implementation. Signed-off-by: zesk1999 <[email protected]> * Refs #23289. Handle maximum deadline misses case. Tests. Signed-off-by: zesk1999 <[email protected]> --------- Signed-off-by: zesk1999 <[email protected]> (cherry picked from commit 3230d1d) Signed-off-by: zesk1999 <[email protected]> # Conflicts: # src/cpp/fastdds/publisher/DataWriterImpl.cpp # src/cpp/fastdds/publisher/DataWriterImpl.hpp # src/cpp/fastdds/subscriber/DataReaderImpl.cpp # test/blackbox/CMakeLists.txt # test/blackbox/common/BlackboxTestsDeadlineQos.cpp
1 parent a552a33 commit ccfb5d5

File tree

9 files changed

+632
-111
lines changed

9 files changed

+632
-111
lines changed

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 103 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
#ifdef FASTDDS_STATISTICS
5454
#include <statistics/fastdds/domain/DomainParticipantImpl.hpp>
5555
#include <statistics/types/monitorservice_types.h>
56-
#endif //FASTDDS_STATISTICS
56+
#endif // FASTDDS_STATISTICS
5757

5858
using namespace eprosima::fastrtps;
5959
using namespace eprosima::fastrtps::rtps;
@@ -397,12 +397,7 @@ ReturnCode_t DataWriterImpl::enable()
397397
// In case it has been loaded from the persistence DB, rebuild instances on history
398398
history_.rebuild_instances();
399399

400-
deadline_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(),
401-
[&]() -> bool
402-
{
403-
return deadline_missed();
404-
},
405-
qos_.deadline().period.to_ns() * 1e-6);
400+
configure_deadline_timer_();
406401

407402
lifespan_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(),
408403
[&]() -> bool
@@ -642,8 +637,8 @@ ReturnCode_t DataWriterImpl::check_write_preconditions(
642637
type_.get()->getKey(data, &instance_handle, is_key_protected);
643638
}
644639

645-
//Check if the Handle is different from the special value HANDLE_NIL and
646-
//does not correspond with the instance referred by the data
640+
// Check if the Handle is different from the special value HANDLE_NIL and
641+
// does not correspond with the instance referred by the data
647642
if (handle.isDefined() && handle != instance_handle)
648643
{
649644
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
@@ -992,6 +987,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
992987
}
993988
}
994989

990+
// new_change seeds the next per-instance deadline and reschedules the timer for the next sample
995991
CacheChange_t* ch = writer_->new_change(change_kind, handle);
996992
if (ch != nullptr)
997993
{
@@ -1024,7 +1020,8 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10241020
return ReturnCode_t::RETCODE_TIMEOUT;
10251021
}
10261022

1027-
if (qos_.deadline().period != c_TimeInfinite)
1023+
if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != c_TimeInfinite &&
1024+
deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
10281025
{
10291026
if (!history_.set_next_deadline(
10301027
handle,
@@ -1135,7 +1132,7 @@ void DataWriterImpl::publisher_qos_updated()
11351132
{
11361133
if (writer_ != nullptr)
11371134
{
1138-
//NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
1135+
// NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
11391136
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
11401137
publisher_->rtps_participant()->updateWriter(writer_, get_topic_attributes(qos_, *topic_, type_), wqos);
11411138
}
@@ -1170,6 +1167,9 @@ ReturnCode_t DataWriterImpl::set_qos(
11701167
return ReturnCode_t::RETCODE_IMMUTABLE_POLICY;
11711168
}
11721169

1170+
// Take a snapshot of the current QoS before mutating it
1171+
const DataWriterQos old_qos = qos_;
1172+
11731173
set_qos(qos_, qos_to_set, !enabled);
11741174

11751175
if (enabled)
@@ -1185,33 +1185,30 @@ ReturnCode_t DataWriterImpl::set_qos(
11851185
writer_->updateAttributes(w_att);
11861186
}
11871187

1188-
//Notify the participant that a Writer has changed its QOS
1188+
// Notify the participant that a Writer has changed its QOS
11891189
fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_);
11901190
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
11911191
publisher_->rtps_participant()->updateWriter(writer_, topic_att, wqos);
11921192

1193-
// Deadline
1194-
if (qos_.deadline().period != c_TimeInfinite)
1193+
// If the deadline period actually changed, (re)configure the timer.
1194+
if (old_qos.deadline().period != qos_.deadline().period)
11951195
{
1196-
deadline_duration_us_ =
1197-
duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1198-
deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
1199-
}
1200-
else
1201-
{
1202-
deadline_timer_->cancel_timer();
1196+
configure_deadline_timer_();
12031197
}
12041198

12051199
// Lifespan
1206-
if (qos_.lifespan().duration != c_TimeInfinite)
1200+
if (old_qos.lifespan().duration != qos_.lifespan().duration)
12071201
{
1208-
lifespan_duration_us_ =
1209-
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1210-
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1211-
}
1212-
else
1213-
{
1214-
lifespan_timer_->cancel_timer();
1202+
if (qos_.lifespan().duration != c_TimeInfinite)
1203+
{
1204+
lifespan_duration_us_ =
1205+
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1206+
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1207+
}
1208+
else
1209+
{
1210+
lifespan_timer_->cancel_timer();
1211+
}
12151212
}
12161213
}
12171214

@@ -1283,7 +1280,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos(
12831280

12841281
#ifdef FASTDDS_STATISTICS
12851282
notify_status_observer(statistics::INCOMPATIBLE_QOS);
1286-
#endif //FASTDDS_STATISTICS
1283+
#endif // FASTDDS_STATISTICS
12871284

12881285
data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
12891286
}
@@ -1322,7 +1319,7 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost(
13221319

13231320
#ifdef FASTDDS_STATISTICS
13241321
notify_status_observer(statistics::LIVELINESS_LOST);
1325-
#endif //FASTDDS_STATISTICS
1322+
#endif // FASTDDS_STATISTICS
13261323

13271324
data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
13281325
}
@@ -1366,7 +1363,7 @@ void DataWriterImpl::InnerDataWriterListener::notify_status_observer(
13661363
}
13671364
}
13681365

1369-
#endif //FASTDDS_STATISTICS
1366+
#endif // FASTDDS_STATISTICS
13701367

13711368
ReturnCode_t DataWriterImpl::wait_for_acknowledgments(
13721369
const Duration_t& max_wait)
@@ -1459,10 +1456,12 @@ ReturnCode_t DataWriterImpl::get_publication_matched_status(
14591456

14601457
bool DataWriterImpl::deadline_timer_reschedule()
14611458
{
1462-
assert(qos_.deadline().period != c_TimeInfinite);
1463-
14641459
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
14651460

1461+
assert(qos_.deadline().period != c_TimeInfinite);
1462+
assert(deadline_timer_ != nullptr);
1463+
assert(deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max());
1464+
14661465
steady_clock::time_point next_deadline_us;
14671466
if (!history_.get_next_deadline(timer_owner_, next_deadline_us))
14681467
{
@@ -1475,28 +1474,89 @@ bool DataWriterImpl::deadline_timer_reschedule()
14751474
return true;
14761475
}
14771476

1478-
bool DataWriterImpl::deadline_missed()
1477+
void DataWriterImpl::configure_deadline_timer_()
14791478
{
1480-
assert(qos_.deadline().period != c_TimeInfinite);
1481-
14821479
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
14831480

1484-
deadline_missed_status_.total_count++;
1485-
deadline_missed_status_.total_count_change++;
1486-
deadline_missed_status_.last_instance_handle = timer_owner_;
1481+
// Create the timer once
1482+
if (deadline_timer_ == nullptr)
1483+
{
1484+
deadline_timer_ = new TimedEvent(
1485+
publisher_->rtps_participant()->get_resource_event(),
1486+
[this]() -> bool
1487+
{
1488+
return deadline_missed();
1489+
},
1490+
// Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly
1491+
std::numeric_limits<double>::max()
1492+
);
1493+
}
1494+
1495+
// Handle "infinite" and "zero" outside the callback
1496+
if (qos_.deadline().period == c_TimeInfinite)
1497+
{
1498+
deadline_duration_us_ = std::chrono::duration<double, std::micro>::max();
1499+
deadline_timer_->cancel_timer();
1500+
return;
1501+
}
1502+
1503+
deadline_duration_us_ =
1504+
std::chrono::duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1505+
1506+
if (qos_.deadline().period.to_ns() == 0)
1507+
{
1508+
deadline_timer_->cancel_timer();
1509+
1510+
deadline_missed_status_.total_count = std::numeric_limits<uint32_t>::max();
1511+
deadline_missed_status_.total_count_change = std::numeric_limits<uint32_t>::max();
1512+
EPROSIMA_LOG_WARNING(
1513+
DATA_WRITER,
1514+
"Deadline period is 0, it will be ignored from now on.");
1515+
1516+
// Bump once and notify listener exactly once.
1517+
notify_deadline_missed_nts_();
1518+
return;
1519+
}
1520+
1521+
deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
1522+
}
1523+
1524+
void DataWriterImpl::notify_deadline_missed_nts_()
1525+
{
14871526
StatusMask notify_status = StatusMask::offered_deadline_missed();
1488-
auto listener = get_listener_for(notify_status);
1489-
if (nullptr != listener)
1527+
if (auto* listener = get_listener_for(notify_status))
14901528
{
14911529
listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
14921530
deadline_missed_status_.total_count_change = 0;
14931531
}
14941532

14951533
#ifdef FASTDDS_STATISTICS
14961534
writer_listener_.notify_status_observer(statistics::DEADLINE_MISSED);
1497-
#endif //FASTDDS_STATISTICS
1535+
#endif // FASTDDS_STATISTICS
14981536

14991537
user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1538+
}
1539+
1540+
bool DataWriterImpl::deadline_missed()
1541+
{
1542+
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1543+
1544+
assert(qos_.deadline().period != c_TimeInfinite);
1545+
1546+
deadline_missed_status_.total_count++;
1547+
deadline_missed_status_.total_count_change++;
1548+
deadline_missed_status_.last_instance_handle = timer_owner_;
1549+
1550+
notify_deadline_missed_nts_();
1551+
1552+
// If we just reached the max -> log ONCE, stop timer, and bail.
1553+
if (deadline_missed_status_.total_count == std::numeric_limits<uint32_t>::max())
1554+
{
1555+
EPROSIMA_LOG_WARNING(DATA_WRITER,
1556+
"Maximum number of deadline missed messages reached. Stopping deadline timer.");
1557+
deadline_timer_->cancel_timer();
1558+
return false; // do not reschedule
1559+
}
15001560

15011561
if (!history_.set_next_deadline(
15021562
timer_owner_,

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define _FASTRTPS_DATAWRITERIMPL_HPP_
2121

2222
#include <memory>
23+
#include <mutex>
2324

2425
#include <fastdds/dds/core/status/BaseStatus.hpp>
2526
#include <fastdds/dds/core/status/IncompatibleQosStatus.hpp>
@@ -578,6 +579,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
578579

579580
/**
580581
* @brief A method to reschedule the deadline timer
582+
* @return true if deadline rescheduling succeeded, false otherwise
581583
*/
582584
bool deadline_timer_reschedule();
583585

@@ -736,6 +738,22 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
736738

737739
private:
738740

741+
/**
742+
* (Re)configures the deadline timer:
743+
* In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and
744+
* for non-infinite positive values store period.
745+
*/
746+
void configure_deadline_timer_();
747+
748+
/**
749+
* Notifies listeners that a deadline has been missed.
750+
*/
751+
void notify_deadline_missed_nts_();
752+
753+
void create_history(
754+
const std::shared_ptr<IPayloadPool>& payload_pool,
755+
const std::shared_ptr<IChangePool>& change_pool);
756+
739757
DataWriterQos get_datawriter_qos_from_settings(
740758
const DataWriterQos& qos);
741759

0 commit comments

Comments
 (0)