From 799a2f1f7c136a04d2bd9687f699b2fc504506ec Mon Sep 17 00:00:00 2001 From: craigminihan Date: Thu, 1 Oct 2015 19:29:54 +0100 Subject: [PATCH 1/8] Added thread pool thread start/stop handlers --- tests/thread_pool.t.cpp | 28 ++++++++++++++++++++++++++++ thread_pool/thread_pool.hpp | 14 +++++++++++++- thread_pool/worker.hpp | 25 ++++++++++++++++++++----- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/tests/thread_pool.t.cpp b/tests/thread_pool.t.cpp index 518808b4..0a2c083d 100644 --- a/tests/thread_pool.t.cpp +++ b/tests/thread_pool.t.cpp @@ -4,6 +4,7 @@ #include #include #include +#include int main() { std::cout << "*** Testing ThreadPool ***" << std::endl; @@ -50,5 +51,32 @@ int main() { } catch (const my_exception &e) { } }); + + doTest("post job to threadpool with onStart/onStop", []() { + std::atomic someValue{0}; + ThreadPoolOptions options; + options.onStart = [&someValue](){ ++someValue; }; + options.onStop = [&someValue](){ --someValue; }; + + if (true) { + ThreadPool pool{options}; + + std::packaged_task t([&someValue](){ + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + return someValue.load(); + }); + + std::future r = t.get_future(); + + pool.post(t); + + const auto result = r.get(); + + ASSERT(0 < result); + ASSERT(pool.getWorkerCount() == result); + } + + ASSERT(0 == someValue); + }); } diff --git a/thread_pool/thread_pool.hpp b/thread_pool/thread_pool.hpp index 2abcf070..baf72d71 100644 --- a/thread_pool/thread_pool.hpp +++ b/thread_pool/thread_pool.hpp @@ -16,6 +16,8 @@ struct ThreadPoolOptions { enum {AUTODETECT = 0}; size_t threads_count = AUTODETECT; size_t worker_queue_size = 1024; + Worker::OnStart onStart; + Worker::OnStop onStop; }; /** @@ -58,6 +60,12 @@ class ThreadPool { */ template ::type> typename std::future process(Handler &&handler); + + /** + * @brief getWorkerCount Returns the number of workers created by the thread pool + * @return Worker ID. + */ + size_t getWorkerCount() const; private: ThreadPool(const ThreadPool&) = delete; @@ -92,7 +100,7 @@ inline ThreadPool::ThreadPool(const ThreadPoolOptions &options) for (size_t i = 0; i < m_workers.size(); ++i) { Worker *steal_donor = m_workers[(i + 1) % m_workers.size()].get(); - m_workers[i]->start(i, steal_donor); + m_workers[i]->start(i, steal_donor, options.onStart, options.onStop); } } @@ -141,5 +149,9 @@ inline Worker & ThreadPool::getWorker() return *m_workers[id]; } +inline size_t ThreadPool::getWorkerCount() const { + return m_workers.size(); +} + #endif diff --git a/thread_pool/worker.hpp b/thread_pool/worker.hpp index 356afe6f..eae5850b 100644 --- a/thread_pool/worker.hpp +++ b/thread_pool/worker.hpp @@ -15,6 +15,9 @@ class Worker { public: typedef FixedFunction Task; + + using OnStart = std::function; + using OnStop = std::function; /** * @brief Worker Constructor. @@ -26,8 +29,10 @@ class Worker { * @brief start Create the executing thread and start tasks execution. * @param id Worker ID. * @param steal_donor Sibling worker to steal task from it. + * @param onStart A handler which is executed when each thread pool thread starts + * @param onStop A handler which is executed when each thread pool thread stops */ - void start(size_t id, Worker *steal_donor); + void start(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop); /** * @brief stop Stop all worker's thread and stealing activity. @@ -64,8 +69,10 @@ class Worker { * @brief threadFunc Executing thread function. * @param id Worker ID to be associated with this thread. * @param steal_donor Sibling worker to steal task from it. + * @param onStart A handler which is executed when each thread pool thread starts + * @param onStop A handler which is executed when each thread pool thread stops */ - void threadFunc(size_t id, Worker *steal_donor); + void threadFunc(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop); MPMCBoundedQueue m_queue; std::atomic m_running_flag; @@ -87,9 +94,9 @@ inline void Worker::stop() m_thread.join(); } -inline void Worker::start(size_t id, Worker *steal_donor) +inline void Worker::start(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop) { - m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor); + m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor, onStart, onStop); } inline static size_t * thread_id() @@ -114,9 +121,13 @@ inline bool Worker::steal(Task &task) return m_queue.pop(task); } -inline void Worker::threadFunc(size_t id, Worker *steal_donor) +inline void Worker::threadFunc(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop) { *thread_id() = id; + + if (onStart) { + try { onStart(); } catch (...) {} + } Task handler; @@ -126,6 +137,10 @@ inline void Worker::threadFunc(size_t id, Worker *steal_donor) } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } + + if (onStop) { + try { onStop(); } catch (...) {} + } } #endif From 51973df2c12c08d72b1445ce83860caae44bc8d6 Mon Sep 17 00:00:00 2001 From: craigminihan Date: Thu, 1 Oct 2015 22:42:42 +0100 Subject: [PATCH 2/8] Initialize the thread count with the number of cores rather than zero * also removed the zero/core logic from the constructor --- thread_pool/thread_pool.hpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/thread_pool/thread_pool.hpp b/thread_pool/thread_pool.hpp index baf72d71..ca6fd617 100644 --- a/thread_pool/thread_pool.hpp +++ b/thread_pool/thread_pool.hpp @@ -13,8 +13,7 @@ * @brief The ThreadPoolOptions struct provides construction options for ThreadPool. */ struct ThreadPoolOptions { - enum {AUTODETECT = 0}; - size_t threads_count = AUTODETECT; + size_t threads_count{std::thread::hardware_concurrency()}; size_t worker_queue_size = 1024; Worker::OnStart onStart; Worker::OnStop onStop; @@ -85,10 +84,6 @@ inline ThreadPool::ThreadPool(const ThreadPoolOptions &options) { size_t workers_count = options.threads_count; - if (ThreadPoolOptions::AUTODETECT == options.threads_count) { - workers_count = std::thread::hardware_concurrency(); - } - if (0 == workers_count) { workers_count = 1; } From 4b45449733100288957ed304e42808f2ab0a4f37 Mon Sep 17 00:00:00 2001 From: craigminihan Date: Mon, 5 Oct 2015 15:40:34 +0100 Subject: [PATCH 3/8] thread-pool-cpp now better handles multiple compilation units --- thread_pool/worker.cpp | 7 +++++++ thread_pool/worker.hpp | 20 ++++++++++---------- 2 files changed, 17 insertions(+), 10 deletions(-) create mode 100644 thread_pool/worker.cpp diff --git a/thread_pool/worker.cpp b/thread_pool/worker.cpp new file mode 100644 index 00000000..6724ef0d --- /dev/null +++ b/thread_pool/worker.cpp @@ -0,0 +1,7 @@ +#include + +size_t * Worker::thread_id() +{ + static thread_local size_t tss_id = -1u; + return &tss_id; +} diff --git a/thread_pool/worker.hpp b/thread_pool/worker.hpp index eae5850b..e46f0d77 100644 --- a/thread_pool/worker.hpp +++ b/thread_pool/worker.hpp @@ -16,8 +16,8 @@ class Worker { public: typedef FixedFunction Task; - using OnStart = std::function; - using OnStop = std::function; + using OnStart = std::function; + using OnStop = std::function; /** * @brief Worker Constructor. @@ -64,6 +64,12 @@ class Worker { private: Worker(const Worker&) = delete; Worker & operator=(const Worker&) = delete; + + /** + * @brief thread_id Return worker ID associated with current thread if exists. + * @return Worker ID. + */ + static size_t * thread_id(); /** * @brief threadFunc Executing thread function. @@ -99,12 +105,6 @@ inline void Worker::start(size_t id, Worker *steal_donor, OnStart onStart, OnSto m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor, onStart, onStop); } -inline static size_t * thread_id() -{ - static thread_local size_t tss_id = -1u; - return &tss_id; -} - inline size_t Worker::getWorkerIdForCurrentThread() { return *thread_id(); @@ -126,7 +126,7 @@ inline void Worker::threadFunc(size_t id, Worker *steal_donor, OnStart onStart, *thread_id() = id; if (onStart) { - try { onStart(); } catch (...) {} + try { onStart(id); } catch (...) {} } Task handler; @@ -139,7 +139,7 @@ inline void Worker::threadFunc(size_t id, Worker *steal_donor, OnStart onStart, } if (onStop) { - try { onStop(); } catch (...) {} + try { onStop(id); } catch (...) {} } } From feb4c6731ef5473bd68c14189c95ddcf2b50d36b Mon Sep 17 00:00:00 2001 From: craigminihan Date: Tue, 3 Nov 2015 12:49:31 +0000 Subject: [PATCH 4/8] Increased the size of the worker closure buffer --- thread_pool/worker.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thread_pool/worker.hpp b/thread_pool/worker.hpp index e46f0d77..f4f17397 100644 --- a/thread_pool/worker.hpp +++ b/thread_pool/worker.hpp @@ -14,7 +14,7 @@ */ class Worker { public: - typedef FixedFunction Task; + typedef FixedFunction Task; using OnStart = std::function; using OnStop = std::function; From a92ee6f0782ef673fd9e371b2fe0614a663afadf Mon Sep 17 00:00:00 2001 From: David Hirvonen Date: Wed, 3 Feb 2016 09:08:21 -0500 Subject: [PATCH 5/8] Added conditionally compiled ATTRIBUTES_TLS alternative to thread_local Initiially for XCode build, but seems to be provide portability: http://stackoverflow.com/a/25393790/5724090 --- thread_pool/worker.hpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/thread_pool/worker.hpp b/thread_pool/worker.hpp index 3d51da5c..270bc23b 100644 --- a/thread_pool/worker.hpp +++ b/thread_pool/worker.hpp @@ -1,6 +1,16 @@ #ifndef WORKER_HPP #define WORKER_HPP +// http://stackoverflow.com/a/25393790/5724090 +// Static/global variable exists in a per-thread context (thread local storage). +#if defined (__GNUC__) + #define ATTRIBUTE_TLS __thread +#elif defined (_MSC_VER) + #define ATTRIBUTE_TLS __declspec(thread) +#else // !__GNUC__ && !_MSC_VER + #error "Define a thread local storage qualifier for your compiler/platform!" +#endif + #include #include #include @@ -78,7 +88,7 @@ class Worker { namespace detail { inline size_t * thread_id() { - static thread_local size_t tss_id = -1u; + static ATTRIBUTE_TLS size_t tss_id = -1u; return &tss_id; } } From ac13343b7b28d5daef6942492277362f64f1cb09 Mon Sep 17 00:00:00 2001 From: David Hirvonen Date: Wed, 3 Feb 2016 09:23:47 -0500 Subject: [PATCH 6/8] Add conditional static_assert() This seems to be a self imposed design constraint as optimization, and it doesn't allow the sample applications to build. Making this optional for now via `#ifdef THREAD_POOL_FIXED_FUNCTION_SIZE_CONSTRAINT`. Perhaps this can be configured by the user's code. --- thread_pool/fixed_function.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/thread_pool/fixed_function.hpp b/thread_pool/fixed_function.hpp index 39d80997..8a1dac52 100644 --- a/thread_pool/fixed_function.hpp +++ b/thread_pool/fixed_function.hpp @@ -40,8 +40,11 @@ class FixedFunction { { typedef typename std::remove_reference::type unref_type; +#if THREAD_POOL_FIXED_FUNCTION_SIZE_CONSTRAINT static_assert(sizeof(unref_type) < STORAGE_SIZE, "functional object doesn't fit into internal storage"); +#endif + static_assert(std::is_move_constructible::value, "Should be of movable type"); m_method_ptr = [](void *object_ptr, func_ptr_type, ARGS... args) -> R { From 5688457cfe10a52cf8ae5831add8db1ea1c55dc1 Mon Sep 17 00:00:00 2001 From: David Hirvonen Date: Wed, 3 Feb 2016 10:46:26 -0500 Subject: [PATCH 7/8] remove conditional THREAD_POOL_FIXED_FUNCTION_SIZE_CONSTRAINT See discussion: https://github.com/headupinclouds/thread-pool-cpp/pull/3 --- thread_pool/fixed_function.hpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/thread_pool/fixed_function.hpp b/thread_pool/fixed_function.hpp index 8a1dac52..750edf0e 100644 --- a/thread_pool/fixed_function.hpp +++ b/thread_pool/fixed_function.hpp @@ -40,10 +40,8 @@ class FixedFunction { { typedef typename std::remove_reference::type unref_type; -#if THREAD_POOL_FIXED_FUNCTION_SIZE_CONSTRAINT static_assert(sizeof(unref_type) < STORAGE_SIZE, "functional object doesn't fit into internal storage"); -#endif static_assert(std::is_move_constructible::value, "Should be of movable type"); From 01a9a5d761ea5174df0e583e91459a589e59e97b Mon Sep 17 00:00:00 2001 From: David Hirvonen Date: Wed, 3 Feb 2016 10:47:31 -0500 Subject: [PATCH 8/8] Added top level STORAGE_SIZE template parameter to support ThreadPool::Worker configuration without requiring user access to thread-pool-cpp --- thread_pool/thread_pool.hpp | 81 ++++++++++++++++++------------------- thread_pool/worker.hpp | 81 ++++++++++++++++++++----------------- 2 files changed, 83 insertions(+), 79 deletions(-) diff --git a/thread_pool/thread_pool.hpp b/thread_pool/thread_pool.hpp index c9f16e0d..d379a232 100644 --- a/thread_pool/thread_pool.hpp +++ b/thread_pool/thread_pool.hpp @@ -13,10 +13,11 @@ * @brief The ThreadPoolOptions struct provides construction options for ThreadPool. */ struct ThreadPoolOptions { + size_t threads_count{std::thread::hardware_concurrency()}; size_t worker_queue_size = 1024; - Worker::OnStart onStart; - Worker::OnStop onStop; + Worker<>::OnStart onStart; + Worker<>::OnStop onStop; }; /** @@ -26,8 +27,13 @@ struct ThreadPoolOptions { * It implements both work-stealing and work-distribution balancing startegies. * It implements cooperative scheduling strategy for tasks. */ + +template class ThreadPool { public: + + typedef Worker FixedWorker; + /** * @brief ThreadPool Construct and start new thread pool. * @param options Creation options. @@ -47,8 +53,13 @@ class ThreadPool { * execution or exception thrown. */ template - void post(Handler &&handler); - + void post(Handler &&handler) + { + if (!getWorker().post(std::forward(handler))) { + throw std::overflow_error("worker queue is full"); + } + } + /** * @brief process Post piece of job to thread pool and get future for this job. * @param handler Handler to be called from thread pool worker. It has to be callable as 'handler()'. @@ -58,7 +69,20 @@ class ThreadPool { * std::packaged_task construction overhead. */ template ::type> - typename std::future process(Handler &&handler); + typename std::future process(Handler &&handler) + { + std::packaged_task task([handler = std::move(handler)] () { + return handler(); + }); + + std::future result = task.get_future(); + + if (!getWorker().post(task)) { + throw std::overflow_error("worker queue is full"); + } + + return result; + } /** * @brief getWorkerCount Returns the number of workers created by the thread pool @@ -70,16 +94,17 @@ class ThreadPool { ThreadPool(const ThreadPool&) = delete; ThreadPool & operator=(const ThreadPool&) = delete; - Worker & getWorker(); + FixedWorker & getWorker(); - std::vector> m_workers; + std::vector> m_workers; std::atomic m_next_worker; }; /// Implementation -inline ThreadPool::ThreadPool(const ThreadPoolOptions &options) +template +inline ThreadPool::ThreadPool(const ThreadPoolOptions &options) : m_next_worker(0) { size_t workers_count = options.threads_count; @@ -90,50 +115,27 @@ inline ThreadPool::ThreadPool(const ThreadPoolOptions &options) m_workers.resize(workers_count); for (auto &worker_ptr : m_workers) { - worker_ptr.reset(new Worker(options.worker_queue_size)); + worker_ptr.reset(new FixedWorker(options.worker_queue_size)); } for (size_t i = 0; i < m_workers.size(); ++i) { - Worker *steal_donor = m_workers[(i + 1) % m_workers.size()].get(); + FixedWorker *steal_donor = m_workers[(i + 1) % m_workers.size()].get(); m_workers[i]->start(i, steal_donor, options.onStart, options.onStop); } } -inline ThreadPool::~ThreadPool() +template +inline ThreadPool::~ThreadPool() { for (auto &worker_ptr : m_workers) { worker_ptr->stop(); } } -template -inline void ThreadPool::post(Handler &&handler) +template +inline typename ThreadPool::FixedWorker& ThreadPool::getWorker() { - if (!getWorker().post(std::forward(handler))) { - throw std::overflow_error("worker queue is full"); - } -} - -template -typename std::future ThreadPool::process(Handler &&handler) -{ - std::packaged_task task([handler = std::move(handler)] () { - return handler(); - }); - - std::future result = task.get_future(); - - if (!getWorker().post(task)) { - throw std::overflow_error("worker queue is full"); - } - - return result; -} - - -inline Worker & ThreadPool::getWorker() -{ - size_t id = Worker::getWorkerIdForCurrentThread(); + size_t id = FixedWorker::getWorkerIdForCurrentThread(); if (id > m_workers.size()) { id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_workers.size(); @@ -142,9 +144,6 @@ inline Worker & ThreadPool::getWorker() return *m_workers[id]; } -inline size_t ThreadPool::getWorkerCount() const { - return m_workers.size(); -} #endif diff --git a/thread_pool/worker.hpp b/thread_pool/worker.hpp index c4b5b234..d0a6f7fc 100644 --- a/thread_pool/worker.hpp +++ b/thread_pool/worker.hpp @@ -22,12 +22,14 @@ * then it tries to steal task from the sibling worker. If stealing was unsuccessful * then spins with one millisecond delay. */ + +template class Worker { public: - typedef FixedFunction Task; - - using OnStart = std::function; - using OnStop = std::function; + typedef FixedFunction Task; + + using OnStart = std::function; + using OnStop = std::function; /** * @brief Worker Constructor. @@ -39,10 +41,10 @@ class Worker { * @brief start Create the executing thread and start tasks execution. * @param id Worker ID. * @param steal_donor Sibling worker to steal task from it. - * @param onStart A handler which is executed when each thread pool thread starts - * @param onStop A handler which is executed when each thread pool thread stops + * @param onStart A handler which is executed when each thread pool thread starts + * @param onStop A handler which is executed when each thread pool thread stops */ - void start(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop); + void start(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop); /** * @brief stop Stop all worker's thread and stealing activity. @@ -56,7 +58,10 @@ class Worker { * @return true on success. */ template - bool post(Handler &&handler); + bool post(Handler &&handler) + { + return m_queue.push(std::forward(handler)); + } /** * @brief steal Steal one task from this worker queue. @@ -74,21 +79,21 @@ class Worker { private: Worker(const Worker&) = delete; Worker & operator=(const Worker&) = delete; - - /** - * @brief thread_id Return worker ID associated with current thread if exists. - * @return Worker ID. - */ - static size_t * thread_id(); + + /** + * @brief thread_id Return worker ID associated with current thread if exists. + * @return Worker ID. + */ + static size_t * thread_id(); /** * @brief threadFunc Executing thread function. * @param id Worker ID to be associated with this thread. * @param steal_donor Sibling worker to steal task from it. - * @param onStart A handler which is executed when each thread pool thread starts - * @param onStop A handler which is executed when each thread pool thread stops + * @param onStart A handler which is executed when each thread pool thread starts + * @param onStop A handler which is executed when each thread pool thread stops */ - void threadFunc(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop); + void threadFunc(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop); MPMCBoundedQueue m_queue; std::atomic m_running_flag; @@ -106,46 +111,46 @@ namespace detail { } } -inline Worker::Worker(size_t queue_size) +template +inline Worker::Worker(size_t queue_size) : m_queue(queue_size) , m_running_flag(true) { } -inline void Worker::stop() +template +inline void Worker::stop() { m_running_flag.store(false, std::memory_order_relaxed); m_thread.join(); } -inline void Worker::start(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop) +template +inline void Worker::start(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop) { - m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor, onStart, onStop); + m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor, onStart, onStop); } -inline size_t Worker::getWorkerIdForCurrentThread() +template +inline size_t Worker::getWorkerIdForCurrentThread() { return *detail::thread_id(); } -template -inline bool Worker::post(Handler &&handler) -{ - return m_queue.push(std::forward(handler)); -} - -inline bool Worker::steal(Task &task) +template +inline bool Worker::steal(Task &task) { return m_queue.pop(task); } -inline void Worker::threadFunc(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop) +template +inline void Worker::threadFunc(size_t id, Worker *steal_donor, OnStart onStart, OnStop onStop) { *detail::thread_id() = id; - - if (onStart) { - try { onStart(id); } catch (...) {} - } + + if (onStart) { + try { onStart(id); } catch (...) {} + } Task handler; @@ -154,10 +159,10 @@ inline void Worker::threadFunc(size_t id, Worker *steal_donor, OnStart onStart, try {handler();} catch (...) {} } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - - if (onStop) { - try { onStop(id); } catch (...) {} + } + + if (onStop) { + try { onStop(id); } catch (...) {} } }