diff --git a/include/pistache/config.h b/include/pistache/config.h index 4ef164610..faaee662e 100644 --- a/include/pistache/config.h +++ b/include/pistache/config.h @@ -15,10 +15,11 @@ // Allow compile-time overload namespace Pistache::Const { - static constexpr size_t MaxBacklog = 128; - static constexpr size_t MaxEvents = 1024; - static constexpr size_t MaxBuffer = 4096; - static constexpr size_t DefaultWorkers = 1; + static constexpr size_t MaxBacklog = 128; + static constexpr size_t MaxEvents = 1024; + static constexpr size_t MaxBuffer = 4096; + static constexpr size_t DefaultAcceptors = 1; + static constexpr size_t DefaultWorkers = 1; static constexpr size_t DefaultTimerPoolSize = 128; diff --git a/include/pistache/endpoint.h b/include/pistache/endpoint.h index b63a7a49b..eaf186a1b 100644 --- a/include/pistache/endpoint.h +++ b/include/pistache/endpoint.h @@ -29,6 +29,8 @@ namespace Pistache::Http { friend class Endpoint; + Options& acceptThreads(int val); + Options& acceptThreadsName(const std::string& val); Options& threads(int val); Options& threadsName(const std::string& val); @@ -78,6 +80,10 @@ namespace Pistache::Http maxPayload(size_t val); private: + // Accept thread options + int acceptThreads_; + std::string acceptThreadName_; + // Thread options int threads_; std::string threadsName_; diff --git a/include/pistache/listener.h b/include/pistache/listener.h index c86604f03..9d099f549 100644 --- a/include/pistache/listener.h +++ b/include/pistache/listener.h @@ -62,10 +62,12 @@ namespace Pistache::Tcp explicit Listener(const Address& address); void init(size_t workers, - Flags options = Flags(Options::None), - const std::string& workersName = "", - int backlog = Const::MaxBacklog, - PISTACHE_STRING_LOGGER_T logger = PISTACHE_NULL_STRING_LOGGER); + Flags options = Flags(Options::None), + const std::string& workersName = "", + int backlog = Const::MaxBacklog, + size_t acceptors = Const::DefaultAcceptors, + const std::string& acceptorsName = "", + PISTACHE_STRING_LOGGER_T logger = PISTACHE_NULL_STRING_LOGGER); void setTransportFactory(TransportFactory factory); void setHandler(const std::shared_ptr& handler); @@ -105,6 +107,11 @@ namespace Pistache::Tcp Flags options_; std::thread acceptThread; + size_t acceptors_ = Const::DefaultAcceptors; + std::string acceptorsName_; + std::vector acceptWorkers; + std::atomic_int activeAcceptors { 0 }; + size_t workers_ = Const::DefaultWorkers; std::string workersName_; std::shared_ptr handler_; @@ -117,6 +124,7 @@ namespace Pistache::Tcp TransportFactory defaultTransportFactory() const; bool bindListener(const struct addrinfo* addr); + void acceptWorkerFn(); void handleNewConnection(); em_socket_t acceptConnection(struct sockaddr_storage& peer_addr) const; diff --git a/src/common/reactor.cc b/src/common/reactor.cc index 1645f8499..539ed6c9c 100644 --- a/src/common/reactor.cc +++ b/src/common/reactor.cc @@ -40,22 +40,22 @@ static std::atomic_bool lLoggedSetThreadDescriptionFail = false; #ifdef __MINGW32__ -#include #include // for GetProcAddress and GetModuleHandleA -typedef HRESULT (WINAPI *TSetThreadDescription)(HANDLE, PCWSTR); +#include +typedef HRESULT(WINAPI* TSetThreadDescription)(HANDLE, PCWSTR); static std::atomic_bool lSetThreadDescriptionLoaded = false; static std::mutex lSetThreadDescriptionLoadMutex; static TSetThreadDescription lSetThreadDescriptionPtr = nullptr; -TSetThreadDescription getSetThreadDescriptionPtr() +static TSetThreadDescription getSetThreadDescriptionPtr() { if (lSetThreadDescriptionLoaded) - return(lSetThreadDescriptionPtr); + return (lSetThreadDescriptionPtr); GUARD_AND_DBG_LOG(lSetThreadDescriptionLoadMutex); if (lSetThreadDescriptionLoaded) - return(lSetThreadDescriptionPtr); + return (lSetThreadDescriptionPtr); HMODULE hKernelBase = GetModuleHandleA("KernelBase.dll"); @@ -64,18 +64,15 @@ TSetThreadDescription getSetThreadDescriptionPtr() PS_LOG_WARNING( "Failed to get KernelBase.dll for SetThreadDescription"); lSetThreadDescriptionLoaded = true; - return(nullptr); + return (nullptr); } - FARPROC set_thread_desc_fpptr = - GetProcAddress(hKernelBase, "SetThreadDescription"); + FARPROC set_thread_desc_fpptr = GetProcAddress(hKernelBase, "SetThreadDescription"); // We do the cast in two steps, otherwise mingw-gcc complains about // incompatible types - void * set_thread_desc_vptr = - reinterpret_cast(set_thread_desc_fpptr); - lSetThreadDescriptionPtr = - reinterpret_cast(set_thread_desc_vptr); + void* set_thread_desc_vptr = reinterpret_cast(set_thread_desc_fpptr); + lSetThreadDescriptionPtr = reinterpret_cast(set_thread_desc_vptr); lSetThreadDescriptionLoaded = true; if (!lSetThreadDescriptionPtr) @@ -83,7 +80,7 @@ TSetThreadDescription getSetThreadDescriptionPtr() PS_LOG_WARNING( "Failed to get SetThreadDescription from KernelBase.dll"); } - return(lSetThreadDescriptionPtr); + return (lSetThreadDescriptionPtr); } #endif // of ifdef __MINGW32__ #endif // of ifdef _IS_WINDOWS @@ -299,17 +296,16 @@ namespace Pistache::Aio break; case 0: break; - default: - { - if (shutdown_) - return; + default: { + if (shutdown_) + return; - GUARD_AND_DBG_LOG(shutdown_mutex_); - if (shutdown_) - return; + GUARD_AND_DBG_LOG(shutdown_mutex_); + if (shutdown_) + return; - handleFds(std::move(events)); - } + handleFds(std::move(events)); + } } } } @@ -454,11 +450,8 @@ namespace Pistache::Aio // encode the index of the handler is that in the fast path, we // won't need to shift the value to retrieve the fd if there is // only one handler as all the bits will already be set to 0. - auto encodedValue = - (index << HandlerShift) | - PS_FD_CAST_TO_UNUM(uint64_t, static_cast(value)); - Polling::TagValue encodedValueTV = - static_cast(PS_NUM_CAST_TO_FD(encodedValue)); + auto encodedValue = (index << HandlerShift) | PS_FD_CAST_TO_UNUM(uint64_t, static_cast(value)); + Polling::TagValue encodedValueTV = static_cast(PS_NUM_CAST_TO_FD(encodedValue)); return Polling::Tag(encodedValueTV); } @@ -468,8 +461,7 @@ namespace Pistache::Aio auto value = tag.valueU64(); size_t index = value >> HandlerShift; uint64_t maskedValue = value & DataMask; - Polling::TagValue maskedValueTV = - static_cast(PS_NUM_CAST_TO_FD(maskedValue)); + Polling::TagValue maskedValueTV = static_cast(PS_NUM_CAST_TO_FD(maskedValue)); return std::make_pair(index, maskedValueTV); } @@ -720,13 +712,12 @@ namespace Pistache::Aio #ifdef _IS_WINDOWS const std::string threads_name(threadsName_.substr(0, 15)); const std::wstring temp(threads_name.begin(), - threads_name.end()); + threads_name.end()); const LPCWSTR wide_threads_name = temp.c_str(); HRESULT hr = E_NOTIMPL; #ifdef __MINGW32__ - TSetThreadDescription set_thread_description_ptr = - getSetThreadDescriptionPtr(); + TSetThreadDescription set_thread_description_ptr = getSetThreadDescriptionPtr(); if (set_thread_description_ptr) { hr = set_thread_description_ptr( diff --git a/src/server/endpoint.cc b/src/server/endpoint.cc index 7510e25ab..a28279400 100644 --- a/src/server/endpoint.cc +++ b/src/server/endpoint.cc @@ -251,7 +251,8 @@ namespace Pistache::Http } Endpoint::Options::Options() - : threads_(1) + : acceptThreads_(1) + , threads_(1) , flags_() , backlog_(Const::MaxBacklog) , maxRequestSize_(Const::DefaultMaxRequestSize) @@ -264,6 +265,18 @@ namespace Pistache::Http , sslHandshakeTimeout_(Const::DefaultSSLHandshakeTimeout) { } + Endpoint::Options& Endpoint::Options::acceptThreads(int val) + { + acceptThreads_ = val; + return *this; + } + + Endpoint::Options& Endpoint::Options::acceptThreadsName(const std::string& val) + { + acceptThreadName_ = val; + return *this; + } + Endpoint::Options& Endpoint::Options::threads(int val) { threads_ = val; @@ -319,7 +332,9 @@ namespace Pistache::Http void Endpoint::init(const Endpoint::Options& options) { - listener.init(options.threads_, options.flags_, options.threadsName_, options.backlog_); + listener.init(options.threads_, options.flags_, options.threadsName_, options.backlog_, + options.acceptThreads_, options.acceptThreadName_); + listener.setTransportFactory([this, options] { if (!handler_) throw std::runtime_error("Must call setHandler()"); diff --git a/src/server/listener.cc b/src/server/listener.cc index f38d11b44..3e6f1add3 100644 --- a/src/server/listener.cc +++ b/src/server/listener.cc @@ -59,6 +59,68 @@ #endif /* PISTACHE_USE_SSL */ +#ifdef _IS_BSD +// For pthread_set_name_np +#include PST_THREAD_HDR +#ifndef __NetBSD__ +#include +#endif +#endif + +#ifdef _IS_WINDOWS +#include // Needed for PST_THREAD_HDR (processthreadsapi.h) +#include PST_THREAD_HDR // for SetThreadDescription +#endif + +#ifdef _IS_WINDOWS +static std::atomic_bool lLoggedSetThreadDescriptionFail = false; +#ifdef __MINGW32__ + +#include // for GetProcAddress and GetModuleHandleA +#include +typedef HRESULT(WINAPI* TSetThreadDescription)(HANDLE, PCWSTR); + +static std::atomic_bool lSetThreadDescriptionLoaded = false; +static std::mutex lSetThreadDescriptionLoadMutex; +static TSetThreadDescription lSetThreadDescriptionPtr = nullptr; + +static TSetThreadDescription getSetThreadDescriptionPtr() +{ + if (lSetThreadDescriptionLoaded) + return (lSetThreadDescriptionPtr); + + GUARD_AND_DBG_LOG(lSetThreadDescriptionLoadMutex); + if (lSetThreadDescriptionLoaded) + return (lSetThreadDescriptionPtr); + + HMODULE hKernelBase = GetModuleHandleA("KernelBase.dll"); + + if (!hKernelBase) + { + PS_LOG_WARNING( + "Failed to get KernelBase.dll for SetThreadDescription"); + lSetThreadDescriptionLoaded = true; + return (nullptr); + } + + FARPROC set_thread_desc_fpptr = GetProcAddress(hKernelBase, "SetThreadDescription"); + + // We do the cast in two steps, otherwise mingw-gcc complains about + // incompatible types + void* set_thread_desc_vptr = reinterpret_cast(set_thread_desc_fpptr); + lSetThreadDescriptionPtr = reinterpret_cast(set_thread_desc_vptr); + + lSetThreadDescriptionLoaded = true; + if (!lSetThreadDescriptionPtr) + { + PS_LOG_WARNING( + "Failed to get SetThreadDescription from KernelBase.dll"); + } + return (lSetThreadDescriptionPtr); +} +#endif // of ifdef __MINGW32__ +#endif // of ifdef _IS_WINDOWS + using namespace std::chrono_literals; namespace Pistache::Tcp @@ -306,6 +368,7 @@ namespace Pistache::Tcp void Listener::init(size_t workers, Flags options, const std::string& workersName, int backlog, + size_t acceptors, const std::string& acceptorsName, PISTACHE_STRING_LOGGER_T logger) { if (workers > hardware_concurrency()) @@ -313,12 +376,14 @@ namespace Pistache::Tcp // Log::warning() << "More workers than available cores" } - options_ = options; - backlog_ = backlog; - useSSL_ = false; - workers_ = workers; - workersName_ = workersName; - logger_ = logger; + options_ = options; + backlog_ = backlog; + useSSL_ = false; + workers_ = workers; + workersName_ = workersName; + acceptors_ = acceptors; + acceptorsName_ = acceptorsName; + logger_ = logger; } void Listener::setTransportFactory(TransportFactory factory) @@ -555,14 +620,102 @@ namespace Pistache::Tcp shutdownFd.bind(poller); reactor_->run(); + PS_LOG_DEBUG("shutdownFd.bind done"); + + for (size_t i = 0; i < acceptors_; i++) + { + acceptWorkers.emplace_back([this]() { + PS_TIMEDBG_START; + + if (!acceptorsName_.empty()) + { + PS_LOG_DEBUG("Setting thread name/description"); +#ifdef _IS_WINDOWS + const std::string threads_name(acceptorsName_.substr(0, 15)); + const std::wstring temp(threads_name.begin(), + threads_name.end()); + const LPCWSTR wide_threads_name = temp.c_str(); + + HRESULT hr = E_NOTIMPL; +#ifdef __MINGW32__ + TSetThreadDescription set_thread_description_ptr = getSetThreadDescriptionPtr(); + if (set_thread_description_ptr) + { + hr = set_thread_description_ptr( + GetCurrentThread(), wide_threads_name); + } +#else + hr = SetThreadDescription(GetCurrentThread(), + wide_threads_name); +#endif + if ((FAILED(hr)) && (!lLoggedSetThreadDescriptionFail)) + { + lLoggedSetThreadDescriptionFail = true; + // Log it just once + PS_LOG_INFO("SetThreadDescription failed"); + } +#else +#if defined _IS_BSD && !defined __NetBSD__ + pthread_set_name_np( +#else + pthread_setname_np( +#endif +#ifndef __APPLE__ + // Apple's macOS version of pthread_setname_np + // takes only "const char * name" as parm + // (Nov/2023), and assumes that the thread is the + // calling thread. Note that pthread_self returns + // calling thread in Linux, so this amounts to + // the same thing in the end + // It appears older FreeBSD (2003 ?) also behaves + // as per macOS, while newer FreeBSD (2021 ?) + // behaves as per Linux + pthread_self(), +#endif +#ifdef __NetBSD__ + "%s", // NetBSD has 3 parms for pthread_setname_np + (void*)/*cast away const for NetBSD*/ +#endif + acceptorsName_.substr(0, 15) + .c_str()); +#endif // of ifdef _IS_WINDOWS... else... + } + PS_LOG_DEBUG("Calling this->run()"); + this->acceptWorkerFn(); + }); + ++activeAcceptors; + } + + for (auto& t : acceptWorkers) + t.join(); + } + + void Listener::runThreaded() + { + PS_TIMEDBG_START; + + shutdownFd.bind(poller); + PS_LOG_DEBUG("shutdownFd.bind done"); + + acceptThread = std::thread([this]() { + PS_TIMEDBG_START; + this->run(); + }); + } + + void Listener::acceptWorkerFn() + { for (;;) { - { // encapsulate l_guard(poller.reg_unreg_mutex_) - // See comment in class Epoll regarding reg_unreg_mutex_ + { PS_TIMEDBG_START; - std::mutex& poller_reg_unreg_mutex(poller.reg_unreg_mutex_); - GUARD_AND_DBG_LOG(poller_reg_unreg_mutex); + // poller only has 2 fds added/removed in its life time: + // 1. The listening socket + // 2. The shutdown fd + // There won't be any case a fd being processed is removed + // from poller, we don't need this lock + // std::mutex& poller_reg_unreg_mutex(poller.reg_unreg_mutex_); std::vector events; int ready_fds = poller.poll(events); @@ -572,10 +725,14 @@ namespace Pistache::Tcp PS_LOG_DEBUG("Polling failed"); throw Error::system("Polling"); } + for (const auto& event : events) { if (event.tag == shutdownFd.tag()) + { + --activeAcceptors; return; + } if (event.flags.hasFlag(Polling::NotifyOn::Read)) { @@ -596,6 +753,7 @@ namespace Pistache::Tcp PS_LOG_WARNING("Server error"); PISTACHE_LOG_STRING_FATAL( logger_, "Server error: " << ex.what()); + --activeAcceptors; throw; } } @@ -605,26 +763,21 @@ namespace Pistache::Tcp } } - void Listener::runThreaded() - { - PS_TIMEDBG_START; - - shutdownFd.bind(poller); - PS_LOG_DEBUG("shutdownFd.bind done"); - - acceptThread = std::thread([this]() { - PS_TIMEDBG_START; - this->run(); - }); - } - void Listener::shutdown() { if (shutdownFd.isBound()) { PS_TIMEDBG_START_CURLY; - shutdownFd.notify(); + while (activeAcceptors) + { + for (size_t i = 0; i < activeAcceptors; i++) + { + shutdownFd.notify(); + std::this_thread::yield(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } if (reactor_) @@ -731,9 +884,8 @@ namespace Pistache::Tcp #ifdef _IS_WINDOWS - unsigned long int timeout_in_ms = - static_cast( - std::chrono::duration_cast< + unsigned long int timeout_in_ms = static_cast( + std::chrono::duration_cast< std::chrono::milliseconds>(sslHandshakeTimeout_) .count()); diff --git a/version.txt b/version.txt index 7f67c6c45..6a9f81f6b 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.5.6.20250328 +0.6.0.20250508