diff --git a/tests/thread_pool.t.cpp b/tests/thread_pool.t.cpp index 731001f5..e94aa6df 100644 --- a/tests/thread_pool.t.cpp +++ b/tests/thread_pool.t.cpp @@ -1,11 +1,10 @@ #include #include +#include #include #include -#include -#include -#include +#include using namespace tp; @@ -31,4 +30,38 @@ int main() ASSERT(42 == r.get()); }); + + try { + ASSERT(r.get() == 42 && !"should not be called, exception expected"); + } catch (const my_exception &e) { + } + }); + + doTest("post job to threadpool with onStart/onStop", []() { + std::atomic someValue{0}; + ThreadPoolOptions options; + options.onStart = [&someValue](){ ++someValue; }; + options.onStop = [&someValue](){ --someValue; }; + + if (true) { + ThreadPool pool{options}; + + std::packaged_task t([&someValue](){ + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + return someValue.load(); + }); + + std::future r = t.get_future(); + + pool.post(t); + + const auto result = r.get(); + + ASSERT(0 < result); + ASSERT(pool.getWorkerCount() == result); + } + + ASSERT(0 == someValue); + }); + } diff --git a/thread_pool/fixed_function.hpp b/thread_pool/fixed_function.hpp new file mode 100644 index 00000000..750edf0e --- /dev/null +++ b/thread_pool/fixed_function.hpp @@ -0,0 +1,146 @@ +#ifndef FIXED_FUNCTION_HPP +#define FIXED_FUNCTION_HPP + +#include +#include +#include +#include + +/** + * @brief The FixedFunction class implements functional object. + * This function is analog of 'std::function' with limited capabilities: + * - It supports only move semantics. + * - The size of functional objects is limited to storage size. + * Due to limitations above it is much faster on creation and copying than std::function. + */ +template +class FixedFunction; + +template +class FixedFunction { + + typedef R (*func_ptr_type)(ARGS...); + +public: + FixedFunction() + : m_function_ptr(nullptr) + , m_method_ptr(nullptr) + , m_alloc_ptr(nullptr) + { + } + + /** + * @brief FixedFunction Constructor from functional object. + * @param object Functor object will be stored in the internal storage + * using move constructor. Unmovable objects are prohibited explicitly. + */ + template + FixedFunction(FUNC &&object) + : FixedFunction() + { + typedef typename std::remove_reference::type unref_type; + + static_assert(sizeof(unref_type) < STORAGE_SIZE, + "functional object doesn't fit into internal storage"); + + static_assert(std::is_move_constructible::value, "Should be of movable type"); + + m_method_ptr = [](void *object_ptr, func_ptr_type, ARGS... args) -> R { + return static_cast(object_ptr)->operator()(args...); + }; + + m_alloc_ptr = [](void *storage_ptr, void *object_ptr) { + if (object_ptr) { + unref_type *object = static_cast(object_ptr); + new (storage_ptr) unref_type(std::move(*object)); + } else { + static_cast(storage_ptr)->~unref_type(); + } + }; + + m_alloc_ptr(&m_storage, &object); + } + + /** + * @brief FixedFunction Constructor from free function or static member. + */ + template + FixedFunction(RET(*func_ptr)(PARAMS...)) + : FixedFunction() + { + m_function_ptr = func_ptr; + m_method_ptr = [](void *, func_ptr_type f_ptr, ARGS... args) -> R { + return static_cast(f_ptr)(args...); + }; + } + + FixedFunction(FixedFunction &&o) + : FixedFunction() + { + moveFromOther(o); + } + + FixedFunction & operator=(FixedFunction &&o) + { + moveFromOther(o); + return *this; + } + + ~FixedFunction() + { + if (m_alloc_ptr) + m_alloc_ptr(&m_storage, nullptr); + } + + /** + * @brief operator () Execute stored functional object. + * @throws std::runtime_error if no functional object is stored. + */ + R operator()(ARGS... args) + { + if (!m_method_ptr) + throw std::runtime_error("call of empty functor"); + return m_method_ptr(&m_storage, m_function_ptr, args...); + } + +private: + FixedFunction & operator=(const FixedFunction &) = delete; + FixedFunction(const FixedFunction &) = delete; + + union { + typename std::aligned_storage::type m_storage; + func_ptr_type m_function_ptr; + }; + + typedef R(*method_type)(void *object_ptr, func_ptr_type free_func_ptr, ARGS... args); + method_type m_method_ptr; + + typedef void(*alloc_type)(void *storage_ptr, void *object_ptr); + alloc_type m_alloc_ptr; + + void moveFromOther(FixedFunction &o) + { + if (this == &o) + return; + + if (m_alloc_ptr) { + m_alloc_ptr(&m_storage, nullptr); + m_alloc_ptr = nullptr; + } else { + m_function_ptr = nullptr; + } + + m_method_ptr = o.m_method_ptr; + o.m_method_ptr = nullptr; + + if (o.m_alloc_ptr) { + m_alloc_ptr = o.m_alloc_ptr; + m_alloc_ptr(&m_storage, &o.m_storage); + } else { + m_function_ptr = o.m_function_ptr; + } + } +}; + + +#endif diff --git a/thread_pool/thread_pool.hpp b/thread_pool/thread_pool.hpp new file mode 100644 index 00000000..d379a232 --- /dev/null +++ b/thread_pool/thread_pool.hpp @@ -0,0 +1,149 @@ +#ifndef THREAD_POOL_HPP +#define THREAD_POOL_HPP + +#include +#include +#include +#include +#include +#include +#include + +/** + * @brief The ThreadPoolOptions struct provides construction options for ThreadPool. + */ +struct ThreadPoolOptions { + + size_t threads_count{std::thread::hardware_concurrency()}; + size_t worker_queue_size = 1024; + Worker<>::OnStart onStart; + Worker<>::OnStop onStop; +}; + +/** + * @brief The ThreadPool class implements thread pool pattern. + * It is highly scalable and fast. + * It is header only. + * It implements both work-stealing and work-distribution balancing startegies. + * It implements cooperative scheduling strategy for tasks. + */ + +template +class ThreadPool { +public: + + typedef Worker FixedWorker; + + /** + * @brief ThreadPool Construct and start new thread pool. + * @param options Creation options. + */ + explicit ThreadPool(const ThreadPoolOptions &options = ThreadPoolOptions()); + + /** + * @brief ~ThreadPool Stop all workers and destroy thread pool. + */ + ~ThreadPool(); + + /** + * @brief post Post piece of job to thread pool. + * @param handler Handler to be called from thread pool worker. It has to be callable as 'handler()'. + * @throws std::overflow_error if worker's queue is full. + * @note All exceptions thrown by handler will be suppressed. Use 'process()' to get result of handler's + * execution or exception thrown. + */ + template + void post(Handler &&handler) + { + if (!getWorker().post(std::forward(handler))) { + throw std::overflow_error("worker queue is full"); + } + } + + /** + * @brief process Post piece of job to thread pool and get future for this job. + * @param handler Handler to be called from thread pool worker. It has to be callable as 'handler()'. + * @return Future which hold handler result or exception thrown. + * @throws std::overflow_error if worker's queue is full. + * @note This method of posting job to thread pool is much slower than 'post()' due to std::future and + * std::packaged_task construction overhead. + */ + template ::type> + typename std::future process(Handler &&handler) + { + std::packaged_task task([handler = std::move(handler)] () { + return handler(); + }); + + std::future result = task.get_future(); + + if (!getWorker().post(task)) { + throw std::overflow_error("worker queue is full"); + } + + return result; + } + + /** + * @brief getWorkerCount Returns the number of workers created by the thread pool + * @return Worker ID. + */ + size_t getWorkerCount() const; + +private: + ThreadPool(const ThreadPool&) = delete; + ThreadPool & operator=(const ThreadPool&) = delete; + + FixedWorker & getWorker(); + + std::vector> m_workers; + std::atomic m_next_worker; +}; + + +/// Implementation + +template +inline ThreadPool::ThreadPool(const ThreadPoolOptions &options) + : m_next_worker(0) +{ + size_t workers_count = options.threads_count; + + if (0 == workers_count) { + workers_count = 1; + } + + m_workers.resize(workers_count); + for (auto &worker_ptr : m_workers) { + worker_ptr.reset(new FixedWorker(options.worker_queue_size)); + } + + for (size_t i = 0; i < m_workers.size(); ++i) { + FixedWorker *steal_donor = m_workers[(i + 1) % m_workers.size()].get(); + m_workers[i]->start(i, steal_donor, options.onStart, options.onStop); + } +} + +template +inline ThreadPool::~ThreadPool() +{ + for (auto &worker_ptr : m_workers) { + worker_ptr->stop(); + } +} + +template +inline typename ThreadPool::FixedWorker& ThreadPool::getWorker() +{ + size_t id = FixedWorker::getWorkerIdForCurrentThread(); + + if (id > m_workers.size()) { + id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_workers.size(); + } + + return *m_workers[id]; +} + + +#endif + diff --git a/thread_pool/worker.cpp b/thread_pool/worker.cpp new file mode 100644 index 00000000..6724ef0d --- /dev/null +++ b/thread_pool/worker.cpp @@ -0,0 +1,7 @@ +#include + +size_t * Worker::thread_id() +{ + static thread_local size_t tss_id = -1u; + return &tss_id; +}