Skip to content

Commit febaf25

Browse files
committed
Included new mutex for class protection
Signed-off-by: zesk1999 <[email protected]>
1 parent 4b16b4e commit febaf25

File tree

4 files changed

+84
-76
lines changed

4 files changed

+84
-76
lines changed

src/cpp/fastdds/publisher/DataWriterImpl.cpp

Lines changed: 43 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@ void DataWriterImpl::create_history(
248248

249249
ReturnCode_t DataWriterImpl::enable()
250250
{
251+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
252+
251253
assert(writer_ == nullptr);
252254

253255
auto history_att = DataWriterHistory::to_history_attributes(
@@ -431,15 +433,14 @@ ReturnCode_t DataWriterImpl::enable()
431433
}
432434

433435
writer_ = BaseWriter::downcast(writer);
434-
436+
435437
// Set DataWriterImpl as the implementer of the
436438
// IReaderDataFilter interface
437439
writer_->reader_data_filter(this);
438440

439441
// In case it has been loaded from the persistence DB, rebuild instances on history
440442
history_->rebuild_instances();
441443

442-
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
443444
configure_deadline_timer_();
444445

445446
lifespan_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(),
@@ -518,6 +519,8 @@ ReturnCode_t DataWriterImpl::loan_sample(
518519
void*& sample,
519520
LoanInitializationKind initialization)
520521
{
522+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
523+
521524
// Block lowlevel writer
522525
auto max_blocking_time = steady_clock::now() +
523526
microseconds(rtps::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
@@ -819,6 +822,8 @@ InstanceHandle_t DataWriterImpl::do_register_instance(
819822
const InstanceHandle_t instance_handle,
820823
WriteParams& wparams)
821824
{
825+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
826+
822827
// TODO(MiguelCompany): wparams should be used when propagating the register_instance operation to the DataReader.
823828
// See redmine issue #14494
824829
static_cast<void>(wparams);
@@ -866,6 +871,8 @@ ReturnCode_t DataWriterImpl::unregister_instance(
866871
const InstanceHandle_t& handle,
867872
bool dispose)
868873
{
874+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
875+
869876
// Preconditions
870877
InstanceHandle_t ih;
871878
ReturnCode_t returned_value = check_instance_preconditions(instance, handle, ih);
@@ -891,6 +898,8 @@ ReturnCode_t DataWriterImpl::unregister_instance_w_timestamp(
891898
const fastdds::dds::Time_t& timestamp,
892899
bool dispose)
893900
{
901+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
902+
894903
// Preconditions
895904
InstanceHandle_t instance_handle;
896905
ReturnCode_t ret = RETCODE_OK;
@@ -1069,7 +1078,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
10691078
return RETCODE_TIMEOUT;
10701079
}
10711080

1072-
if (deadline_timer_ != nullptr && qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
1081+
if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != dds::c_TimeInfinite && deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
10731082
{
10741083
if (!history_->set_next_deadline(
10751084
handle,
@@ -1178,6 +1187,8 @@ InstanceHandle_t DataWriterImpl::get_instance_handle() const
11781187

11791188
void DataWriterImpl::publisher_qos_updated()
11801189
{
1190+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
1191+
11811192
if (writer_ != nullptr)
11821193
{
11831194
// NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
@@ -1210,6 +1221,8 @@ ReturnCode_t DataWriterImpl::set_qos(
12101221
}
12111222
}
12121223

1224+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
1225+
12131226
if (enabled && !can_qos_be_updated(qos_, qos_to_set))
12141227
{
12151228
return RETCODE_IMMUTABLE_POLICY;
@@ -1222,7 +1235,6 @@ ReturnCode_t DataWriterImpl::set_qos(
12221235

12231236
if (enabled)
12241237
{
1225-
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
12261238
// Locks after we checked that writer exists
12271239

12281240
int32_t transport_priority = writer_->get_transport_priority();
@@ -1275,6 +1287,7 @@ ReturnCode_t DataWriterImpl::set_qos(
12751287

12761288
const DataWriterQos& DataWriterImpl::get_qos() const
12771289
{
1290+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
12781291
return qos_;
12791292
}
12801293

@@ -1555,30 +1568,21 @@ ReturnCode_t DataWriterImpl::set_related_datareader(
15551568
return ret;
15561569
}
15571570

1558-
// deadline_timer_reschedule returns true if it could compute and set a new interval; false if there’s no pending deadline.
15591571
bool DataWriterImpl::deadline_timer_reschedule()
15601572
{
1561-
assert(qos_.deadline().period != dds::c_TimeInfinite);
1562-
if (deadline_timer_ == nullptr || deadline_missed_status_.total_count >= std::numeric_limits<uint32_t>::max())
1563-
{
1564-
return false;
1565-
}
1566-
15671573
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
15681574

1575+
assert(qos_.deadline().period != dds::c_TimeInfinite);
1576+
assert(deadline_timer_ != nullptr);
1577+
assert(deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max());
1578+
15691579
steady_clock::time_point next_deadline_us;
15701580
if (!history_->get_next_deadline(timer_owner_, next_deadline_us))
15711581
{
15721582
EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not get the next deadline from the history");
15731583
return false;
15741584
}
15751585

1576-
if (next_deadline_us <= std::chrono::steady_clock::now())
1577-
{
1578-
// Ignore uninitialized or stale deadlines; don't arm to avoid a spurious first miss
1579-
return false;
1580-
}
1581-
15821586
auto interval_ms = duration_cast<milliseconds>(next_deadline_us - steady_clock::now());
15831587
deadline_timer_->update_interval_millisec(static_cast<double>(interval_ms.count()));
15841588
return true;
@@ -1588,27 +1592,24 @@ void DataWriterImpl::configure_deadline_timer_()
15881592
{
15891593
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
15901594

1591-
// Create the timer once
1595+
// Create the timer once
15921596
if (deadline_timer_ == nullptr)
15931597
{
15941598
deadline_timer_ = new TimedEvent(
15951599
publisher_->rtps_participant()->get_resource_event(),
15961600
[this]() -> bool
15971601
{
1598-
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1599-
1600-
// Normal path: process a deadline miss. This function will
1601-
// update counters, notify, and ask for reschedule as needed.
16021602
return deadline_missed();
16031603
},
1604-
(qos_.deadline().period == dds::c_TimeInfinite) ? std::numeric_limits<double>::max() : qos_.deadline().period.to_ns() * 1e-6
1605-
// In case of deadline period = infinite, multiplying it by 1e-6 could cause a problem
1604+
// Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly
1605+
std::numeric_limits<double>::max()
16061606
);
16071607
}
16081608

16091609
// Handle "infinite" and "zero" outside the callback
16101610
if (qos_.deadline().period == dds::c_TimeInfinite)
16111611
{
1612+
deadline_duration_us_ = std::chrono::duration<double, std::micro>::max();
16121613
deadline_timer_->cancel_timer();
16131614
return;
16141615
}
@@ -1617,52 +1618,48 @@ void DataWriterImpl::configure_deadline_timer_()
16171618

16181619
if (qos_.deadline().period.to_ns() == 0)
16191620
{
1621+
deadline_timer_->cancel_timer();
1622+
1623+
deadline_missed_status_.total_count = std::numeric_limits<uint32_t>::max();
1624+
deadline_missed_status_.total_count_change = std::numeric_limits<uint32_t>::max();
16201625
EPROSIMA_LOG_WARNING(
16211626
DATA_WRITER,
16221627
"Deadline period is 0, it will be ignored from now on. Timer is going to be canceled.");
16231628

16241629
// Bump once and notify listener exactly once.
1625-
notify_deadline_missed_no_increment_();
1626-
1627-
deadline_missed_status_.total_count = std::numeric_limits<uint32_t>::max();
1628-
deadline_timer_->cancel_timer();
1630+
notify_deadline_missed_nts_();
16291631
return;
16301632
}
16311633

1632-
if (deadline_timer_reschedule())
1633-
{
1634-
deadline_timer_->restart_timer();
1635-
}
1636-
else
1637-
{
1638-
// Keep the timer object around but idle if there's no pending deadline
1639-
deadline_timer_->cancel_timer();
1640-
}
1634+
deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
16411635
}
16421636

1643-
void DataWriterImpl::notify_deadline_missed_no_increment_()
1637+
void DataWriterImpl::notify_deadline_missed_nts_()
16441638
{
1645-
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1646-
16471639
StatusMask notify_status = StatusMask::offered_deadline_missed();
16481640
if (auto* listener = get_listener_for(notify_status))
16491641
{
16501642
listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
16511643
}
1644+
1645+
#ifdef FASTDDS_STATISTICS
1646+
writer_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED);
1647+
#endif // FASTDDS_STATISTICS
1648+
16521649
user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
16531650
}
16541651

16551652
bool DataWriterImpl::deadline_missed()
16561653
{
1657-
assert(qos_.deadline().period != dds::c_TimeInfinite);
1658-
16591654
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
16601655

1656+
assert(qos_.deadline().period != dds::c_TimeInfinite);
1657+
16611658
deadline_missed_status_.total_count++;
16621659
deadline_missed_status_.total_count_change++;
16631660
deadline_missed_status_.last_instance_handle = timer_owner_;
16641661

1665-
notify_deadline_missed_no_increment_();
1662+
notify_deadline_missed_nts_();
16661663

16671664
// If we just reached the max -> log ONCE, stop timer, and bail.
16681665
if (deadline_missed_status_.total_count == std::numeric_limits<uint32_t>::max())
@@ -1672,10 +1669,6 @@ bool DataWriterImpl::deadline_missed()
16721669
return false; // do not reschedule
16731670
}
16741671

1675-
#ifdef FASTDDS_STATISTICS
1676-
writer_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED);
1677-
#endif // FASTDDS_STATISTICS
1678-
16791672
if (!history_->set_next_deadline(
16801673
timer_owner_,
16811674
steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
@@ -1804,6 +1797,8 @@ ReturnCode_t DataWriterImpl::assert_liveliness()
18041797
ReturnCode_t DataWriterImpl::get_publication_builtin_topic_data(
18051798
PublicationBuiltinTopicData& publication_data) const
18061799
{
1800+
std::unique_lock<RecursiveTimedMutex> il(impl_mtx_);
1801+
18071802
if (nullptr == writer_)
18081803
{
18091804
return RETCODE_NOT_ENABLED;
@@ -2532,4 +2527,4 @@ bool DataWriterImpl::is_relevant(
25322527

25332528
} // namespace dds
25342529
} // namespace fastdds
2535-
} // namespace eprosima
2530+
} // namespace eprosima

src/cpp/fastdds/publisher/DataWriterImpl.hpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define _FASTDDS_DATAWRITERIMPL_HPP_
2121

2222
#include <memory>
23+
#include <mutex>
2324

2425
#include <fastdds/dds/builtin/topic/PublicationBuiltinTopicData.hpp>
2526
#include <fastdds/dds/core/ReturnCode.hpp>
@@ -643,6 +644,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
643644

644645
/**
645646
* @brief A method to reschedule the deadline timer
647+
* @return true if it could compute and set a new interval, if there’s a pending deadline
646648
*/
647649
bool deadline_timer_reschedule();
648650

@@ -772,16 +774,22 @@ class DataWriterImpl : protected rtps::IReaderDataFilter
772774
private:
773775

774776
/**
775-
* This function tears down any existing deadline timer and creates a new one
776-
* configured for the current deadline period. If the period is 0 or infinite,
777-
* the timer is created to fire only once to log a warning and then cancel itself.
777+
* Protects implementation class
778+
*/
779+
mutable RecursiveTimedMutex impl_mtx_;
780+
781+
/**
782+
* (Re)configures the deadline timer:
783+
* Create once, parked with a huge interval (idle).
784+
* In case of deadline period ∞ cancel it, for 0 warn and notify once; set counts to max and
785+
* for values >0 store period.
778786
*/
779787
void configure_deadline_timer_();
780788

781789
/**
782790
* Notifies listeners that a deadline has been missed without touching the counters.
783791
*/
784-
void notify_deadline_missed_no_increment_();
792+
void notify_deadline_missed_nts_();
785793

786794
void create_history(
787795
const std::shared_ptr<IPayloadPool>& payload_pool,

test/blackbox/api/dds-pim/PubSubWriter.hpp

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,6 @@ class PubSubWriter
183183
Listener(
184184
PubSubWriter& writer)
185185
: writer_(writer)
186-
, times_deadline_missed_(0)
187186
, times_liveliness_lost_(0)
188187
, times_unack_sample_removed_(0)
189188
{
@@ -214,7 +213,8 @@ class PubSubWriter
214213
const eprosima::fastdds::dds::OfferedDeadlineMissedStatus& status) override
215214
{
216215
static_cast<void>(datawriter);
217-
times_deadline_missed_ = status.total_count;
216+
std::lock_guard<std::mutex> lk(mutex_);
217+
offered_deadline_status_ = status;
218218
}
219219

220220
void on_offered_incompatible_qos(
@@ -245,7 +245,14 @@ class PubSubWriter
245245

246246
unsigned int missed_deadlines() const
247247
{
248-
return times_deadline_missed_;
248+
std::lock_guard<std::mutex> lk(mutex_);
249+
return offered_deadline_status_.total_count;
250+
}
251+
252+
unsigned int missed_deadlines_change() const
253+
{
254+
std::lock_guard<std::mutex> lk(mutex_);
255+
return offered_deadline_status_.total_count_change;
249256
}
250257

251258
unsigned int times_liveliness_lost() const
@@ -269,9 +276,10 @@ class PubSubWriter
269276
const Listener&) = delete;
270277

271278
PubSubWriter& writer_;
279+
mutable std::mutex mutex_;
280+
281+
eprosima::fastdds::dds::OfferedDeadlineMissedStatus offered_deadline_status_{};
272282

273-
//! The number of times deadline was missed
274-
unsigned int times_deadline_missed_;
275283
//! The number of times liveliness was lost
276284
unsigned int times_liveliness_lost_;
277285
//! The number of times a sample has been removed unacknowledged
@@ -287,12 +295,6 @@ class PubSubWriter
287295
typedef TypeSupport type_support;
288296
typedef typename type_support::type type;
289297

290-
eprosima::fastdds::dds::ReturnCode_t get_offered_deadline_missed_status(
291-
eprosima::fastdds::dds::OfferedDeadlineMissedStatus& st)
292-
{
293-
return datawriter_->get_offered_deadline_missed_status(st);
294-
}
295-
296298
PubSubWriter(
297299
const std::string& topic_name)
298300
: participant_listener_(*this)
@@ -1761,6 +1763,11 @@ class PubSubWriter
17611763
return listener_.missed_deadlines();
17621764
}
17631765

1766+
unsigned int missed_deadlines_change() const
1767+
{
1768+
return listener_.missed_deadlines_change();
1769+
}
1770+
17641771
unsigned int times_liveliness_lost() const
17651772
{
17661773
return listener_.times_liveliness_lost();

0 commit comments

Comments
 (0)