Skip to content

Commit e58f872

Browse files
committed
shared ptr and weak ptr
Signed-off-by: Ruiyang Wang <[email protected]>
1 parent ef5ba77 commit e58f872

29 files changed

+144
-143
lines changed

src/ray/common/asio/periodical_runner.cc

+38-46
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ PeriodicalRunner::~PeriodicalRunner() {
2929

3030
void PeriodicalRunner::Clear() {
3131
absl::MutexLock lock(&mutex_);
32-
*stopped_ = true;
3332
for (const auto &timer : timers_) {
3433
timer->cancel();
3534
}
@@ -39,28 +38,23 @@ void PeriodicalRunner::Clear() {
3938
void PeriodicalRunner::RunFnPeriodically(std::function<void()> fn,
4039
uint64_t period_ms,
4140
const std::string &name) {
42-
*stopped_ = false;
4341
if (period_ms > 0) {
4442
auto timer = std::make_shared<boost::asio::deadline_timer>(io_service_);
4543
{
4644
absl::MutexLock lock(&mutex_);
4745
timers_.push_back(timer);
4846
}
47+
auto weak_self = weak_from_this();
4948
io_service_.post(
50-
[this,
51-
stopped = stopped_,
52-
fn = std::move(fn),
53-
period_ms,
54-
name,
55-
timer = std::move(timer)]() {
56-
if (*stopped) {
57-
return;
58-
}
59-
if (RayConfig::instance().event_stats()) {
60-
DoRunFnPeriodicallyInstrumented(
61-
fn, boost::posix_time::milliseconds(period_ms), timer, name);
62-
} else {
63-
DoRunFnPeriodically(fn, boost::posix_time::milliseconds(period_ms), timer);
49+
[weak_self, fn = std::move(fn), period_ms, name, timer = std::move(timer)]() {
50+
if (auto self = weak_self.lock(); self) {
51+
if (RayConfig::instance().event_stats()) {
52+
self->DoRunFnPeriodicallyInstrumented(
53+
std::move(fn), boost::posix_time::milliseconds(period_ms), timer, name);
54+
} else {
55+
self->DoRunFnPeriodically(
56+
std::move(fn), boost::posix_time::milliseconds(period_ms), timer);
57+
}
6458
}
6559
},
6660
"PeriodicalRunner.RunFnPeriodically");
@@ -74,19 +68,19 @@ void PeriodicalRunner::DoRunFnPeriodically(
7468
fn();
7569
absl::MutexLock lock(&mutex_);
7670
timer->expires_from_now(period);
77-
timer->async_wait([this, stopped = stopped_, fn, period, timer = std::move(timer)](
71+
auto weak_self = weak_from_this();
72+
timer->async_wait([weak_self, fn, period, timer = std::move(timer)](
7873
const boost::system::error_code &error) {
79-
if (*stopped) {
80-
return;
81-
}
82-
if (error == boost::asio::error::operation_aborted) {
83-
// `operation_aborted` is set when `timer` is canceled or destroyed.
84-
// The Monitor lifetime may be short than the object who use it. (e.g.
85-
// gcs_server)
86-
return;
74+
if (auto self = weak_self.lock(); self) {
75+
if (error == boost::asio::error::operation_aborted) {
76+
// `operation_aborted` is set when `timer` is canceled or destroyed.
77+
// The Monitor lifetime may be short than the object who use it. (e.g.
78+
// gcs_server)
79+
return;
80+
}
81+
RAY_CHECK(!error) << error.message();
82+
self->DoRunFnPeriodically(fn, period, timer);
8783
}
88-
RAY_CHECK(!error) << error.message();
89-
DoRunFnPeriodically(fn, period, timer);
9084
});
9185
}
9286

@@ -102,31 +96,29 @@ void PeriodicalRunner::DoRunFnPeriodicallyInstrumented(
10296
// which the handler was elgible to execute on the event loop but was queued by the
10397
// event loop.
10498
auto stats_handle = io_service_.stats().RecordStart(name, period.total_nanoseconds());
105-
timer->async_wait([this,
99+
auto weak_self = weak_from_this();
100+
timer->async_wait([weak_self,
106101
fn,
107-
stopped = stopped_,
108102
period,
109103
timer = std::move(timer),
110104
stats_handle = std::move(stats_handle),
111105
name](const boost::system::error_code &error) {
112-
if (*stopped) {
113-
return;
106+
if (auto self = weak_self.lock(); self) {
107+
self->io_service_.stats().RecordExecution(
108+
[weak_self, fn, error, period, timer, name]() {
109+
if (auto self = weak_self.lock(); self) {
110+
if (error == boost::asio::error::operation_aborted) {
111+
// `operation_aborted` is set when `timer` is canceled or destroyed.
112+
// The Monitor lifetime may be short than the object who use it. (e.g.
113+
// gcs_server)
114+
return;
115+
}
116+
RAY_CHECK(!error) << error.message();
117+
self->DoRunFnPeriodicallyInstrumented(fn, period, timer, name);
118+
}
119+
},
120+
stats_handle);
114121
}
115-
io_service_.stats().RecordExecution(
116-
[this, stopped = stopped, fn, error, period, timer, name]() {
117-
if (*stopped) {
118-
return;
119-
}
120-
if (error == boost::asio::error::operation_aborted) {
121-
// `operation_aborted` is set when `timer` is canceled or destroyed.
122-
// The Monitor lifetime may be short than the object who use it. (e.g.
123-
// gcs_server)
124-
return;
125-
}
126-
RAY_CHECK(!error) << error.message();
127-
DoRunFnPeriodicallyInstrumented(fn, period, timer, name);
128-
},
129-
stats_handle);
130122
});
131123
}
132124

src/ray/common/asio/periodical_runner.h

+13-7
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,28 @@ namespace ray {
2828
/// It can run functions with specified period. Each function is triggered by its timer.
2929
/// To run a function, call `RunFnPeriodically(fn, period_ms)`.
3030
/// All registered functions will stop running once this object is destructed.
31-
class PeriodicalRunner {
31+
//
32+
// Lifetime: once a PeriodicalRunner is destructed, all its timers are cancelled. The
33+
// scheduled asio tasks keep a weak_ptr to the PeriodicalRunner, and they won't run after
34+
// the PeriodicalRunner is destructed.
35+
class PeriodicalRunner : public std::enable_shared_from_this<PeriodicalRunner> {
3236
public:
33-
explicit PeriodicalRunner(instrumented_io_context &io_service);
37+
static std::shared_ptr<PeriodicalRunner> Create(instrumented_io_context &io_service) {
38+
// Sadly we can't use std::make_shared because the constructor is private.
39+
return std::shared_ptr<PeriodicalRunner>(new PeriodicalRunner(io_service));
40+
}
3441

3542
~PeriodicalRunner();
3643

37-
void Clear();
38-
3944
void RunFnPeriodically(std::function<void()> fn,
4045
uint64_t period_ms,
4146
const std::string &name) ABSL_LOCKS_EXCLUDED(mutex_);
4247

4348
private:
49+
explicit PeriodicalRunner(instrumented_io_context &io_service);
50+
51+
void Clear();
52+
4453
void DoRunFnPeriodically(const std::function<void()> &fn,
4554
boost::posix_time::milliseconds period,
4655
std::shared_ptr<boost::asio::deadline_timer> timer)
@@ -56,9 +65,6 @@ class PeriodicalRunner {
5665
mutable absl::Mutex mutex_;
5766
std::vector<std::shared_ptr<boost::asio::deadline_timer>> timers_
5867
ABSL_GUARDED_BY(mutex_);
59-
// `stopped_` is copied to the timer callback, and may outlive `this`.
60-
std::shared_ptr<std::atomic<bool>> stopped_ =
61-
std::make_shared<std::atomic<bool>>(false);
6268
};
6369

6470
} // namespace ray

src/ray/common/file_system_monitor.cc

+4-4
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ FileSystemMonitor::FileSystemMonitor(std::vector<std::string> paths,
3333
boost::asio::io_service::work io_service_work_(io_context_);
3434
io_context_.run();
3535
}),
36-
runner_(io_context_) {
37-
runner_.RunFnPeriodically([this] { over_capacity_ = CheckIfAnyPathOverCapacity(); },
38-
monitor_interval_ms,
39-
"FileSystemMonitor.CheckIfAnyPathOverCapacity");
36+
runner_(PeriodicalRunner::Create(io_context_)) {
37+
runner_->RunFnPeriodically([this] { over_capacity_ = CheckIfAnyPathOverCapacity(); },
38+
monitor_interval_ms,
39+
"FileSystemMonitor.CheckIfAnyPathOverCapacity");
4040
}
4141

4242
FileSystemMonitor::FileSystemMonitor()

src/ray/common/file_system_monitor.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class FileSystemMonitor {
7070
std::atomic<bool> over_capacity_;
7171
instrumented_io_context io_context_;
7272
std::thread monitor_thread_;
73-
PeriodicalRunner runner_;
73+
std::shared_ptr<PeriodicalRunner> runner_;
7474
};
7575

7676
std::vector<std::string> ParseSpillingPaths(const std::string &spilling_config);

src/ray/common/memory_monitor.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ MemoryMonitor::MemoryMonitor(instrumented_io_context &io_service,
3434
: usage_threshold_(usage_threshold),
3535
min_memory_free_bytes_(min_memory_free_bytes),
3636
monitor_callback_(monitor_callback),
37-
runner_(io_service) {
37+
runner_(PeriodicalRunner::Create(io_service)) {
3838
RAY_CHECK(monitor_callback_ != nullptr);
3939
RAY_CHECK_GE(usage_threshold_, 0);
4040
RAY_CHECK_LE(usage_threshold_, 1);
@@ -48,7 +48,7 @@ MemoryMonitor::MemoryMonitor(instrumented_io_context &io_service,
4848
<< computed_threshold_bytes_ << " bytes ("
4949
<< FormatFloat(computed_threshold_fraction_, 2)
5050
<< " system memory), total system memory bytes: " << total_memory_bytes;
51-
runner_.RunFnPeriodically(
51+
runner_->RunFnPeriodically(
5252
[this] {
5353
auto [used_memory_bytes, total_memory_bytes] = GetMemoryBytes();
5454
MemorySnapshot system_memory;

src/ray/common/memory_monitor.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ class MemoryMonitor {
233233
/// Callback function that executes at each monitoring interval,
234234
/// on a dedicated thread managed by this class.
235235
const MemoryUsageRefreshCallback monitor_callback_;
236-
PeriodicalRunner runner_;
236+
std::shared_ptr<PeriodicalRunner> runner_;
237237
};
238238

239239
} // namespace ray

src/ray/common/ray_syncer/ray_syncer.cc

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ RaySyncer::RaySyncer(instrumented_io_context &io_context,
2929
: io_context_(io_context),
3030
local_node_id_(local_node_id),
3131
node_state_(std::make_unique<NodeState>()),
32-
timer_(io_context) {
32+
timer_(PeriodicalRunner::Create(io_context)) {
3333
stopped_ = std::make_shared<bool>(false);
3434
}
3535

@@ -155,7 +155,7 @@ void RaySyncer::Register(MessageType message_type,
155155

156156
// Set job to pull from reporter periodically
157157
if (reporter != nullptr && pull_from_reporter_interval_ms > 0) {
158-
timer_.RunFnPeriodically(
158+
timer_->RunFnPeriodically(
159159
[this, stopped = stopped_, message_type]() {
160160
if (*stopped) {
161161
return;

src/ray/common/ray_syncer/ray_syncer.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ class RaySyncer {
176176
std::unique_ptr<NodeState> node_state_;
177177

178178
/// Timer is used to do broadcasting.
179-
ray::PeriodicalRunner timer_;
179+
std::shared_ptr<PeriodicalRunner> timer_;
180180

181181
friend class RaySyncerService;
182182
/// Test purpose

src/ray/common/test/syncer_service_e2e_test.cc

+4-3
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@
2828
#include "ray/common/ray_syncer/ray_syncer.h"
2929
using namespace std;
3030
using namespace ray::syncer;
31+
using ray::PeriodicalRunner;
3132

3233
class LocalNode : public ReporterInterface {
3334
public:
3435
LocalNode(instrumented_io_context &io_context, ray::NodeID node_id)
35-
: node_id_(node_id), timer_(io_context) {
36-
timer_.RunFnPeriodically(
36+
: node_id_(node_id), timer_(PeriodicalRunner::Create(io_context)) {
37+
timer_->RunFnPeriodically(
3738
[this]() {
3839
auto v = static_cast<double>(std::rand()) / RAND_MAX;
3940
if (v < 0.3) {
@@ -66,7 +67,7 @@ class LocalNode : public ReporterInterface {
6667
int64_t version_ = 0;
6768
int state_ = 0;
6869
ray::NodeID node_id_;
69-
ray::PeriodicalRunner timer_;
70+
std::shared_ptr<PeriodicalRunner> timer_;
7071
};
7172

7273
class RemoteNodes : public ReceiverInterface {

src/ray/core_worker/core_worker.cc

+14-14
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
257257
worker_context_(options_.worker_type, worker_id, GetProcessJobID(options_)),
258258
io_work_(io_service_),
259259
client_call_manager_(new rpc::ClientCallManager(io_service_)),
260-
periodical_runner_(io_service_),
260+
periodical_runner_(PeriodicalRunner::Create(io_service_)),
261261
task_queue_length_(0),
262262
num_executed_tasks_(0),
263263
resource_ids_(new ResourceMappingType()),
@@ -327,7 +327,7 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
327327
options_.raylet_ip_address, options_.node_manager_port, *client_call_manager_);
328328

329329
if (options_.worker_type != WorkerType::DRIVER) {
330-
periodical_runner_.RunFnPeriodically(
330+
periodical_runner_->RunFnPeriodically(
331331
[this] { ExitIfParentRayletDies(); },
332332
RayConfig::instance().raylet_death_check_interval_milliseconds(),
333333
"CoreWorker.ExitIfParentRayletDies");
@@ -451,11 +451,11 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
451451
});
452452

453453
object_info_publisher_ = std::make_unique<pubsub::Publisher>(
454-
/*channels=*/std::vector<
455-
rpc::ChannelType>{rpc::ChannelType::WORKER_OBJECT_EVICTION,
456-
rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL,
457-
rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL},
458-
/*periodical_runner=*/&periodical_runner_,
454+
/*channels=*/
455+
std::vector<rpc::ChannelType>{rpc::ChannelType::WORKER_OBJECT_EVICTION,
456+
rpc::ChannelType::WORKER_REF_REMOVED_CHANNEL,
457+
rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL},
458+
/*periodical_runner=*/*periodical_runner_,
459459
/*get_time_ms=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; },
460460
/*subscriber_timeout_ms=*/RayConfig::instance().subscriber_timeout_ms(),
461461
/*publish_batch_size_=*/RayConfig::instance().publish_batch_size(),
@@ -791,7 +791,7 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
791791
const auto event_stats_print_interval_ms =
792792
RayConfig::instance().event_stats_print_interval_ms();
793793
if (event_stats_print_interval_ms != -1 && RayConfig::instance().event_stats()) {
794-
periodical_runner_.RunFnPeriodically(
794+
periodical_runner_->RunFnPeriodically(
795795
[this] {
796796
RAY_LOG(INFO) << "Event stats:\n\n"
797797
<< io_service_.stats().StatsString() << "\n\n"
@@ -811,7 +811,7 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
811811
ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
812812
{{"worker_id", worker_id.Hex()}});
813813

814-
periodical_runner_.RunFnPeriodically(
814+
periodical_runner_->RunFnPeriodically(
815815
[this] {
816816
const auto lost_objects = reference_counter_->FlushObjectsToRecover();
817817
if (!lost_objects.empty()) {
@@ -836,17 +836,17 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
836836
100,
837837
"CoreWorker.RecoverObjects");
838838

839-
periodical_runner_.RunFnPeriodically(
839+
periodical_runner_->RunFnPeriodically(
840840
[this] { InternalHeartbeat(); },
841841
RayConfig::instance().core_worker_internal_heartbeat_ms(),
842842
"CoreWorker.InternalHeartbeat");
843843

844-
periodical_runner_.RunFnPeriodically(
844+
periodical_runner_->RunFnPeriodically(
845845
[this] { RecordMetrics(); },
846846
RayConfig::instance().metrics_report_interval_ms() / 2,
847847
"CoreWorker.RecordMetrics");
848848

849-
periodical_runner_.RunFnPeriodically(
849+
periodical_runner_->RunFnPeriodically(
850850
[this] { TryDeleteObjectRefStreams(); },
851851
RayConfig::instance().local_gc_min_interval_s() * 1000,
852852
"CoreWorker.GCStreamingGeneratorMetadata");
@@ -3014,9 +3014,9 @@ std::unique_ptr<worker::ProfileEvent> CoreWorker::CreateProfileEvent(
30143014
}
30153015

30163016
void CoreWorker::RunTaskExecutionLoop() {
3017-
PeriodicalRunner signal_checker(task_execution_service_);
3017+
auto signal_checker = PeriodicalRunner::Create(task_execution_service_);
30183018
if (options_.check_signals) {
3019-
signal_checker.RunFnPeriodically(
3019+
signal_checker->RunFnPeriodically(
30203020
[this] {
30213021
/// The overhead of this is only a single digit microsecond.
30223022
auto status = options_.check_signals();

src/ray/core_worker/core_worker.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1695,7 +1695,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
16951695
std::shared_ptr<rpc::CoreWorkerClientPool> core_worker_client_pool_;
16961696

16971697
/// The runner to run function periodically.
1698-
PeriodicalRunner periodical_runner_;
1698+
std::shared_ptr<PeriodicalRunner> periodical_runner_;
16991699

17001700
/// RPC server used to receive tasks to execute.
17011701
std::unique_ptr<rpc::GrpcServer> core_worker_server_;

src/ray/core_worker/task_event_buffer.cc

+4-4
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ bool TaskEventBuffer::RecordTaskStatusEventIfNeeded(
232232

233233
TaskEventBufferImpl::TaskEventBufferImpl(std::shared_ptr<gcs::GcsClient> gcs_client)
234234
: work_guard_(boost::asio::make_work_guard(io_service_)),
235-
periodical_runner_(io_service_),
235+
periodical_runner_(PeriodicalRunner::Create(io_service_)),
236236
gcs_client_(std::move(gcs_client)),
237237
status_events_() {}
238238

@@ -283,9 +283,9 @@ Status TaskEventBufferImpl::Start(bool auto_flush) {
283283
}
284284

285285
RAY_LOG(INFO) << "Reporting task events to GCS every " << report_interval_ms << "ms.";
286-
periodical_runner_.RunFnPeriodically([this] { FlushEvents(/* forced */ false); },
287-
report_interval_ms,
288-
"CoreWorker.deadline_timer.flush_task_events");
286+
periodical_runner_->RunFnPeriodically([this] { FlushEvents(/*forced= */ false); },
287+
report_interval_ms,
288+
"CoreWorker.deadline_timer.flush_task_events");
289289
return Status::OK();
290290
}
291291

src/ray/core_worker/task_event_buffer.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ class TaskEventBufferImpl : public TaskEventBuffer {
430430
std::thread io_thread_;
431431

432432
/// The runner to run function periodically.
433-
PeriodicalRunner periodical_runner_;
433+
std::shared_ptr<PeriodicalRunner> periodical_runner_;
434434

435435
/// Client to the GCS used to push profile events to it.
436436
std::shared_ptr<gcs::GcsClient> gcs_client_ ABSL_GUARDED_BY(mutex_);

0 commit comments

Comments
 (0)