Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions csrc/inc/page_allocator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ class PageAllocator {
BroadcastMapCallback broadcast_map_callback_;
BroadcastUnmapCallback broadcast_unmap_callback_;
ShouldUseWorkerIpcCallback should_use_worker_ipc_callback_;
mutable std::atomic<bool> should_use_worker_ipc_cached_{false};
};

} // namespace kvcached
27 changes: 23 additions & 4 deletions csrc/page_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,11 @@ void PageAllocator::set_broadcast_unmap_callback(

void PageAllocator::set_should_use_worker_ipc_callback(
ShouldUseWorkerIpcCallback callback) {
bool use_worker_ipc = callback ? callback() : false;
std::lock_guard<std::mutex> lock(lock_);
should_use_worker_ipc_callback_ = callback;
should_use_worker_ipc_callback_ = std::move(callback);
should_use_worker_ipc_cached_.store(use_worker_ipc,
std::memory_order_release);
LOGGER(INFO, "Should-use-worker-ipc callback set for PageAllocator");
}

Expand Down Expand Up @@ -731,10 +734,26 @@ void PageAllocator::stop_prealloc_thread_internal() {
}

bool PageAllocator::should_use_worker_ipc() const {
if (should_use_worker_ipc_callback_) {
return should_use_worker_ipc_callback_();
ShouldUseWorkerIpcCallback callback;
{
std::lock_guard<std::mutex> lock(lock_);
if (prealloc_thread_ &&
prealloc_thread_->get_id() == std::this_thread::get_id()) {
// The background prealloc thread must not re-enter Python here. It only
// consumes the cached decision, which non-prealloc callers refresh.
return should_use_worker_ipc_cached_.load(std::memory_order_acquire);
}
Comment thread
shipiyouniao marked this conversation as resolved.
callback = should_use_worker_ipc_callback_;
}
Comment thread
shipiyouniao marked this conversation as resolved.
return false;

if (callback) {
bool use_worker_ipc = callback();
should_use_worker_ipc_cached_.store(use_worker_ipc,
std::memory_order_release);
return use_worker_ipc;
}

return should_use_worker_ipc_cached_.load(std::memory_order_acquire);
}

} // namespace kvcached
Loading