Skip to content

Commit 4505cb3

Browse files
committed
Address review: deadline timer (∞/0 handling), count_change reset, lifespan/deadline tests (#23289)
1 parent ca60b64 commit 4505cb3

File tree

5 files changed

+2570
-153
lines changed

5 files changed

+2570
-153
lines changed

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 82 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ ReturnCode_t DataWriterImpl::enable()
431431
}
432432

433433
writer_ = BaseWriter::downcast(writer);
434+
434435
// Set DataWriterImpl as the implementer of the
435436
// IReaderDataFilter interface
436437
writer_->reader_data_filter(this);
@@ -439,7 +440,7 @@ ReturnCode_t DataWriterImpl::enable()
439440
history_->rebuild_instances();
440441

441442
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
442-
configure_deadline_timer_locked_();
443+
configure_deadline_timer_();
443444

444445
lifespan_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(),
445446
[&]() -> bool
@@ -1068,7 +1069,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10681069
return RETCODE_TIMEOUT;
10691070
}
10701071

1071-
if (deadline_timer_ != nullptr && qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && deadline_missed_status_.total_count < std::numeric_limits<int32_t>::max())
1072+
if (deadline_timer_ != nullptr && qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
10721073
{
10731074
if (!history_->set_next_deadline(
10741075
handle,
@@ -1214,12 +1215,16 @@ ReturnCode_t DataWriterImpl::set_qos(
12141215
return RETCODE_IMMUTABLE_POLICY;
12151216
}
12161217

1217-
const dds::Duration_t prev_deadline = qos_.deadline().period;
1218+
// Take a snapshot of the current QoS before mutating it
1219+
const DataWriterQos old_qos = qos_;
12181220

12191221
set_qos(qos_, qos_to_set, !enabled);
12201222

12211223
if (enabled)
12221224
{
1225+
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1226+
// Locks after we checked that writer exists
1227+
12231228
int32_t transport_priority = writer_->get_transport_priority();
12241229

12251230
if ((qos_.reliability().kind == ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS &&
@@ -1239,30 +1244,29 @@ ReturnCode_t DataWriterImpl::set_qos(
12391244
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
12401245
publisher_->rtps_participant()->update_writer(writer_, wqos);
12411246

1242-
1243-
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1244-
1245-
// If the deadline period actually changed, (re) the timer.
1246-
if (qos_.deadline().period != prev_deadline)
1247+
// If the deadline period actually changed, (re)configure the timer.
1248+
if (old_qos.deadline().period != qos_.deadline().period)
12471249
{
12481250
// Resetting total count value whenever the deadline period changes
12491251
deadline_missed_status_.total_count = 0;
12501252
deadline_missed_status_.total_count_change = 0;
12511253
deadline_missed_status_.last_instance_handle = InstanceHandle_t();
12521254

1253-
configure_deadline_timer_locked_();
1255+
configure_deadline_timer_();
12541256
}
12551257

12561258
// Lifespan
1257-
if (qos_.lifespan().duration != dds::c_TimeInfinite)
1258-
{
1259-
lifespan_duration_us_ =
1260-
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1261-
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1262-
}
1263-
else
1264-
{
1265-
lifespan_timer_->cancel_timer();
1259+
if (old_qos.lifespan().duration != qos_.lifespan().duration) {
1260+
if (qos_.lifespan().duration != dds::c_TimeInfinite)
1261+
{
1262+
lifespan_duration_us_ =
1263+
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1264+
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1265+
}
1266+
else
1267+
{
1268+
lifespan_timer_->cancel_timer();
1269+
}
12661270
}
12671271
}
12681272

@@ -1555,7 +1559,7 @@ ReturnCode_t DataWriterImpl::set_related_datareader(
15551559
bool DataWriterImpl::deadline_timer_reschedule()
15561560
{
15571561
assert(qos_.deadline().period != dds::c_TimeInfinite);
1558-
if (deadline_timer_ == nullptr || deadline_missed_status_.total_count >= std::numeric_limits<int32_t>::max())
1562+
if (deadline_timer_ == nullptr || deadline_missed_status_.total_count >= std::numeric_limits<uint32_t>::max())
15591563
{
15601564
return false;
15611565
}
@@ -1569,128 +1573,109 @@ bool DataWriterImpl::deadline_timer_reschedule()
15691573
return false;
15701574
}
15711575

1576+
if (next_deadline_us <= std::chrono::steady_clock::now())
1577+
{
1578+
// Ignore uninitialized or stale deadlines; don't arm to avoid a spurious first miss
1579+
return false;
1580+
}
1581+
15721582
auto interval_ms = duration_cast<milliseconds>(next_deadline_us - steady_clock::now());
15731583
deadline_timer_->update_interval_millisec(static_cast<double>(interval_ms.count()));
15741584
return true;
15751585
}
15761586

1577-
/**
1578-
* This function tears down any existing deadline timer and creates a new one
1579-
* configured for the current deadline period. If the period is 0 or infinite,
1580-
* the timer is created to fire only once to log a warning and then cancel itself.
1581-
*/
1582-
void DataWriterImpl::configure_deadline_timer_locked_()
1587+
void DataWriterImpl::configure_deadline_timer_()
15831588
{
1584-
if (deadline_timer_ != nullptr)
1589+
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1590+
1591+
// Create the timer once
1592+
if (deadline_timer_ == nullptr)
15851593
{
1586-
deadline_timer_->cancel_timer();
1587-
delete deadline_timer_;
1588-
deadline_timer_ = nullptr;
1594+
deadline_timer_ = new TimedEvent(
1595+
publisher_->rtps_participant()->get_resource_event(),
1596+
[this]() -> bool
1597+
{
1598+
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1599+
1600+
// Normal path: process a deadline miss. This function will
1601+
// update counters, notify, and ask for reschedule as needed.
1602+
return deadline_missed();
1603+
},
1604+
(qos_.deadline().period == dds::c_TimeInfinite) ? std::numeric_limits<double>::max() : qos_.deadline().period.to_ns() * 1e-6
1605+
// In case of deadline period = infinite, multiplying it by 1e-6 could cause a problem
1606+
);
15891607
}
15901608

1591-
// If we had previously hit the max, keep timer disabled until QoS changes reset the count.
1592-
if (deadline_missed_status_.total_count >= std::numeric_limits<int32_t>::max())
1609+
// Handle "infinite" and "zero" outside the callback
1610+
if (qos_.deadline().period == dds::c_TimeInfinite)
15931611
{
1594-
return; // no new timer
1612+
deadline_timer_->cancel_timer();
1613+
return;
15951614
}
15961615

1597-
// Refresh cached duration and guards
1598-
deadline_duration_us_ =
1599-
std::chrono::duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1616+
deadline_duration_us_ = std::chrono::duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
16001617

1601-
// Create the timer with a safe callback
1602-
deadline_timer_ = new TimedEvent(
1603-
publisher_->rtps_participant()->get_resource_event(),
1604-
[this]() -> bool
1605-
{
1606-
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1618+
if (qos_.deadline().period.to_ns() == 0)
1619+
{
1620+
EPROSIMA_LOG_WARNING(
1621+
DATA_WRITER,
1622+
"Deadline period is 0, it will be ignored from now on. Timer is going to be canceled.");
16071623

1608-
if ((qos_.deadline().period.to_ns() == 0) || (qos_.deadline().period == dds::c_TimeInfinite))
1609-
{
1610-
1611-
EPROSIMA_LOG_WARNING(
1612-
DATA_WRITER,
1613-
"Deadline period is "
1614-
<< (qos_.deadline().period.to_ns() == 0 ? "0" : "infinite")
1615-
<< ", it will be ignored from now on. Timer is going to be canceled.");
1616-
deadline_missed_status_.total_count = std::numeric_limits<int32_t>::max();
1617-
1618-
if (deadline_timer_ != nullptr)
1619-
{
1620-
deadline_timer_->cancel_timer();
1621-
}
1622-
return false;
1623-
}
1624+
// Bump once and notify listener exactly once.
1625+
notify_deadline_missed_no_increment_();
16241626

1625-
// Normal path
1626-
return deadline_missed();
1627-
},
1628-
0.0);
1627+
deadline_missed_status_.total_count = std::numeric_limits<uint32_t>::max();
1628+
deadline_timer_->cancel_timer();
1629+
return;
1630+
}
16291631

1630-
if ((qos_.deadline().period.to_ns() > 0) && (qos_.deadline().period != dds::c_TimeInfinite))
1632+
if (deadline_timer_reschedule())
16311633
{
1632-
// Only arm if a next deadline is already computed; otherwise keep disabled to avoid spurious fire.
1633-
std::chrono::steady_clock::time_point next_tp;
1634-
if (history_->get_next_deadline(timer_owner_, next_tp))
1635-
{
1636-
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1637-
next_tp - std::chrono::steady_clock::now()).count();
1638-
if (ms < 0) ms = 0;
1639-
1640-
deadline_timer_->update_interval_millisec(static_cast<double>(ms));
1641-
deadline_timer_->restart_timer();
1642-
}
1643-
else
1644-
{
1645-
deadline_timer_->cancel_timer(); // keep it created but idle
1646-
}
1634+
deadline_timer_->restart_timer();
16471635
}
16481636
else
16491637
{
1650-
// 0 or infinite -> fire once to log and self-disable
1651-
deadline_timer_->restart_timer();
1638+
// Keep the timer object around but idle if there's no pending deadline
1639+
deadline_timer_->cancel_timer();
16521640
}
16531641
}
16541642

1643+
void DataWriterImpl::notify_deadline_missed_no_increment_()
1644+
{
1645+
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1646+
1647+
StatusMask notify_status = StatusMask::offered_deadline_missed();
1648+
if (auto* listener = get_listener_for(notify_status))
1649+
{
1650+
listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
1651+
}
1652+
user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1653+
}
16551654

16561655
bool DataWriterImpl::deadline_missed()
16571656
{
16581657
assert(qos_.deadline().period != dds::c_TimeInfinite);
16591658

16601659
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
16611660

1662-
if (deadline_missed_status_.total_count >= std::numeric_limits<int32_t>::max() && deadline_timer_ != nullptr)
1663-
{
1664-
deadline_timer_->cancel_timer();
1665-
return false;
1666-
}
1667-
16681661
deadline_missed_status_.total_count++;
16691662
deadline_missed_status_.total_count_change++;
16701663
deadline_missed_status_.last_instance_handle = timer_owner_;
16711664

1665+
notify_deadline_missed_no_increment_();
1666+
16721667
// If we just reached the max -> log ONCE, stop timer, and bail.
1673-
if (deadline_missed_status_.total_count == std::numeric_limits<int32_t>::max())
1668+
if (deadline_missed_status_.total_count == std::numeric_limits<uint32_t>::max())
16741669
{
16751670
EPROSIMA_LOG_WARNING(DATA_WRITER, "Maximum number of deadline missed messages reached. Stopping deadline timer.");
16761671
deadline_timer_->cancel_timer();
16771672
return false; // do not reschedule
16781673
}
16791674

1680-
StatusMask notify_status = StatusMask::offered_deadline_missed();
1681-
auto listener = get_listener_for(notify_status);
1682-
if (nullptr != listener)
1683-
{
1684-
listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
1685-
deadline_missed_status_.total_count_change = 0;
1686-
}
1687-
16881675
#ifdef FASTDDS_STATISTICS
16891676
writer_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED);
16901677
#endif // FASTDDS_STATISTICS
16911678

1692-
user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1693-
16941679
if (!history_->set_next_deadline(
16951680
timer_owner_,
16961681
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
@@ -2547,4 +2532,4 @@ bool DataWriterImpl::is_relevant(
25472532

25482533
} // namespace dds
25492534
} // namespace fastdds
2550-
} // namespace eprosima
2535+
} // namespace eprosima

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,17 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
771771

772772
private:
773773

774-
void configure_deadline_timer_locked_();
774+
/**
775+
* This function tears down any existing deadline timer and creates a new one
776+
* configured for the current deadline period. If the period is 0 or infinite,
777+
* the timer is created to fire only once to log a warning and then cancel itself.
778+
*/
779+
void configure_deadline_timer_();
780+
781+
/**
782+
* Notifies listeners that a deadline has been missed without touching the counters.
783+
*/
784+
void notify_deadline_missed_no_increment_();
775785

776786
void create_history(
777787
const std::shared_ptr<IPayloadPool>& payload_pool,

0 commit comments

Comments
 (0)