Skip to content

Commit a6a5764

Browse files
committed
Refs #23289. Handle maximum deadline misses case.
Signed-off-by: zesk1999 <[email protected]>
1 parent 0135fcb commit a6a5764

File tree

3 files changed

+84
-39
lines changed

3 files changed

+84
-39
lines changed

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,6 @@ void DataWriterImpl::create_history(
248248

249249
ReturnCode_t DataWriterImpl::enable()
250250
{
251-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
252-
253251
assert(writer_ == nullptr);
254252

255253
auto history_att = DataWriterHistory::to_history_attributes(
@@ -519,8 +517,6 @@ ReturnCode_t DataWriterImpl::loan_sample(
519517
void*& sample,
520518
LoanInitializationKind initialization)
521519
{
522-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
523-
524520
// Block lowlevel writer
525521
auto max_blocking_time = steady_clock::now() +
526522
microseconds(rtps::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
@@ -822,8 +818,6 @@ InstanceHandle_t DataWriterImpl::do_register_instance(
822818
const InstanceHandle_t instance_handle,
823819
WriteParams& wparams)
824820
{
825-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
826-
827821
// TODO(MiguelCompany): wparams should be used when propagating the register_instance operation to the DataReader.
828822
// See redmine issue #14494
829823
static_cast<void>(wparams);
@@ -871,8 +865,6 @@ ReturnCode_t DataWriterImpl::unregister_instance(
871865
const InstanceHandle_t& handle,
872866
bool dispose)
873867
{
874-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
875-
876868
// Preconditions
877869
InstanceHandle_t ih;
878870
ReturnCode_t returned_value = check_instance_preconditions(instance, handle, ih);
@@ -898,8 +890,6 @@ ReturnCode_t DataWriterImpl::unregister_instance_w_timestamp(
898890
const fastdds::dds::Time_t& timestamp,
899891
bool dispose)
900892
{
901-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
902-
903893
// Preconditions
904894
InstanceHandle_t instance_handle;
905895
ReturnCode_t ret = RETCODE_OK;
@@ -1078,7 +1068,8 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10781068
return RETCODE_TIMEOUT;
10791069
}
10801070

1081-
if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
1071+
if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite &&
1072+
deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
10821073
{
10831074
if (!history_->set_next_deadline(
10841075
handle,
@@ -1187,8 +1178,6 @@ InstanceHandle_t DataWriterImpl::get_instance_handle() const
11871178

11881179
void DataWriterImpl::publisher_qos_updated()
11891180
{
1190-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
1191-
11921181
if (writer_ != nullptr)
11931182
{
11941183
// NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
@@ -1221,8 +1210,6 @@ ReturnCode_t DataWriterImpl::set_qos(
12211210
}
12221211
}
12231212

1224-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
1225-
12261213
if (enabled && !can_qos_be_updated(qos_, qos_to_set))
12271214
{
12281215
return RETCODE_IMMUTABLE_POLICY;
@@ -1235,6 +1222,7 @@ ReturnCode_t DataWriterImpl::set_qos(
12351222

12361223
if (enabled)
12371224
{
1225+
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
12381226
// Locks after we checked that writer exists
12391227

12401228
int32_t transport_priority = writer_->get_transport_priority();
@@ -1268,7 +1256,8 @@ ReturnCode_t DataWriterImpl::set_qos(
12681256
}
12691257

12701258
// Lifespan
1271-
if (old_qos.lifespan().duration != qos_.lifespan().duration) {
1259+
if (old_qos.lifespan().duration != qos_.lifespan().duration)
1260+
{
12721261
if (qos_.lifespan().duration != dds::c_TimeInfinite)
12731262
{
12741263
lifespan_duration_us_ =
@@ -1287,7 +1276,6 @@ ReturnCode_t DataWriterImpl::set_qos(
12871276

12881277
const DataWriterQos& DataWriterImpl::get_qos() const
12891278
{
1290-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
12911279
return qos_;
12921280
}
12931281

@@ -1603,7 +1591,7 @@ void DataWriterImpl::configure_deadline_timer_()
16031591
},
16041592
// Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly
16051593
std::numeric_limits<double>::max()
1606-
);
1594+
);
16071595
}
16081596

16091597
// Handle "infinite" and "zero" outside the callback
@@ -1614,7 +1602,8 @@ void DataWriterImpl::configure_deadline_timer_()
16141602
return;
16151603
}
16161604

1617-
deadline_duration_us_ = std::chrono::duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1605+
deadline_duration_us_ =
1606+
std::chrono::duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
16181607

16191608
if (qos_.deadline().period.to_ns() == 0)
16201609
{
@@ -1664,7 +1653,8 @@ bool DataWriterImpl::deadline_missed()
16641653
// If we just reached the max -> log ONCE, stop timer, and bail.
16651654
if (deadline_missed_status_.total_count == std::numeric_limits<uint32_t>::max())
16661655
{
1667-
EPROSIMA_LOG_WARNING(DATA_WRITER, "Maximum number of deadline missed messages reached. Stopping deadline timer.");
1656+
EPROSIMA_LOG_WARNING(DATA_WRITER,
1657+
"Maximum number of deadline missed messages reached. Stopping deadline timer.");
16681658
deadline_timer_->cancel_timer();
16691659
return false; // do not reschedule
16701660
}
@@ -1797,8 +1787,6 @@ ReturnCode_t DataWriterImpl::assert_liveliness()
17971787
ReturnCode_t DataWriterImpl::get_publication_builtin_topic_data(
17981788
PublicationBuiltinTopicData& publication_data) const
17991789
{
1800-
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
1801-
18021790
if (nullptr == writer_)
18031791
{
18041792
return RETCODE_NOT_ENABLED;

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -774,21 +774,16 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
774774
private:
775775

776776
/**
777-
* Protects implementation class
778-
*/
779-
mutable RecursiveTimedMutex impl_mtx_;
780-
781-
/**
782-
* (Re)configures the deadline timer:
783-
* Create once, parked with a huge interval (idle).
784-
* In case of deadline period ∞ cancel it, for 0 warn and notify once; set counts to max and
785-
* for values >0 store period.
786-
*/
777+
* (Re)configures the deadline timer:
778+
* Create once, parked with a huge interval (idle).
779+
* In case of deadline period ∞ cancel it, for 0 warn and notify once; set counts to max and
780+
* for values >0 store period.
781+
*/
787782
void configure_deadline_timer_();
788783

789784
/**
790-
* Notifies listeners that a deadline has been missed without touching the counters.
791-
*/
785+
* Notifies listeners that a deadline has been missed without touching the counters.
786+
*/
792787
void notify_deadline_missed_nts_();
793788

794789
void create_history(

test/blackbox/common/BlackboxTestsDeadlineQos.cpp

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -317,22 +317,34 @@ TEST_P(DeadlineQos, KeyedTopicBestEffortReaderVolatileWriterSetDeadline)
317317
* Creating a DataWriter with a deadline of 0.
318318
* Checking if a warning is logged exactly once, the timer is cancelled without missed deadline
319319
* messages and a total count and count change set to max integer.
320+
* Checking warnings, total count and count change when changing the deadline at runtime
320321
*/
321322
TEST_P(DeadlineQos, ZeroDeadlinePeriod)
322323
{
323324
// Local helper used only by this test
324325
struct LocalWarningCounter : fastlog::LogConsumer
325326
{
326-
explicit LocalWarningCounter(std::string needle) : needle_(std::move(needle)) {}
327-
void Consume(const fastlog::Log::Entry& e) override
327+
explicit LocalWarningCounter(
328+
std::string needle)
329+
: needle_(std::move(needle))
330+
{
331+
}
332+
333+
void Consume(
334+
const fastlog::Log::Entry& e) override
328335
{
329336
if (e.kind == fastlog::Log::Kind::Warning &&
330-
e.message.find(needle_) != std::string::npos)
337+
e.message.find(needle_) != std::string::npos)
331338
{
332339
count_.fetch_add(1, std::memory_order_relaxed);
333340
}
334341
}
335-
size_t count() const { return count_.load(std::memory_order_relaxed); }
342+
343+
size_t count() const
344+
{
345+
return count_.load(std::memory_order_relaxed);
346+
}
347+
336348
std::string needle_;
337349
std::atomic<size_t> count_{0};
338350
};
@@ -362,7 +374,8 @@ TEST_P(DeadlineQos, ZeroDeadlinePeriod)
362374

363375
std::this_thread::sleep_for(std::chrono::milliseconds(150));
364376

365-
EXPECT_EQ(writer.missed_deadlines(), std::numeric_limits<uint32_t>::max()) << "Expected the max value after a zero-deadline warning.";
377+
EXPECT_EQ(writer.missed_deadlines(),
378+
std::numeric_limits<uint32_t>::max()) << "Expected the max value after a zero-deadline warning.";
366379
EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits<uint32_t>::max());
367380

368381
EXPECT_EQ(consumer_ptr->count(), 1u) << "Expected exactly one 'deadline=0' warning\n";
@@ -383,6 +396,55 @@ TEST_P(DeadlineQos, ZeroDeadlinePeriod)
383396
std::this_thread::sleep_for(std::chrono::milliseconds(100));
384397
EXPECT_EQ(consumer_ptr->count(), prev) << "Timer should be canceled; no more warnings";
385398

399+
eprosima::fastdds::dds::DataWriter& native = writer.get_native_writer();
400+
401+
// Helper: double seconds -> Duration_t
402+
auto to_duration = [](double s) -> eprosima::fastdds::dds::Duration_t
403+
{
404+
eprosima::fastdds::dds::Duration_t d{};
405+
d.seconds = static_cast<int32_t>(s);
406+
d.nanosec = static_cast<uint32_t>((s - d.seconds) * 1e9);
407+
return d;
408+
};
409+
410+
// Helper: set writer deadline at runtime via standard DDS API
411+
auto set_deadline = [&](double seconds)
412+
{
413+
eprosima::fastdds::dds::DataWriterQos q;
414+
ASSERT_EQ(native.get_qos(q), eprosima::fastdds::dds::RETCODE_OK);
415+
q.deadline().period = to_duration(seconds);
416+
ASSERT_EQ(native.set_qos(q), eprosima::fastdds::dds::RETCODE_OK);
417+
};
418+
419+
set_deadline(0.1); // Update 0 -> finite: counters must reset, no new warning
420+
421+
eprosima::fastdds::dds::OfferedDeadlineMissedStatus st{};
422+
std::this_thread::sleep_for(std::chrono::milliseconds(150));
423+
ASSERT_EQ(native.get_offered_deadline_missed_status(st), eprosima::fastdds::dds::RETCODE_OK);
424+
EXPECT_EQ(st.total_count, 0u);
425+
EXPECT_EQ(st.total_count_change, 0u);
426+
EXPECT_EQ(consumer_ptr->count(), prev) << "No new warning when moving from 0 -> finite";
427+
428+
set_deadline(0.0); // Update finite -> 0: exactly one new warning, counters jump to max
429+
430+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
431+
ASSERT_EQ(native.get_offered_deadline_missed_status(st), eprosima::fastdds::dds::RETCODE_OK);
432+
EXPECT_EQ(consumer_ptr->count(), prev + 1) << "Exactly one extra warning on finite -> 0";
433+
EXPECT_EQ(st.total_count, std::numeric_limits<uint32_t>::max());
434+
EXPECT_EQ(st.total_count_change, std::numeric_limits<uint32_t>::max());
435+
436+
set_deadline(0.001); // Update 0 -> finite small: missed deadlines registered
437+
KeyedHelloWorld sample;
438+
sample.key(1);
439+
ASSERT_EQ(native.write(&sample), eprosima::fastdds::dds::RETCODE_OK);
440+
441+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
442+
ASSERT_EQ(native.get_offered_deadline_missed_status(st), eprosima::fastdds::dds::RETCODE_OK);
443+
EXPECT_GT(st.total_count, 0u);
444+
EXPECT_GT(st.total_count_change, 0u);
445+
EXPECT_LT(st.total_count, std::numeric_limits<uint32_t>::max());
446+
EXPECT_LT(st.total_count_change, std::numeric_limits<uint32_t>::max());
447+
386448
fastlog::Log::ClearConsumers();
387449
}
388450

0 commit comments

Comments
 (0)