Skip to content

Commit ce8692f

Browse files
zesk1999mergify[bot]
authored andcommitted
Handle maximum deadline misses case (#6016)
* Refs #23289. Handle maximum deadline misses case. Data writer implementation. Signed-off-by: zesk1999 <[email protected]> * Refs #23289. Handle maximum deadline misses case. Data reader implementation. Signed-off-by: zesk1999 <[email protected]> * Refs #23289. Handle maximum deadline misses case. Tests. Signed-off-by: zesk1999 <[email protected]> --------- Signed-off-by: zesk1999 <[email protected]> (cherry picked from commit 3230d1d) # Conflicts: # src/cpp/fastdds/publisher/DataWriterImpl.cpp # src/cpp/fastdds/publisher/DataWriterImpl.hpp # src/cpp/fastdds/subscriber/DataReaderImpl.cpp # test/blackbox/CMakeLists.txt # test/blackbox/common/BlackboxTestsDeadlineQos.cpp
1 parent a552a33 commit ce8692f

File tree

9 files changed

+886
-69
lines changed

9 files changed

+886
-69
lines changed

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 144 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,13 @@
5252
#include <rtps/RTPSDomainImpl.hpp>
5353
#ifdef FASTDDS_STATISTICS
5454
#include <statistics/fastdds/domain/DomainParticipantImpl.hpp>
55+
<<<<<<< HEAD
5556
#include <statistics/types/monitorservice_types.h>
5657
#endif //FASTDDS_STATISTICS
58+
=======
59+
#include <statistics/types/monitorservice_types.hpp>
60+
#endif // FASTDDS_STATISTICS
61+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
5762

5863
using namespace eprosima::fastrtps;
5964
using namespace eprosima::fastrtps::rtps;
@@ -397,12 +402,16 @@ ReturnCode_t DataWriterImpl::enable()
397402
// In case it has been loaded from the persistence DB, rebuild instances on history
398403
history_.rebuild_instances();
399404

405+
<<<<<<< HEAD
400406
deadline_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(),
401407
[&]() -> bool
402408
{
403409
return deadline_missed();
404410
},
405411
qos_.deadline().period.to_ns() * 1e-6);
412+
=======
413+
configure_deadline_timer_();
414+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
406415

407416
lifespan_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(),
408417
[&]() -> bool
@@ -642,8 +651,8 @@ ReturnCode_t DataWriterImpl::check_write_preconditions(
642651
type_.get()->getKey(data, &instance_handle, is_key_protected);
643652
}
644653

645-
//Check if the Handle is different from the special value HANDLE_NIL and
646-
//does not correspond with the instance referred by the data
654+
// Check if the Handle is different from the special value HANDLE_NIL and
655+
// does not correspond with the instance referred by the data
647656
if (handle.isDefined() && handle != instance_handle)
648657
{
649658
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
@@ -992,7 +1001,12 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
9921001
}
9931002
}
9941003

1004+
<<<<<<< HEAD
9951005
CacheChange_t* ch = writer_->new_change(change_kind, handle);
1006+
=======
1007+
// create_change seeds the next per-instance deadline and reschedules the timer for the next sample
1008+
CacheChange_t* ch = history_->create_change(change_kind, handle);
1009+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
9961010
if (ch != nullptr)
9971011
{
9981012
payload.move_into_change(*ch);
@@ -1024,7 +1038,12 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10241038
return ReturnCode_t::RETCODE_TIMEOUT;
10251039
}
10261040

1041+
<<<<<<< HEAD
10271042
if (qos_.deadline().period != c_TimeInfinite)
1043+
=======
1044+
if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite &&
1045+
deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
1046+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
10281047
{
10291048
if (!history_.set_next_deadline(
10301049
handle,
@@ -1135,7 +1154,7 @@ void DataWriterImpl::publisher_qos_updated()
11351154
{
11361155
if (writer_ != nullptr)
11371156
{
1138-
//NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
1157+
// NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
11391158
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
11401159
publisher_->rtps_participant()->updateWriter(writer_, get_topic_attributes(qos_, *topic_, type_), wqos);
11411160
}
@@ -1170,6 +1189,9 @@ ReturnCode_t DataWriterImpl::set_qos(
11701189
return ReturnCode_t::RETCODE_IMMUTABLE_POLICY;
11711190
}
11721191

1192+
// Take a snapshot of the current QoS before mutating it
1193+
const DataWriterQos old_qos = qos_;
1194+
11731195
set_qos(qos_, qos_to_set, !enabled);
11741196

11751197
if (enabled)
@@ -1185,33 +1207,43 @@ ReturnCode_t DataWriterImpl::set_qos(
11851207
writer_->updateAttributes(w_att);
11861208
}
11871209

1210+
<<<<<<< HEAD
11881211
//Notify the participant that a Writer has changed its QOS
11891212
fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_);
1213+
=======
1214+
// Notify the participant that a Writer has changed its QOS
1215+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
11901216
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
11911217
publisher_->rtps_participant()->updateWriter(writer_, topic_att, wqos);
11921218

1219+
<<<<<<< HEAD
11931220
// Deadline
11941221
if (qos_.deadline().period != c_TimeInfinite)
1222+
=======
1223+
// If the deadline period actually changed, (re)configure the timer.
1224+
if (old_qos.deadline().period != qos_.deadline().period)
1225+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
11951226
{
1196-
deadline_duration_us_ =
1197-
duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1198-
deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
1199-
}
1200-
else
1201-
{
1202-
deadline_timer_->cancel_timer();
1227+
configure_deadline_timer_();
12031228
}
12041229

12051230
// Lifespan
1231+
<<<<<<< HEAD
12061232
if (qos_.lifespan().duration != c_TimeInfinite)
1233+
=======
1234+
if (old_qos.lifespan().duration != qos_.lifespan().duration)
1235+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
12071236
{
1208-
lifespan_duration_us_ =
1209-
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1210-
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1211-
}
1212-
else
1213-
{
1214-
lifespan_timer_->cancel_timer();
1237+
if (qos_.lifespan().duration != dds::c_TimeInfinite)
1238+
{
1239+
lifespan_duration_us_ =
1240+
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1241+
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1242+
}
1243+
else
1244+
{
1245+
lifespan_timer_->cancel_timer();
1246+
}
12151247
}
12161248
}
12171249

@@ -1282,8 +1314,13 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos(
12821314
}
12831315

12841316
#ifdef FASTDDS_STATISTICS
1317+
<<<<<<< HEAD
12851318
notify_status_observer(statistics::INCOMPATIBLE_QOS);
12861319
#endif //FASTDDS_STATISTICS
1320+
=======
1321+
notify_status_observer(statistics::StatusKind::INCOMPATIBLE_QOS);
1322+
#endif // FASTDDS_STATISTICS
1323+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
12871324

12881325
data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
12891326
}
@@ -1321,8 +1358,13 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost(
13211358
}
13221359

13231360
#ifdef FASTDDS_STATISTICS
1361+
<<<<<<< HEAD
13241362
notify_status_observer(statistics::LIVELINESS_LOST);
13251363
#endif //FASTDDS_STATISTICS
1364+
=======
1365+
notify_status_observer(statistics::StatusKind::LIVELINESS_LOST);
1366+
#endif // FASTDDS_STATISTICS
1367+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
13261368

13271369
data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
13281370
}
@@ -1366,7 +1408,7 @@ void DataWriterImpl::InnerDataWriterListener::notify_status_observer(
13661408
}
13671409
}
13681410

1369-
#endif //FASTDDS_STATISTICS
1411+
#endif // FASTDDS_STATISTICS
13701412

13711413
ReturnCode_t DataWriterImpl::wait_for_acknowledgments(
13721414
const Duration_t& max_wait)
@@ -1459,10 +1501,17 @@ ReturnCode_t DataWriterImpl::get_publication_matched_status(
14591501

14601502
bool DataWriterImpl::deadline_timer_reschedule()
14611503
{
1504+
<<<<<<< HEAD
14621505
assert(qos_.deadline().period != c_TimeInfinite);
14631506

1507+
=======
1508+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
14641509
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
14651510

1511+
assert(qos_.deadline().period != dds::c_TimeInfinite);
1512+
assert(deadline_timer_ != nullptr);
1513+
assert(deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max());
1514+
14661515
steady_clock::time_point next_deadline_us;
14671516
if (!history_.get_next_deadline(timer_owner_, next_deadline_us))
14681517
{
@@ -1475,28 +1524,99 @@ bool DataWriterImpl::deadline_timer_reschedule()
14751524
return true;
14761525
}
14771526

1478-
bool DataWriterImpl::deadline_missed()
1527+
void DataWriterImpl::configure_deadline_timer_()
14791528
{
1529+
<<<<<<< HEAD
14801530
assert(qos_.deadline().period != c_TimeInfinite);
14811531

1532+
=======
1533+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
14821534
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
14831535

1484-
deadline_missed_status_.total_count++;
1485-
deadline_missed_status_.total_count_change++;
1486-
deadline_missed_status_.last_instance_handle = timer_owner_;
1536+
// Create the timer once
1537+
if (deadline_timer_ == nullptr)
1538+
{
1539+
deadline_timer_ = new TimedEvent(
1540+
publisher_->rtps_participant()->get_resource_event(),
1541+
[this]() -> bool
1542+
{
1543+
return deadline_missed();
1544+
},
1545+
// Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly
1546+
std::numeric_limits<double>::max()
1547+
);
1548+
}
1549+
1550+
// Handle "infinite" and "zero" outside the callback
1551+
if (qos_.deadline().period == dds::c_TimeInfinite)
1552+
{
1553+
deadline_duration_us_ = std::chrono::duration<double, std::micro>::max();
1554+
deadline_timer_->cancel_timer();
1555+
return;
1556+
}
1557+
1558+
deadline_duration_us_ =
1559+
std::chrono::duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1560+
1561+
if (qos_.deadline().period.to_ns() == 0)
1562+
{
1563+
deadline_timer_->cancel_timer();
1564+
1565+
deadline_missed_status_.total_count = std::numeric_limits<uint32_t>::max();
1566+
deadline_missed_status_.total_count_change = std::numeric_limits<uint32_t>::max();
1567+
EPROSIMA_LOG_WARNING(
1568+
DATA_WRITER,
1569+
"Deadline period is 0, it will be ignored from now on.");
1570+
1571+
// Bump once and notify listener exactly once.
1572+
notify_deadline_missed_nts_();
1573+
return;
1574+
}
1575+
1576+
deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
1577+
}
1578+
1579+
void DataWriterImpl::notify_deadline_missed_nts_()
1580+
{
14871581
StatusMask notify_status = StatusMask::offered_deadline_missed();
1488-
auto listener = get_listener_for(notify_status);
1489-
if (nullptr != listener)
1582+
if (auto* listener = get_listener_for(notify_status))
14901583
{
14911584
listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
14921585
deadline_missed_status_.total_count_change = 0;
14931586
}
14941587

14951588
#ifdef FASTDDS_STATISTICS
1589+
<<<<<<< HEAD
14961590
writer_listener_.notify_status_observer(statistics::DEADLINE_MISSED);
14971591
#endif //FASTDDS_STATISTICS
1592+
=======
1593+
writer_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED);
1594+
#endif // FASTDDS_STATISTICS
1595+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
14981596

14991597
user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1598+
}
1599+
1600+
bool DataWriterImpl::deadline_missed()
1601+
{
1602+
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1603+
1604+
assert(qos_.deadline().period != dds::c_TimeInfinite);
1605+
1606+
deadline_missed_status_.total_count++;
1607+
deadline_missed_status_.total_count_change++;
1608+
deadline_missed_status_.last_instance_handle = timer_owner_;
1609+
1610+
notify_deadline_missed_nts_();
1611+
1612+
// If we just reached the max -> log ONCE, stop timer, and bail.
1613+
if (deadline_missed_status_.total_count == std::numeric_limits<uint32_t>::max())
1614+
{
1615+
EPROSIMA_LOG_WARNING(DATA_WRITER,
1616+
"Maximum number of deadline missed messages reached. Stopping deadline timer.");
1617+
deadline_timer_->cancel_timer();
1618+
return false; // do not reschedule
1619+
}
15001620

15011621
if (!history_.set_next_deadline(
15021622
timer_owner_,

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define _FASTRTPS_DATAWRITERIMPL_HPP_
2121

2222
#include <memory>
23+
#include <mutex>
2324

2425
#include <fastdds/dds/core/status/BaseStatus.hpp>
2526
#include <fastdds/dds/core/status/IncompatibleQosStatus.hpp>
@@ -578,6 +579,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
578579

579580
/**
580581
* @brief A method to reschedule the deadline timer
582+
* @return true if deadline rescheduling succeeded, false otherwise
581583
*/
582584
bool deadline_timer_reschedule();
583585

@@ -736,6 +738,25 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
736738

737739
private:
738740

741+
<<<<<<< HEAD
742+
=======
743+
/**
744+
* (Re)configures the deadline timer:
745+
* In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and
746+
* for non-infinite positive values store period.
747+
*/
748+
void configure_deadline_timer_();
749+
750+
/**
751+
* Notifies listeners that a deadline has been missed.
752+
*/
753+
void notify_deadline_missed_nts_();
754+
755+
void create_history(
756+
const std::shared_ptr<IPayloadPool>& payload_pool,
757+
const std::shared_ptr<IChangePool>& change_pool);
758+
759+
>>>>>>> 3230d1d7 (Handle maximum deadline misses case (#6016))
739760
DataWriterQos get_datawriter_qos_from_settings(
740761
const DataWriterQos& qos);
741762

0 commit comments

Comments
 (0)