#include "source/common/thread_local/thread_local_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <list>
#include "envoy/event/dispatcher.h"
#include "source/common/common/assert.h"
#include "source/common/common/stl_helpers.h"
namespace Envoy {
namespace ThreadLocal {
thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_;
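// Illustrative usage sketch (not part of this file): components typically allocate a slot on the
// main thread, seed per-thread state via set(), and later update it on every thread. The names
// "tls" and "MyTlsState" are hypothetical, and the callback signatures assume the
// InitializeCb/UpdateCb aliases declared in the Slot interface header.
//
//   SlotPtr slot = tls.allocateSlot();
//   slot->set([](Event::Dispatcher&) -> ThreadLocalObjectSharedPtr {
//     return std::make_shared<MyTlsState>();
//   });
//   slot->runOnAllThreads([](ThreadLocalObjectSharedPtr obj) {
//     // Mutate the calling thread's MyTlsState through obj.
//   });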
InstanceImpl::~InstanceImpl() {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(shutdown_);
thread_local_data_.data_.clear();
}
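// Allocates a slot on the main thread, reusing a previously freed slot index when one is
// available; otherwise a new index is appended to slots_.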
SlotPtr InstanceImpl::allocateSlot() {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);
if (free_slot_indexes_.empty()) {
SlotPtr slot = std::make_unique<SlotImpl>(*this, slots_.size());
slots_.push_back(slot.get());
return slot;
}
const uint32_t idx = free_slot_indexes_.front();
free_slot_indexes_.pop_front();
ASSERT(idx < slots_.size());
SlotPtr slot = std::make_unique<SlotImpl>(*this, idx);
slots_[idx] = slot.get();
return slot;
}
InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index)
: parent_(parent), index_(index), still_alive_guard_(std::make_shared<bool>(true)) {}
Event::PostCb InstanceImpl::SlotImpl::wrapCallback(const Event::PostCb& cb) {
// See the header file comments for still_alive_guard_ for the purpose of this capture and the
// expired check below.
//
// Note also that this logic is duplicated below in dataCallback(), rather than incurring another
// lambda redirection.
return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb] {
if (!still_alive_guard.expired()) {
cb();
}
};
}
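// A thread is considered registered for this slot once its thread-local data vector has grown to
// cover the slot's index (see setThreadLocal()).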
bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) {
return thread_local_data_.data_.size() > index;
}
bool InstanceImpl::SlotImpl::currentThreadRegistered() {
return currentThreadRegisteredWorker(index_);
}
ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) {
ASSERT(currentThreadRegisteredWorker(index));
return thread_local_data_.data_[index];
}
ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); }
Event::PostCb InstanceImpl::SlotImpl::dataCallback(const UpdateCb& cb) {
// See the header file comments for still_alive_guard_ for why we capture index_.
return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb, index = index_] {
// This duplicates the logic in wrapCallback() (above). Using wrapCallback() also works, but
// incurs another lambda indirection at runtime. Since the duplicated logic is only an
// if-statement and a bool function, it doesn't seem worth factoring out into a helper function.
if (!still_alive_guard.expired()) {
cb(getWorker(index));
}
};
}
void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, const Event::PostCb& complete_cb) {
parent_.runOnAllThreads(dataCallback(cb), complete_cb);
}
void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
parent_.runOnAllThreads(dataCallback(cb));
}
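// Seeds the slot on every thread: the initialize callback is posted to each registered worker
// dispatcher (wrapped so it becomes a no-op if the slot has been destroyed by the time it runs)
// and is then run directly on the main thread.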
void InstanceImpl::SlotImpl::set(InitializeCb cb) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!parent_.shutdown_);
for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
// See the header file comments for still_alive_guard_ for why we capture index_.
dispatcher.post(wrapCallback(
[index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }));
}
// Handle main thread.
setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
}
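// Records a dispatcher for TLS use. The main thread's dispatcher is stored directly; a worker
// dispatcher is appended to registered_threads_ and posts to itself so that
// thread_local_data_.dispatcher_ is assigned on the worker's own thread.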
void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);
if (main_thread) {
main_thread_dispatcher_ = &dispatcher;
thread_local_data_.dispatcher_ = &dispatcher;
} else {
ASSERT(!containsReference(registered_threads_, dispatcher));
registered_threads_.push_back(dispatcher);
dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
}
}
void InstanceImpl::removeSlot(uint32_t slot) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
// When shutting down, we do not post slot removals to other threads. This is because the other
// threads have already shut down and their dispatchers are no longer alive. There is also no need
// to do the removal, because no allocations happen during shutdown and shutdownThread() will clean
// things up on each thread.
if (shutdown_) {
return;
}
slots_[slot] = nullptr;
ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), slot) ==
free_slot_indexes_.end(),
fmt::format("slot index {} already in free slot set!", slot));
free_slot_indexes_.push_back(slot);
runOnAllThreads([slot]() -> void {
// This runs on each thread and clears the slot, making it available for a new allocation.
// This is safe even if a new allocation comes in, because everything happens with post() and
// will be sequenced after this removal. It is also safe if there are callbacks pending on
// other threads because they will run first.
if (slot < thread_local_data_.data_.size()) {
thread_local_data_.data_[slot] = nullptr;
}
});
}
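// Posts cb to every registered worker dispatcher and then runs it directly on the main thread.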
void InstanceImpl::runOnAllThreads(Event::PostCb cb) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);
for (Event::Dispatcher& dispatcher : registered_threads_) {
dispatcher.post(cb);
}
// Handle main thread.
cb();
}
void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);
// Handle the main thread first so that when the last worker thread finishes, we can simply call
// all_threads_complete_cb. Parallelism of main thread execution is traded off for programming
// simplicity here.
cb();
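// cb_guard is captured by every worker post below; once the last copy is released (i.e. after all
// workers have run the callback), its deleter posts all_threads_complete_cb back to the main
// thread.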
Event::PostCbSharedPtr cb_guard(new Event::PostCb(cb),
[this, all_threads_complete_cb](Event::PostCb* cb) {
main_thread_dispatcher_->post(all_threads_complete_cb);
delete cb;
});
for (Event::Dispatcher& dispatcher : registered_threads_) {
dispatcher.post([cb_guard]() -> void { (*cb_guard)(); });
}
}
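// Stores the object at the given slot index in the calling thread's data vector, growing the
// vector lazily as needed. This runs on whichever thread executes the posted callback.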
void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
if (thread_local_data_.data_.size() <= index) {
thread_local_data_.data_.resize(index + 1);
}
thread_local_data_.data_[index] = object;
}
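// Marks global shutdown from the main thread. After this, no new slots may be allocated and
// removeSlot() no longer posts cleanup to workers; remaining per-thread state is torn down by
// shutdownThread().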
void InstanceImpl::shutdownGlobalThreading() {
ASSERT_IS_MAIN_OR_TEST_THREAD();
ASSERT(!shutdown_);
shutdown_ = true;
}
void InstanceImpl::shutdownThread() {
ASSERT(shutdown_);
// Destruction of slots is done in *reverse* order. This is so that filters and other higher-layer
// objects built on top of the cluster manager, stats, etc. are destroyed before more base-layer
// objects. Reverse ordering handles the case in which leaf objects depend in some way on
// "persistent" objects (particularly the cluster manager) that are created very early on with a
// known slot number and never destroyed until shutdown. For example, if we chose to create
// persistent per-thread gRPC clients we would potentially run into shutdown issues if such a
// client were destroyed after the cluster manager. This happens in practice today when a redis
// connection pool is destroyed and removes its member update callback from the backing cluster.
// Examples of things with TLS that are created early on and never destroyed until server shutdown
// are stats, runtime, and the cluster manager (see server.cc).
//
// It's possible this might need to become more complicated later but it's OK for now. Note that
// this is always safe to do because:
// 1) All slot updates come in via post().
// 2) No updates or removals will come in during shutdown().
//
// TODO(mattklein123): Deletion should really be in reverse *allocation* order. This could be
// implemented relatively easily by keeping a parallel list of slot #s. This
// would fix the case where something allocates two slots, but is interleaved
// with a deletion, such that the second allocation is actually a lower slot
// number than the first. This is an edge case that does not exist anywhere
// in the code today, but we can keep this in mind if things become more
// complicated in the future.
for (auto it = thread_local_data_.data_.rbegin(); it != thread_local_data_.data_.rend(); ++it) {
it->reset();
}
thread_local_data_.data_.clear();
}
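// Returns the dispatcher registered for the calling thread; registerThread() must have been
// called for this thread.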
Event::Dispatcher& InstanceImpl::dispatcher() {
ASSERT(thread_local_data_.dispatcher_ != nullptr);
return *thread_local_data_.dispatcher_;
}
} // namespace ThreadLocal
} // namespace Envoy