Skip to content

Commit 8cec3fc

Browse files
committed
Store ids of all running actors in the registry
This is useful for debugging shutdown hangs, when an actor that does not terminate blocks the destructor of the actor system itself.
1 parent 4ff66bf commit 8cec3fc

File tree

5 files changed

+44
-19
lines changed

5 files changed

+44
-19
lines changed

libcaf_core/caf/actor_registry.hpp

+21-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include <string>
1212
#include <thread>
1313
#include <unordered_map>
14+
#include <unordered_set>
1415

1516
#include "caf/abstract_actor.hpp"
1617
#include "caf/actor.hpp"
@@ -53,19 +54,34 @@ class CAF_CORE_EXPORT actor_registry {
5354

5455
/// Increases running-actors-count by one.
5556
/// @returns the increased count.
56-
size_t inc_running();
57+
size_t inc_running(actor_id key);
5758

5859
/// Decreases running-actors-count by one.
5960
/// @returns the decreased count.
60-
size_t dec_running();
61+
size_t dec_running(actor_id key);
6162

6263
/// Returns the number of currently running actors.
6364
size_t running() const;
6465

65-
/// Blocks the caller until running-actors-count becomes `expected`
66-
/// (must be either 0 or 1).
66+
/// Returns the the actor ids of all currently running actors.
67+
const std::unordered_set<actor_id>& running_ids() const;
68+
69+
/// Blocks the caller until running-actors-count becomes `expected`..
6770
void await_running_count_equal(size_t expected) const;
6871

72+
/// Blocks the caller until running-actors-count becomes `expected`..
73+
/// Invokes `cb` every time the set of running actors shrinks.
74+
template <class CB>
75+
void await_running_count_equal(size_t expected, CB&& cb) const {
76+
CAF_LOG_TRACE(CAF_ARG(expected));
77+
std::unique_lock<std::mutex> guard{running_mtx_};
78+
while (running_.size() != expected) {
79+
CAF_LOG_DEBUG(CAF_ARG(running_.size()));
80+
running_cv_.wait(guard);
81+
cb();
82+
}
83+
}
84+
6985
/// Returns the actor associated with `key` or `invalid_actor`.
7086
template <class T = strong_actor_ptr>
7187
T get(const std::string& key) const {
@@ -112,6 +128,7 @@ class CAF_CORE_EXPORT actor_registry {
112128

113129
mutable std::mutex running_mtx_;
114130
mutable std::condition_variable running_cv_;
131+
std::unordered_set<actor_id> running_;
115132

116133
mutable detail::shared_spinlock instances_mtx_;
117134
entries entries_;

libcaf_core/src/abstract_actor.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,15 @@ void abstract_actor::register_at_system() {
8080
if (getf(is_registered_flag))
8181
return;
8282
setf(is_registered_flag);
83-
[[maybe_unused]] auto count = home_system().registry().inc_running();
83+
[[maybe_unused]] auto count = home_system().registry().inc_running(id());
8484
CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count);
8585
}
8686

8787
void abstract_actor::unregister_from_system() {
8888
if (!getf(is_registered_flag))
8989
return;
9090
unsetf(is_registered_flag);
91-
[[maybe_unused]] auto count = home_system().registry().dec_running();
91+
[[maybe_unused]] auto count = home_system().registry().dec_running(id());
9292
CAF_LOG_DEBUG("actor" << id() << "decreased running count to" << count);
9393
}
9494

libcaf_core/src/actor_registry.cpp

+18-11
Original file line numberDiff line numberDiff line change
@@ -80,29 +80,36 @@ void actor_registry::erase(actor_id key) {
8080
}
8181
}
8282

83-
size_t actor_registry::inc_running() {
84-
return ++*system_.base_metrics().running_actors;
83+
size_t actor_registry::inc_running(actor_id key) {
84+
std::unique_lock<std::mutex> guard(running_mtx_);
85+
running_.emplace(key);
86+
return running_.size();
8587
}
8688

8789
size_t actor_registry::running() const {
88-
return static_cast<size_t>(system_.base_metrics().running_actors->value());
90+
std::unique_lock<std::mutex> guard(running_mtx_);
91+
return running_.size();
8992
}
9093

91-
size_t actor_registry::dec_running() {
92-
size_t new_val = --*system_.base_metrics().running_actors;
93-
if (new_val <= 1) {
94-
std::unique_lock<std::mutex> guard(running_mtx_);
95-
running_cv_.notify_all();
96-
}
94+
const std::unordered_set<actor_id>& actor_registry::running_ids() const {
95+
std::unique_lock<std::mutex> guard(running_mtx_);
96+
return running_;
97+
}
98+
99+
size_t actor_registry::dec_running(actor_id key) {
100+
std::unique_lock<std::mutex> guard(running_mtx_);
101+
running_.erase(key);
102+
size_t new_val = running_.size();
103+
running_cv_.notify_all();
97104
return new_val;
98105
}
99106

100107
void actor_registry::await_running_count_equal(size_t expected) const {
101108
CAF_ASSERT(expected == 0 || expected == 1);
102109
CAF_LOG_TRACE(CAF_ARG(expected));
103110
std::unique_lock<std::mutex> guard{running_mtx_};
104-
while (running() != expected) {
105-
CAF_LOG_DEBUG(CAF_ARG(running()));
111+
while (running_.size() != expected) {
112+
CAF_LOG_DEBUG(CAF_ARG(running_.size()));
106113
running_cv_.wait(guard);
107114
}
108115
}

libcaf_core/src/blocking_actor.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class blocking_actor_runner : public resumable {
132132
auto& sys = ctx->system();
133133
sys.release_private_thread(thread_);
134134
if (!hidden_) {
135-
[[maybe_unused]] auto count = sys.registry().dec_running();
135+
[[maybe_unused]] auto count = sys.registry().dec_running(self_->id());
136136
CAF_LOG_DEBUG("actor" << self_->id() << "decreased running count to"
137137
<< count);
138138
}
@@ -166,7 +166,7 @@ void blocking_actor::launch(execution_unit*, bool, bool hide) {
166166
// Note: must *not* call register_at_system() to stop actor cleanup from
167167
// decrementing the count before releasing the thread.
168168
if (!hide) {
169-
[[maybe_unused]] auto count = sys.registry().inc_running();
169+
[[maybe_unused]] auto count = sys.registry().inc_running(id());
170170
CAF_LOG_DEBUG("actor" << id() << "increased running count to" << count);
171171
}
172172
thread->resume(new blocking_actor_runner(this, thread, hide));

libcaf_io/test/detail/prometheus_broker.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ CAF_TEST(the prometheus broker responds to HTTP get requests) {
7070
string_view response{reinterpret_cast<char*>(response_buf.data()),
7171
response_buf.size()};
7272
CHECK(starts_with(response, http_ok_header));
73+
MESSAGE(response);
7374
CHECK(contains(response, "\ncaf_system_running_actors 2 "));
7475
if (detail::prometheus_broker::has_process_metrics()) {
7576
CHECK(contains(response, "\nprocess_cpu_seconds_total "));

0 commit comments

Comments
 (0)