From 06bc373acee679c4f71a722a509dee1464e47414 Mon Sep 17 00:00:00 2001 From: Max Kellermann Date: Mon, 10 Mar 2025 18:43:41 +0100 Subject: [PATCH] event/Loop: read eventfd with io_uring This eliminates a roundtrip to epoll. --- src/event/Loop.cxx | 71 +++++++++++++++++++++++++++++++++++++++++----- src/event/Loop.hxx | 6 ++++ 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/src/event/Loop.cxx b/src/event/Loop.cxx index c4962a8d08..bd3cd71a34 100644 --- a/src/event/Loop.cxx +++ b/src/event/Loop.cxx @@ -17,6 +17,46 @@ #include "io/uring/Queue.hxx" #endif +#if defined(HAVE_THREADED_EVENT_LOOP) && defined(USE_EVENTFD) && defined(HAVE_URING) + +#include + +/** + * Read from the eventfd using io_uring and invoke + * EventLoop::OnWake(). + */ +class EventLoop::UringWake final : Uring::Operation { + EventLoop &event_loop; + + eventfd_t value; + +public: + explicit UringWake(EventLoop &_event_loop) noexcept + :event_loop(_event_loop) {} + + void Start() { + assert(!IsUringPending()); + assert(event_loop.GetUring()); + + auto &queue = *event_loop.GetUring(); + + auto &s = queue.RequireSubmitEntry(); + io_uring_prep_read(&s, event_loop.wake_fd.GetSocket().Get(), &value, sizeof(value), 0); + queue.Push(s, *this); + } + +private: + void OnUringCompletion(int res) noexcept override { + if (res <= 0) + return; + + Start(); + event_loop.OnWake(); + } +}; + +#endif // USE_EVENTFD && HAVE_URING + EventLoop::EventLoop( #ifdef HAVE_THREADED_EVENT_LOOP ThreadId _thread @@ -40,6 +80,9 @@ EventLoop::~EventLoop() noexcept /* if Run() was never called (maybe because startup failed and an exception is pending), we need to destruct the Uring::Manager here or else the assertions below fail */ +#if defined(HAVE_THREADED_EVENT_LOOP) && defined(USE_EVENTFD) + uring_wake.reset(); +#endif uring_poll.reset(); uring.reset(); #endif @@ -403,7 +446,15 @@ EventLoop::Run() noexcept assert(alive || quit_injected); assert(busy); - wake_event.Schedule(SocketEvent::READ); +#if defined(USE_EVENTFD) && defined(HAVE_URING) + if (uring) { + if (!uring_wake) { + uring_wake = std::make_unique(*this); + uring_wake->Start(); + } + } else +#endif + wake_event.Schedule(SocketEvent::READ); #endif #ifdef HAVE_THREADED_EVENT_LOOP @@ -537,13 +588,9 @@ EventLoop::HandleInject() noexcept } } -void -EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept +inline void +EventLoop::OnWake() noexcept { - assert(IsInside()); - - wake_fd.Read(); - if (quit_injected) { Break(); return; @@ -553,4 +600,14 @@ EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept HandleInject(); } +void +EventLoop::OnSocketReady([[maybe_unused]] unsigned flags) noexcept +{ + assert(IsInside()); + + wake_fd.Read(); + + OnWake(); +} + #endif diff --git a/src/event/Loop.hxx b/src/event/Loop.hxx index d1ee5c3b9b..476b5dd2ba 100644 --- a/src/event/Loop.hxx +++ b/src/event/Loop.hxx @@ -107,6 +107,11 @@ class EventLoop final #endif // HAVE_URING #ifdef HAVE_THREADED_EVENT_LOOP +#if defined(USE_EVENTFD) && defined(HAVE_URING) + class UringWake; + std::unique_ptr uring_wake; +#endif + /** * A reference to the thread that is currently inside Run(). */ @@ -339,6 +344,7 @@ private: void Wait(Event::Duration timeout) noexcept; #ifdef HAVE_THREADED_EVENT_LOOP + void OnWake() noexcept; void OnSocketReady(unsigned flags) noexcept; #endif