Skip to content

Commit 2b19adf

Browse files
zesk1999juanlofer-eprosima
authored andcommitted
Handle maximum deadline misses case (#6016)
* Refs #23289. Handle maximum deadline misses case. Data writer implementation. Signed-off-by: zesk1999 <[email protected]> * Refs #23289. Handle maximum deadline misses case. Data reader implementation. Signed-off-by: zesk1999 <[email protected]> * Refs #23289. Handle maximum deadline misses case. Tests. Signed-off-by: zesk1999 <[email protected]> --------- Signed-off-by: zesk1999 <[email protected]> (cherry picked from commit 3230d1d) # Conflicts: # src/cpp/fastdds/subscriber/DataReaderImpl.cpp # test/blackbox/CMakeLists.txt
1 parent 985ceeb commit 2b19adf

File tree

9 files changed

+628
-113
lines changed

9 files changed

+628
-113
lines changed

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 103 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
#ifdef FASTDDS_STATISTICS
5959
#include <statistics/fastdds/domain/DomainParticipantImpl.hpp>
6060
#include <statistics/types/monitorservice_types.hpp>
61-
#endif //FASTDDS_STATISTICS
61+
#endif // FASTDDS_STATISTICS
6262

6363
using namespace eprosima::fastdds;
6464
using namespace eprosima::fastdds::rtps;
@@ -435,12 +435,7 @@ ReturnCode_t DataWriterImpl::enable()
435435
// In case it has been loaded from the persistence DB, rebuild instances on history
436436
history_->rebuild_instances();
437437

438-
deadline_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(),
439-
[&]() -> bool
440-
{
441-
return deadline_missed();
442-
},
443-
qos_.deadline().period.to_ns() * 1e-6);
438+
configure_deadline_timer_();
444439

445440
lifespan_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(),
446441
[&]() -> bool
@@ -683,8 +678,8 @@ ReturnCode_t DataWriterImpl::check_write_preconditions(
683678
type_.get()->compute_key(data, instance_handle, is_key_protected);
684679
}
685680

686-
//Check if the Handle is different from the special value HANDLE_NIL and
687-
//does not correspond with the instance referred by the data
681+
// Check if the Handle is different from the special value HANDLE_NIL and
682+
// does not correspond with the instance referred by the data
688683
if (handle.isDefined() && handle != instance_handle)
689684
{
690685
return RETCODE_PRECONDITION_NOT_MET;
@@ -1036,6 +1031,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10361031
}
10371032
}
10381033

1034+
// create_change seeds the next per-instance deadline and reschedules the timer for the next sample
10391035
CacheChange_t* ch = history_->create_change(change_kind, handle);
10401036
if (ch != nullptr)
10411037
{
@@ -1068,7 +1064,8 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10681064
return RETCODE_TIMEOUT;
10691065
}
10701066

1071-
if (qos_.deadline().period != dds::c_TimeInfinite)
1067+
if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite &&
1068+
deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
10721069
{
10731070
if (!history_->set_next_deadline(
10741071
handle,
@@ -1179,7 +1176,7 @@ void DataWriterImpl::publisher_qos_updated()
11791176
{
11801177
if (writer_ != nullptr)
11811178
{
1182-
//NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
1179+
// NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
11831180
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
11841181
publisher_->rtps_participant()->update_writer(writer_, wqos);
11851182
}
@@ -1214,6 +1211,9 @@ ReturnCode_t DataWriterImpl::set_qos(
12141211
return RETCODE_IMMUTABLE_POLICY;
12151212
}
12161213

1214+
// Take a snapshot of the current QoS before mutating it
1215+
const DataWriterQos old_qos = qos_;
1216+
12171217
set_qos(qos_, qos_to_set, !enabled);
12181218

12191219
if (enabled)
@@ -1229,32 +1229,29 @@ ReturnCode_t DataWriterImpl::set_qos(
12291229
writer_->update_attributes(w_att);
12301230
}
12311231

1232-
//Notify the participant that a Writer has changed its QOS
1232+
// Notify the participant that a Writer has changed its QOS
12331233
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
12341234
publisher_->rtps_participant()->update_writer(writer_, wqos);
12351235

1236-
// Deadline
1237-
if (qos_.deadline().period != dds::c_TimeInfinite)
1236+
// If the deadline period actually changed, (re)configure the timer.
1237+
if (old_qos.deadline().period != qos_.deadline().period)
12381238
{
1239-
deadline_duration_us_ =
1240-
duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1241-
deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
1242-
}
1243-
else
1244-
{
1245-
deadline_timer_->cancel_timer();
1239+
configure_deadline_timer_();
12461240
}
12471241

12481242
// Lifespan
1249-
if (qos_.lifespan().duration != dds::c_TimeInfinite)
1243+
if (old_qos.lifespan().duration != qos_.lifespan().duration)
12501244
{
1251-
lifespan_duration_us_ =
1252-
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1253-
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1254-
}
1255-
else
1256-
{
1257-
lifespan_timer_->cancel_timer();
1245+
if (qos_.lifespan().duration != dds::c_TimeInfinite)
1246+
{
1247+
lifespan_duration_us_ =
1248+
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1249+
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1250+
}
1251+
else
1252+
{
1253+
lifespan_timer_->cancel_timer();
1254+
}
12581255
}
12591256
}
12601257

@@ -1326,7 +1323,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos(
13261323

13271324
#ifdef FASTDDS_STATISTICS
13281325
notify_status_observer(statistics::StatusKind::INCOMPATIBLE_QOS);
1329-
#endif //FASTDDS_STATISTICS
1326+
#endif // FASTDDS_STATISTICS
13301327

13311328
data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
13321329
}
@@ -1365,7 +1362,7 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost(
13651362

13661363
#ifdef FASTDDS_STATISTICS
13671364
notify_status_observer(statistics::StatusKind::LIVELINESS_LOST);
1368-
#endif //FASTDDS_STATISTICS
1365+
#endif // FASTDDS_STATISTICS
13691366

13701367
data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
13711368
}
@@ -1409,7 +1406,7 @@ void DataWriterImpl::InnerDataWriterListener::notify_status_observer(
14091406
}
14101407
}
14111408

1412-
#endif //FASTDDS_STATISTICS
1409+
#endif // FASTDDS_STATISTICS
14131410

14141411
ReturnCode_t DataWriterImpl::wait_for_acknowledgments(
14151412
const dds::Duration_t& max_wait)
@@ -1502,10 +1499,12 @@ ReturnCode_t DataWriterImpl::get_publication_matched_status(
15021499

15031500
bool DataWriterImpl::deadline_timer_reschedule()
15041501
{
1505-
assert(qos_.deadline().period != dds::c_TimeInfinite);
1506-
15071502
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
15081503

1504+
assert(qos_.deadline().period != dds::c_TimeInfinite);
1505+
assert(deadline_timer_ != nullptr);
1506+
assert(deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max());
1507+
15091508
steady_clock::time_point next_deadline_us;
15101509
if (!history_->get_next_deadline(timer_owner_, next_deadline_us))
15111510
{
@@ -1518,28 +1517,89 @@ bool DataWriterImpl::deadline_timer_reschedule()
15181517
return true;
15191518
}
15201519

1521-
bool DataWriterImpl::deadline_missed()
1520+
void DataWriterImpl::configure_deadline_timer_()
15221521
{
1523-
assert(qos_.deadline().period != dds::c_TimeInfinite);
1524-
15251522
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
15261523

1527-
deadline_missed_status_.total_count++;
1528-
deadline_missed_status_.total_count_change++;
1529-
deadline_missed_status_.last_instance_handle = timer_owner_;
1524+
// Create the timer once
1525+
if (deadline_timer_ == nullptr)
1526+
{
1527+
deadline_timer_ = new TimedEvent(
1528+
publisher_->rtps_participant()->get_resource_event(),
1529+
[this]() -> bool
1530+
{
1531+
return deadline_missed();
1532+
},
1533+
// Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly
1534+
std::numeric_limits<double>::max()
1535+
);
1536+
}
1537+
1538+
// Handle "infinite" and "zero" outside the callback
1539+
if (qos_.deadline().period == dds::c_TimeInfinite)
1540+
{
1541+
deadline_duration_us_ = std::chrono::duration<double, std::micro>::max();
1542+
deadline_timer_->cancel_timer();
1543+
return;
1544+
}
1545+
1546+
deadline_duration_us_ =
1547+
std::chrono::duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1548+
1549+
if (qos_.deadline().period.to_ns() == 0)
1550+
{
1551+
deadline_timer_->cancel_timer();
1552+
1553+
deadline_missed_status_.total_count = std::numeric_limits<uint32_t>::max();
1554+
deadline_missed_status_.total_count_change = std::numeric_limits<uint32_t>::max();
1555+
EPROSIMA_LOG_WARNING(
1556+
DATA_WRITER,
1557+
"Deadline period is 0, it will be ignored from now on.");
1558+
1559+
// Bump once and notify listener exactly once.
1560+
notify_deadline_missed_nts_();
1561+
return;
1562+
}
1563+
1564+
deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
1565+
}
1566+
1567+
void DataWriterImpl::notify_deadline_missed_nts_()
1568+
{
15301569
StatusMask notify_status = StatusMask::offered_deadline_missed();
1531-
auto listener = get_listener_for(notify_status);
1532-
if (nullptr != listener)
1570+
if (auto* listener = get_listener_for(notify_status))
15331571
{
15341572
listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
15351573
deadline_missed_status_.total_count_change = 0;
15361574
}
15371575

15381576
#ifdef FASTDDS_STATISTICS
15391577
writer_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED);
1540-
#endif //FASTDDS_STATISTICS
1578+
#endif // FASTDDS_STATISTICS
15411579

15421580
user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1581+
}
1582+
1583+
bool DataWriterImpl::deadline_missed()
1584+
{
1585+
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1586+
1587+
assert(qos_.deadline().period != dds::c_TimeInfinite);
1588+
1589+
deadline_missed_status_.total_count++;
1590+
deadline_missed_status_.total_count_change++;
1591+
deadline_missed_status_.last_instance_handle = timer_owner_;
1592+
1593+
notify_deadline_missed_nts_();
1594+
1595+
// If we just reached the max -> log ONCE, stop timer, and bail.
1596+
if (deadline_missed_status_.total_count == std::numeric_limits<uint32_t>::max())
1597+
{
1598+
EPROSIMA_LOG_WARNING(DATA_WRITER,
1599+
"Maximum number of deadline missed messages reached. Stopping deadline timer.");
1600+
deadline_timer_->cancel_timer();
1601+
return false; // do not reschedule
1602+
}
15431603

15441604
if (!history_->set_next_deadline(
15451605
timer_owner_,

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define _FASTDDS_DATAWRITERIMPL_HPP_
2121

2222
#include <memory>
23+
#include <mutex>
2324

2425
#include <fastdds/dds/builtin/topic/PublicationBuiltinTopicData.hpp>
2526
#include <fastdds/dds/core/ReturnCode.hpp>
@@ -604,6 +605,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
604605

605606
/**
606607
* @brief A method to reschedule the deadline timer
608+
* @return true if deadline rescheduling succeeded, false otherwise
607609
*/
608610
bool deadline_timer_reschedule();
609611

@@ -732,6 +734,18 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
732734

733735
private:
734736

737+
/**
738+
* (Re)configures the deadline timer:
739+
* In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and
740+
* for non-infinite positive values store period.
741+
*/
742+
void configure_deadline_timer_();
743+
744+
/**
745+
* Notifies listeners that a deadline has been missed.
746+
*/
747+
void notify_deadline_missed_nts_();
748+
735749
void create_history(
736750
const std::shared_ptr<IPayloadPool>& payload_pool,
737751
const std::shared_ptr<IChangePool>& change_pool);

0 commit comments

Comments
 (0)