From a9f0e6065e1b8034cbf607a80829f8fe8a93a92c Mon Sep 17 00:00:00 2001 From: Vittorio Romeo Date: Wed, 24 Feb 2016 17:22:49 +0100 Subject: [PATCH 01/10] Major changes * Moved everything to "./include/thread_pool" * Main include file is now: * Everything is now in the `tp` namespace * Added optional `boost::future` compatiblity with `THREAD_POOL_USE_BOOST` macro * Added `make install` command to CMakeFiles.txt * Adapted tests and benchmarks to new changes --- CMakeLists.txt | 10 +- benchmark/benchmark.cpp | 4 +- include/thread_pool.hpp | 3 + include/thread_pool/fixed_function.hpp | 165 ++++++++++++++++ include/thread_pool/future_dependencies.hpp | 18 ++ include/thread_pool/mpsc_bounded_queue.hpp | 199 ++++++++++++++++++++ include/thread_pool/thread_pool.hpp | 172 +++++++++++++++++ include/thread_pool/worker.hpp | 149 +++++++++++++++ tests/fixed_function.t.cpp | 4 +- tests/thread_pool.t.cpp | 4 +- tests/thread_pool2.t.cpp | 4 +- thread_pool/fixed_function.hpp | 145 -------------- thread_pool/mpsc_bounded_queue.hpp | 164 ---------------- thread_pool/thread_pool.hpp | 143 -------------- thread_pool/worker.hpp | 133 ------------- 15 files changed, 727 insertions(+), 590 deletions(-) create mode 100644 include/thread_pool.hpp create mode 100644 include/thread_pool/fixed_function.hpp create mode 100644 include/thread_pool/future_dependencies.hpp create mode 100644 include/thread_pool/mpsc_bounded_queue.hpp create mode 100644 include/thread_pool/thread_pool.hpp create mode 100644 include/thread_pool/worker.hpp delete mode 100644 thread_pool/fixed_function.hpp delete mode 100644 thread_pool/mpsc_bounded_queue.hpp delete mode 100644 thread_pool/thread_pool.hpp delete mode 100644 thread_pool/worker.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 8cc01e91..9425fc06 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,15 @@ ADD_DEFINITIONS( -std=c++1y -Wall -Werror -O3 ) -include_directories(${CMAKE_CURRENT_SOURCE_DIR}/thread_pool) +include_directories("${CMAKE_CURRENT_SOURCE_DIR}/include") add_subdirectory(tests) add_subdirectory(benchmark) + +# Install as header-only library +set(INC_DIR "${CMAKE_CURRENT_SOURCE_DIR}/include/") +file(GLOB_RECURSE INSTALL_FILES_LIST ${INC_DIR}) +set_source_files_properties(${file_list} PROPERTIES HEADER_FILE_ONLY 1) +add_library(HEADER_ONLY_TARGET STATIC ${file_list}) +set_target_properties(HEADER_ONLY_TARGET PROPERTIES LINKER_LANGUAGE CXX) +install(DIRECTORY ${INC_DIR} DESTINATION "include") \ No newline at end of file diff --git a/benchmark/benchmark.cpp b/benchmark/benchmark.cpp index f3b764b9..553b6b24 100644 --- a/benchmark/benchmark.cpp +++ b/benchmark/benchmark.cpp @@ -1,6 +1,6 @@ //#define WITHOUT_ASIO 1 -#include +#include #ifndef WITHOUT_ASIO #include @@ -12,6 +12,8 @@ #include #include +using namespace tp; + static const size_t CONCURRENCY = 16; static const size_t REPOST_COUNT = 1000000; diff --git a/include/thread_pool.hpp b/include/thread_pool.hpp new file mode 100644 index 00000000..69245156 --- /dev/null +++ b/include/thread_pool.hpp @@ -0,0 +1,3 @@ +#pragma once + +#include \ No newline at end of file diff --git a/include/thread_pool/fixed_function.hpp b/include/thread_pool/fixed_function.hpp new file mode 100644 index 00000000..4c4935ae --- /dev/null +++ b/include/thread_pool/fixed_function.hpp @@ -0,0 +1,165 @@ +#ifndef FIXED_FUNCTION_HPP +#define FIXED_FUNCTION_HPP + +#include +#include +#include +#include + +namespace tp +{ + + /** + * @brief The FixedFunction class implements + * functional object. + * This function is analog of 'std::function' with limited capabilities: + * - It supports only move semantics. + * - The size of functional objects is limited to storage size. + * Due to limitations above it is much faster on creation and copying than + * std::function. + */ + template + class FixedFunction; + + template + class FixedFunction + { + + typedef R (*func_ptr_type)(ARGS...); + + public: + FixedFunction() + : m_function_ptr(nullptr), m_method_ptr(nullptr), + m_alloc_ptr(nullptr) + { + } + + /** + * @brief FixedFunction Constructor from functional object. + * @param object Functor object will be stored in the internal storage + * using move constructor. Unmovable objects are prohibited explicitly. + */ + template + FixedFunction(FUNC&& object) + : FixedFunction() + { + typedef typename std::remove_reference::type unref_type; + + static_assert(sizeof(unref_type) < STORAGE_SIZE, + "functional object doesn't fit into internal storage"); + static_assert(std::is_move_constructible::value, + "Should be of movable type"); + + m_method_ptr = []( + void* object_ptr, func_ptr_type, ARGS... args) -> R + { + return static_cast(object_ptr) + -> + operator()(args...); + }; + + m_alloc_ptr = [](void* storage_ptr, void* object_ptr) + { + if(object_ptr) + { + unref_type* x_object = static_cast(object_ptr); + new(storage_ptr) unref_type(std::move(*x_object)); + } + else + { + static_cast(storage_ptr)->~unref_type(); + } + }; + + m_alloc_ptr(&m_storage, &object); + } + + /** + * @brief FixedFunction Constructor from free function or static member. + */ + template + FixedFunction(RET (*func_ptr)(PARAMS...)) + : FixedFunction() + { + m_function_ptr = func_ptr; + m_method_ptr = [](void*, func_ptr_type f_ptr, ARGS... args) -> R + { + return static_cast(f_ptr)(args...); + }; + } + + FixedFunction(FixedFunction&& o) : FixedFunction() + { + moveFromOther(o); + } + + FixedFunction& operator=(FixedFunction&& o) + { + moveFromOther(o); + return *this; + } + + ~FixedFunction() + { + if(m_alloc_ptr) m_alloc_ptr(&m_storage, nullptr); + } + + /** + * @brief operator () Execute stored functional object. + * @throws std::runtime_error if no functional object is stored. + */ + R operator()(ARGS... args) + { + if(!m_method_ptr) throw std::runtime_error("call of empty functor"); + return m_method_ptr(&m_storage, m_function_ptr, args...); + } + + private: + FixedFunction& operator=(const FixedFunction&) = delete; + FixedFunction(const FixedFunction&) = delete; + + union + { + typename std::aligned_storage::type + m_storage; + func_ptr_type m_function_ptr; + }; + + typedef R (*method_type)( + void* object_ptr, func_ptr_type free_func_ptr, ARGS... args); + method_type m_method_ptr; + + typedef void (*alloc_type)(void* storage_ptr, void* object_ptr); + alloc_type m_alloc_ptr; + + void moveFromOther(FixedFunction& o) + { + if(this == &o) return; + + if(m_alloc_ptr) + { + m_alloc_ptr(&m_storage, nullptr); + m_alloc_ptr = nullptr; + } + else + { + m_function_ptr = nullptr; + } + + m_method_ptr = o.m_method_ptr; + o.m_method_ptr = nullptr; + + if(o.m_alloc_ptr) + { + m_alloc_ptr = o.m_alloc_ptr; + m_alloc_ptr(&m_storage, &o.m_storage); + } + else + { + m_function_ptr = o.m_function_ptr; + } + } + }; +} + +#endif diff --git a/include/thread_pool/future_dependencies.hpp b/include/thread_pool/future_dependencies.hpp new file mode 100644 index 00000000..138f6a3b --- /dev/null +++ b/include/thread_pool/future_dependencies.hpp @@ -0,0 +1,18 @@ +#pragma once + +#if defined(THREAD_POOL_USE_BOOST) +#include +#else +#include +#endif + +namespace tp +{ +#if defined(THREAD_POOL_USE_BOOST) + template + using packaged_task_type = boost::packaged_task; +#else + template + using packaged_task_type = std::packaged_task; +#endif +} \ No newline at end of file diff --git a/include/thread_pool/mpsc_bounded_queue.hpp b/include/thread_pool/mpsc_bounded_queue.hpp new file mode 100644 index 00000000..5c443af7 --- /dev/null +++ b/include/thread_pool/mpsc_bounded_queue.hpp @@ -0,0 +1,199 @@ +// Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided +// that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, +// this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright +// notice, this list +// of conditions and the following disclaimer in the documentation and/or +// other materials +// provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED +// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO +// EVENT +// SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, +// OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +// ADVISED OF +// THE POSSIBILITY OF SUCH DAMAGE. +// +// The views and conclusions contained in the software and documentation are +// those of the authors and +// should not be interpreted as representing official policies, either expressed +// or implied, of Dmitry Vyukov. + +#ifndef MPSC_QUEUE_HPP +#define MPSC_QUEUE_HPP + +#include +#include +#include +#include + +namespace tp +{ + + /** + * @brief The MPMCBoundedQueue class implements bounded + * multi-producers/multi-consumers lock-free queue. + * Doesn't accept non-movabe types as T. + * Inspired by Dmitry Vyukov's mpmc queue. + * http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + */ + template + class MPMCBoundedQueue + { + static_assert( + std::is_move_constructible::value, "Should be of movable type"); + + public: + /** + * @brief MPMCBoundedQueue Constructor. + * @param size Power of 2 number - queue length. + * @throws std::invalid_argument if size is bad. + */ + explicit MPMCBoundedQueue(size_t size); + + /** + * @brief push Push data to queue. + * @param data Data to be pushed. + * @return true on success. + */ + template + bool push(U&& data); + + /** + * @brief pop Pop data from queue. + * @param data Place to store popped data. + * @return true on sucess. + */ + bool pop(T& data); + + private: + MPMCBoundedQueue(const MPMCBoundedQueue&) = delete; + MPMCBoundedQueue& operator=(const MPMCBoundedQueue&) = delete; + + struct Cell + { + std::atomic sequence; + T data; + }; + + typedef char Cacheline[64]; + + Cacheline pad0; + std::vector m_buffer; + const size_t m_buffer_mask; + Cacheline pad1; + std::atomic m_enqueue_pos; + Cacheline pad2; + std::atomic m_dequeue_pos; + Cacheline pad3; + }; + + + /// Implementation + + template + inline MPMCBoundedQueue::MPMCBoundedQueue(size_t size) + : m_buffer(size), m_buffer_mask(size - 1), m_enqueue_pos(0), + m_dequeue_pos(0) + { + bool size_is_power_of_2 = (size >= 2) && ((size & (size - 1)) == 0); + if(!size_is_power_of_2) + { + throw std::invalid_argument("buffer size should be a power of 2"); + } + + for(size_t i = 0; i < size; ++i) + { + m_buffer[i].sequence = i; + } + } + + template + template + inline bool MPMCBoundedQueue::push(U&& data) + { + Cell* cell; + size_t pos = m_enqueue_pos.load(std::memory_order_relaxed); + for(;;) + { + cell = &m_buffer[pos & m_buffer_mask]; + size_t seq = cell->sequence.load(std::memory_order_acquire); + intptr_t dif = (intptr_t)seq - (intptr_t)pos; + if(dif == 0) + { + if(m_enqueue_pos.compare_exchange_weak( + pos, pos + 1, std::memory_order_relaxed)) + { + break; + } + } + else if(dif < 0) + { + return false; + } + else + { + pos = m_enqueue_pos.load(std::memory_order_relaxed); + } + } + + cell->data = std::forward(data); + + cell->sequence.store(pos + 1, std::memory_order_release); + + return true; + } + + template + inline bool MPMCBoundedQueue::pop(T& data) + { + Cell* cell; + size_t pos = m_dequeue_pos.load(std::memory_order_relaxed); + for(;;) + { + cell = &m_buffer[pos & m_buffer_mask]; + size_t seq = cell->sequence.load(std::memory_order_acquire); + intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1); + if(dif == 0) + { + if(m_dequeue_pos.compare_exchange_weak( + pos, pos + 1, std::memory_order_relaxed)) + { + break; + } + } + else if(dif < 0) + { + return false; + } + else + { + pos = m_dequeue_pos.load(std::memory_order_relaxed); + } + } + + data = std::move(cell->data); + + cell->sequence.store( + pos + m_buffer_mask + 1, std::memory_order_release); + + return true; + } +} + +#endif diff --git a/include/thread_pool/thread_pool.hpp b/include/thread_pool/thread_pool.hpp new file mode 100644 index 00000000..8a68f5c5 --- /dev/null +++ b/include/thread_pool/thread_pool.hpp @@ -0,0 +1,172 @@ +#ifndef THREAD_POOL_HPP +#define THREAD_POOL_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace tp +{ + /** + * @brief The ThreadPoolOptions struct provides construction options for + * ThreadPool. + */ + struct ThreadPoolOptions + { + enum + { + AUTODETECT = 0 + }; + size_t threads_count = AUTODETECT; + size_t worker_queue_size = 1024; + }; + + /** + * @brief The ThreadPool class implements thread pool pattern. + * It is highly scalable and fast. + * It is header only. + * It implements both work-stealing and work-distribution balancing + * startegies. + * It implements cooperative scheduling strategy for tasks. + */ + class ThreadPool + { + public: + /** + * @brief ThreadPool Construct and start new thread pool. + * @param options Creation options. + */ + explicit ThreadPool( + const ThreadPoolOptions& options = ThreadPoolOptions()); + + /** + * @brief ~ThreadPool Stop all workers and destroy thread pool. + */ + ~ThreadPool(); + + /** + * @brief post Post piece of job to thread pool. + * @param handler Handler to be called from thread pool worker. It has + * to be + * callable as 'handler()'. + * @throws std::overflow_error if worker's queue is full. + * @note All exceptions thrown by handler will be suppressed. Use + * 'process()' to get result of handler's + * execution or exception thrown. + */ + template + void post(Handler&& handler); + + /** + * @brief process Post piece of job to thread pool and get future for + * this + * job. + * @param handler Handler to be called from thread pool worker. It has + * to be + * callable as 'handler()'. + * @return Future which hold handler result or exception thrown. + * @throws std::overflow_error if worker's queue is full. + * @note This method of posting job to thread pool is much slower than + * 'post()' due to std::future and + * std::packaged_task construction overhead. + */ + template ::type> + auto process(Handler&& handler); + + private: + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + + Worker& getWorker(); + + std::vector> m_workers; + std::atomic m_next_worker; + }; + + + /// Implementation + + inline ThreadPool::ThreadPool(const ThreadPoolOptions& options) + : m_next_worker(0) + { + size_t workers_count = options.threads_count; + + if(ThreadPoolOptions::AUTODETECT == options.threads_count) + { + workers_count = std::thread::hardware_concurrency(); + } + + if(0 == workers_count) + { + workers_count = 1; + } + + m_workers.resize(workers_count); + for(auto& worker_ptr : m_workers) + { + worker_ptr.reset(new Worker(options.worker_queue_size)); + } + + for(size_t i = 0; i < m_workers.size(); ++i) + { + Worker* steal_donor = m_workers[(i + 1) % m_workers.size()].get(); + m_workers[i]->start(i, steal_donor); + } + } + + inline ThreadPool::~ThreadPool() + { + for(auto& worker_ptr : m_workers) + { + worker_ptr->stop(); + } + } + + template + inline void ThreadPool::post(Handler&& handler) + { + if(!getWorker().post(std::forward(handler))) + { + throw std::overflow_error("worker queue is full"); + } + } + + template + auto ThreadPool::process(Handler&& handler) + { + packaged_task_type task([handler = std::move(handler)]() + { + return handler(); + }); + + auto result = task.get_future(); + + if(!getWorker().post(task)) + { + throw std::overflow_error("worker queue is full"); + } + + return result; + } + + + inline Worker& ThreadPool::getWorker() + { + size_t id = Worker::getWorkerIdForCurrentThread(); + + if(id > m_workers.size()) + { + id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % + m_workers.size(); + } + + return *m_workers[id]; + } +} + +#endif diff --git a/include/thread_pool/worker.hpp b/include/thread_pool/worker.hpp new file mode 100644 index 00000000..fc60ff9b --- /dev/null +++ b/include/thread_pool/worker.hpp @@ -0,0 +1,149 @@ +#ifndef WORKER_HPP +#define WORKER_HPP + +#include +#include +#include +#include + +namespace tp +{ + + /** + * @brief The Worker class owns task queue and executing thread. + * In executing thread it tries to pop task from queue. If queue is empty + * then it tries to steal task from the sibling worker. If stealing was + * unsuccessful + * then spins with one millisecond delay. + */ + class Worker + { + public: + typedef FixedFunction Task; + + /** + * @brief Worker Constructor. + * @param queue_size Length of undelaying task queue. + */ + explicit Worker(size_t queue_size); + + /** + * @brief start Create the executing thread and start tasks execution. + * @param id Worker ID. + * @param steal_donor Sibling worker to steal task from it. + */ + void start(size_t id, Worker* steal_donor); + + /** + * @brief stop Stop all worker's thread and stealing activity. + * Waits until the executing thread became finished. + */ + void stop(); + + /** + * @brief post Post task to queue. + * @param handler Handler to be executed in executing thread. + * @return true on success. + */ + template + bool post(Handler&& handler); + + /** + * @brief steal Steal one task from this worker queue. + * @param task Place for stealed task to be stored. + * @return true on success. + */ + bool steal(Task& task); + + /** + * @brief getWorkerIdForCurrentThread Return worker ID associated with + * current thread if exists. + * @return Worker ID. + */ + static size_t getWorkerIdForCurrentThread(); + + private: + Worker(const Worker&) = delete; + Worker& operator=(const Worker&) = delete; + + /** + * @brief threadFunc Executing thread function. + * @param id Worker ID to be associated with this thread. + * @param steal_donor Sibling worker to steal task from it. + */ + void threadFunc(size_t id, Worker* steal_donor); + + MPMCBoundedQueue m_queue; + std::atomic m_running_flag; + std::thread m_thread; + }; + + + /// Implementation + + namespace detail + { + inline size_t* thread_id() + { + static thread_local size_t tss_id = -1u; + return &tss_id; + } + } + + inline Worker::Worker(size_t queue_size) + : m_queue(queue_size), m_running_flag(true) + { + } + + inline void Worker::stop() + { + m_running_flag.store(false, std::memory_order_relaxed); + m_thread.join(); + } + + inline void Worker::start(size_t id, Worker* steal_donor) + { + m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor); + } + + inline size_t Worker::getWorkerIdForCurrentThread() + { + return *detail::thread_id(); + } + + template + inline bool Worker::post(Handler&& handler) + { + return m_queue.push(std::forward(handler)); + } + + inline bool Worker::steal(Task& task) + { + return m_queue.pop(task); + } + + inline void Worker::threadFunc(size_t id, Worker* steal_donor) + { + *detail::thread_id() = id; + + Task handler; + + while(m_running_flag.load(std::memory_order_relaxed)) + if(m_queue.pop(handler) || steal_donor->steal(handler)) + { + try + { + handler(); + } + catch(...) + { + } + } + else + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +} + +#endif diff --git a/tests/fixed_function.t.cpp b/tests/fixed_function.t.cpp index 29fee14a..d24acaf7 100644 --- a/tests/fixed_function.t.cpp +++ b/tests/fixed_function.t.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include @@ -6,6 +6,8 @@ #include #include +using namespace tp; + int test_free_func(int i) { return i; diff --git a/tests/thread_pool.t.cpp b/tests/thread_pool.t.cpp index 3df8c155..fa047cf2 100644 --- a/tests/thread_pool.t.cpp +++ b/tests/thread_pool.t.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include @@ -7,6 +7,8 @@ #include #include +using namespace tp; + int main() { std::cout << "*** Testing ThreadPool ***" << std::endl; diff --git a/tests/thread_pool2.t.cpp b/tests/thread_pool2.t.cpp index f7bb906b..ac45eaed 100644 --- a/tests/thread_pool2.t.cpp +++ b/tests/thread_pool2.t.cpp @@ -1,4 +1,6 @@ -#include +#include + +using namespace tp; size_t getWorkerIdForCurrentThread() { diff --git a/thread_pool/fixed_function.hpp b/thread_pool/fixed_function.hpp deleted file mode 100644 index 39d80997..00000000 --- a/thread_pool/fixed_function.hpp +++ /dev/null @@ -1,145 +0,0 @@ -#ifndef FIXED_FUNCTION_HPP -#define FIXED_FUNCTION_HPP - -#include -#include -#include -#include - -/** - * @brief The FixedFunction class implements functional object. - * This function is analog of 'std::function' with limited capabilities: - * - It supports only move semantics. - * - The size of functional objects is limited to storage size. - * Due to limitations above it is much faster on creation and copying than std::function. - */ -template -class FixedFunction; - -template -class FixedFunction { - - typedef R (*func_ptr_type)(ARGS...); - -public: - FixedFunction() - : m_function_ptr(nullptr) - , m_method_ptr(nullptr) - , m_alloc_ptr(nullptr) - { - } - - /** - * @brief FixedFunction Constructor from functional object. - * @param object Functor object will be stored in the internal storage - * using move constructor. Unmovable objects are prohibited explicitly. - */ - template - FixedFunction(FUNC &&object) - : FixedFunction() - { - typedef typename std::remove_reference::type unref_type; - - static_assert(sizeof(unref_type) < STORAGE_SIZE, - "functional object doesn't fit into internal storage"); - static_assert(std::is_move_constructible::value, "Should be of movable type"); - - m_method_ptr = [](void *object_ptr, func_ptr_type, ARGS... args) -> R { - return static_cast(object_ptr)->operator()(args...); - }; - - m_alloc_ptr = [](void *storage_ptr, void *object_ptr) { - if (object_ptr) { - unref_type *object = static_cast(object_ptr); - new (storage_ptr) unref_type(std::move(*object)); - } else { - static_cast(storage_ptr)->~unref_type(); - } - }; - - m_alloc_ptr(&m_storage, &object); - } - - /** - * @brief FixedFunction Constructor from free function or static member. - */ - template - FixedFunction(RET(*func_ptr)(PARAMS...)) - : FixedFunction() - { - m_function_ptr = func_ptr; - m_method_ptr = [](void *, func_ptr_type f_ptr, ARGS... args) -> R { - return static_cast(f_ptr)(args...); - }; - } - - FixedFunction(FixedFunction &&o) - : FixedFunction() - { - moveFromOther(o); - } - - FixedFunction & operator=(FixedFunction &&o) - { - moveFromOther(o); - return *this; - } - - ~FixedFunction() - { - if (m_alloc_ptr) - m_alloc_ptr(&m_storage, nullptr); - } - - /** - * @brief operator () Execute stored functional object. - * @throws std::runtime_error if no functional object is stored. - */ - R operator()(ARGS... args) - { - if (!m_method_ptr) - throw std::runtime_error("call of empty functor"); - return m_method_ptr(&m_storage, m_function_ptr, args...); - } - -private: - FixedFunction & operator=(const FixedFunction &) = delete; - FixedFunction(const FixedFunction &) = delete; - - union { - typename std::aligned_storage::type m_storage; - func_ptr_type m_function_ptr; - }; - - typedef R(*method_type)(void *object_ptr, func_ptr_type free_func_ptr, ARGS... args); - method_type m_method_ptr; - - typedef void(*alloc_type)(void *storage_ptr, void *object_ptr); - alloc_type m_alloc_ptr; - - void moveFromOther(FixedFunction &o) - { - if (this == &o) - return; - - if (m_alloc_ptr) { - m_alloc_ptr(&m_storage, nullptr); - m_alloc_ptr = nullptr; - } else { - m_function_ptr = nullptr; - } - - m_method_ptr = o.m_method_ptr; - o.m_method_ptr = nullptr; - - if (o.m_alloc_ptr) { - m_alloc_ptr = o.m_alloc_ptr; - m_alloc_ptr(&m_storage, &o.m_storage); - } else { - m_function_ptr = o.m_function_ptr; - } - } -}; - - -#endif diff --git a/thread_pool/mpsc_bounded_queue.hpp b/thread_pool/mpsc_bounded_queue.hpp deleted file mode 100644 index 47609283..00000000 --- a/thread_pool/mpsc_bounded_queue.hpp +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided -// that the following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of -// conditions and the following disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list -// of conditions and the following disclaimer in the documentation and/or other materials -// provided with the distribution. -// -// THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED -// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT -// SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR -// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF -// THE POSSIBILITY OF SUCH DAMAGE. -// -// The views and conclusions contained in the software and documentation are those of the authors and -// should not be interpreted as representing official policies, either expressed or implied, of Dmitry Vyukov. - -#ifndef MPSC_QUEUE_HPP -#define MPSC_QUEUE_HPP - -#include -#include -#include -#include - -/** - * @brief The MPMCBoundedQueue class implements bounded multi-producers/multi-consumers lock-free queue. - * Doesn't accept non-movabe types as T. - * Inspired by Dmitry Vyukov's mpmc queue. - * http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue - */ -template -class MPMCBoundedQueue { - static_assert(std::is_move_constructible::value, "Should be of movable type"); -public: - /** - * @brief MPMCBoundedQueue Constructor. - * @param size Power of 2 number - queue length. - * @throws std::invalid_argument if size is bad. - */ - explicit MPMCBoundedQueue(size_t size); - - /** - * @brief push Push data to queue. - * @param data Data to be pushed. - * @return true on success. - */ - template - bool push(U &&data); - - /** - * @brief pop Pop data from queue. - * @param data Place to store popped data. - * @return true on sucess. - */ - bool pop(T &data); - -private: - MPMCBoundedQueue(const MPMCBoundedQueue&) = delete; - MPMCBoundedQueue & operator=(const MPMCBoundedQueue&) = delete; - - struct Cell { - std::atomic sequence; - T data; - }; - - typedef char Cacheline[64]; - - Cacheline pad0; - std::vector m_buffer; - const size_t m_buffer_mask; - Cacheline pad1; - std::atomic m_enqueue_pos; - Cacheline pad2; - std::atomic m_dequeue_pos; - Cacheline pad3; -}; - - -/// Implementation - -template -inline MPMCBoundedQueue::MPMCBoundedQueue(size_t size) - : m_buffer(size) - , m_buffer_mask(size - 1) - , m_enqueue_pos(0) - , m_dequeue_pos(0) -{ - bool size_is_power_of_2 = (size >= 2) && ((size & (size - 1)) == 0); - if (!size_is_power_of_2) { - throw std::invalid_argument("buffer size should be a power of 2"); - } - - for (size_t i = 0; i < size; ++i) - { - m_buffer[i].sequence = i; - } -} - -template -template -inline bool MPMCBoundedQueue::push(U &&data) -{ - Cell *cell; - size_t pos = m_enqueue_pos.load(std::memory_order_relaxed); - for (;;) { - cell = &m_buffer[pos & m_buffer_mask]; - size_t seq = cell->sequence.load(std::memory_order_acquire); - intptr_t dif = (intptr_t)seq - (intptr_t)pos; - if (dif == 0) { - if (m_enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { - break; - } - } else if (dif < 0) { - return false; - } else { - pos = m_enqueue_pos.load(std::memory_order_relaxed); - } - } - - cell->data = std::forward(data); - - cell->sequence.store(pos + 1, std::memory_order_release); - - return true; -} - -template -inline bool MPMCBoundedQueue::pop(T &data) -{ - Cell *cell; - size_t pos = m_dequeue_pos.load(std::memory_order_relaxed); - for (;;) { - cell = &m_buffer[pos & m_buffer_mask]; - size_t seq = cell->sequence.load(std::memory_order_acquire); - intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1); - if (dif == 0) { - if (m_dequeue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { - break; - } - } else if (dif < 0) { - return false; - } else { - pos = m_dequeue_pos.load(std::memory_order_relaxed); - } - } - - data = std::move(cell->data); - - cell->sequence.store(pos + m_buffer_mask + 1, std::memory_order_release); - - return true; -} - -#endif diff --git a/thread_pool/thread_pool.hpp b/thread_pool/thread_pool.hpp deleted file mode 100644 index ee3ea034..00000000 --- a/thread_pool/thread_pool.hpp +++ /dev/null @@ -1,143 +0,0 @@ -#ifndef THREAD_POOL_HPP -#define THREAD_POOL_HPP - -#include -#include -#include -#include -#include -#include -#include - -/** - * @brief The ThreadPoolOptions struct provides construction options for ThreadPool. - */ -struct ThreadPoolOptions { - enum {AUTODETECT = 0}; - size_t threads_count = AUTODETECT; - size_t worker_queue_size = 1024; -}; - -/** - * @brief The ThreadPool class implements thread pool pattern. - * It is highly scalable and fast. - * It is header only. - * It implements both work-stealing and work-distribution balancing startegies. - * It implements cooperative scheduling strategy for tasks. - */ -class ThreadPool { -public: - /** - * @brief ThreadPool Construct and start new thread pool. - * @param options Creation options. - */ - explicit ThreadPool(const ThreadPoolOptions &options = ThreadPoolOptions()); - - /** - * @brief ~ThreadPool Stop all workers and destroy thread pool. - */ - ~ThreadPool(); - - /** - * @brief post Post piece of job to thread pool. - * @param handler Handler to be called from thread pool worker. It has to be callable as 'handler()'. - * @throws std::overflow_error if worker's queue is full. - * @note All exceptions thrown by handler will be suppressed. Use 'process()' to get result of handler's - * execution or exception thrown. - */ - template - void post(Handler &&handler); - - /** - * @brief process Post piece of job to thread pool and get future for this job. - * @param handler Handler to be called from thread pool worker. It has to be callable as 'handler()'. - * @return Future which hold handler result or exception thrown. - * @throws std::overflow_error if worker's queue is full. - * @note This method of posting job to thread pool is much slower than 'post()' due to std::future and - * std::packaged_task construction overhead. - */ - template ::type> - typename std::future process(Handler &&handler); - -private: - ThreadPool(const ThreadPool&) = delete; - ThreadPool & operator=(const ThreadPool&) = delete; - - Worker & getWorker(); - - std::vector> m_workers; - std::atomic m_next_worker; -}; - - -/// Implementation - -inline ThreadPool::ThreadPool(const ThreadPoolOptions &options) - : m_next_worker(0) -{ - size_t workers_count = options.threads_count; - - if (ThreadPoolOptions::AUTODETECT == options.threads_count) { - workers_count = std::thread::hardware_concurrency(); - } - - if (0 == workers_count) { - workers_count = 1; - } - - m_workers.resize(workers_count); - for (auto &worker_ptr : m_workers) { - worker_ptr.reset(new Worker(options.worker_queue_size)); - } - - for (size_t i = 0; i < m_workers.size(); ++i) { - Worker *steal_donor = m_workers[(i + 1) % m_workers.size()].get(); - m_workers[i]->start(i, steal_donor); - } -} - -inline ThreadPool::~ThreadPool() -{ - for (auto &worker_ptr : m_workers) { - worker_ptr->stop(); - } -} - -template -inline void ThreadPool::post(Handler &&handler) -{ - if (!getWorker().post(std::forward(handler))) { - throw std::overflow_error("worker queue is full"); - } -} - -template -typename std::future ThreadPool::process(Handler &&handler) -{ - std::packaged_task task([handler = std::move(handler)] () { - return handler(); - }); - - std::future result = task.get_future(); - - if (!getWorker().post(task)) { - throw std::overflow_error("worker queue is full"); - } - - return result; -} - - -inline Worker & ThreadPool::getWorker() -{ - size_t id = Worker::getWorkerIdForCurrentThread(); - - if (id > m_workers.size()) { - id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_workers.size(); - } - - return *m_workers[id]; -} - -#endif - diff --git a/thread_pool/worker.hpp b/thread_pool/worker.hpp deleted file mode 100644 index 3d51da5c..00000000 --- a/thread_pool/worker.hpp +++ /dev/null @@ -1,133 +0,0 @@ -#ifndef WORKER_HPP -#define WORKER_HPP - -#include -#include -#include -#include - -/** - * @brief The Worker class owns task queue and executing thread. - * In executing thread it tries to pop task from queue. If queue is empty - * then it tries to steal task from the sibling worker. If stealing was unsuccessful - * then spins with one millisecond delay. - */ -class Worker { -public: - typedef FixedFunction Task; - - /** - * @brief Worker Constructor. - * @param queue_size Length of undelaying task queue. - */ - explicit Worker(size_t queue_size); - - /** - * @brief start Create the executing thread and start tasks execution. - * @param id Worker ID. - * @param steal_donor Sibling worker to steal task from it. - */ - void start(size_t id, Worker *steal_donor); - - /** - * @brief stop Stop all worker's thread and stealing activity. - * Waits until the executing thread became finished. - */ - void stop(); - - /** - * @brief post Post task to queue. - * @param handler Handler to be executed in executing thread. - * @return true on success. - */ - template - bool post(Handler &&handler); - - /** - * @brief steal Steal one task from this worker queue. - * @param task Place for stealed task to be stored. - * @return true on success. - */ - bool steal(Task &task); - - /** - * @brief getWorkerIdForCurrentThread Return worker ID associated with current thread if exists. - * @return Worker ID. - */ - static size_t getWorkerIdForCurrentThread(); - -private: - Worker(const Worker&) = delete; - Worker & operator=(const Worker&) = delete; - - /** - * @brief threadFunc Executing thread function. - * @param id Worker ID to be associated with this thread. - * @param steal_donor Sibling worker to steal task from it. - */ - void threadFunc(size_t id, Worker *steal_donor); - - MPMCBoundedQueue m_queue; - std::atomic m_running_flag; - std::thread m_thread; -}; - - -/// Implementation - -namespace detail { - inline size_t * thread_id() - { - static thread_local size_t tss_id = -1u; - return &tss_id; - } -} - -inline Worker::Worker(size_t queue_size) - : m_queue(queue_size) - , m_running_flag(true) -{ -} - -inline void Worker::stop() -{ - m_running_flag.store(false, std::memory_order_relaxed); - m_thread.join(); -} - -inline void Worker::start(size_t id, Worker *steal_donor) -{ - m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor); -} - -inline size_t Worker::getWorkerIdForCurrentThread() -{ - return *detail::thread_id(); -} - -template -inline bool Worker::post(Handler &&handler) -{ - return m_queue.push(std::forward(handler)); -} - -inline bool Worker::steal(Task &task) -{ - return m_queue.pop(task); -} - -inline void Worker::threadFunc(size_t id, Worker *steal_donor) -{ - *detail::thread_id() = id; - - Task handler; - - while (m_running_flag.load(std::memory_order_relaxed)) - if (m_queue.pop(handler) || steal_donor->steal(handler)) { - try {handler();} catch (...) {} - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } -} - -#endif From 5d30844f5a1d3707b14e7f7e62a13306610241ea Mon Sep 17 00:00:00 2001 From: Vittorio Romeo Date: Wed, 24 Feb 2016 17:26:26 +0100 Subject: [PATCH 02/10] Cleanup * Use header guards in place of `#pragma once` * Prepend `THREAD_POOL_` to all header guards * Automatic `clang-format` on every file --- include/thread_pool/fixed_function.hpp | 4 ++-- include/thread_pool/future_dependencies.hpp | 11 +++++++---- include/thread_pool/mpsc_bounded_queue.hpp | 5 ++--- include/thread_pool/thread_pool.hpp | 2 +- include/thread_pool/worker.hpp | 5 ++--- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/include/thread_pool/fixed_function.hpp b/include/thread_pool/fixed_function.hpp index 4c4935ae..8026640c 100644 --- a/include/thread_pool/fixed_function.hpp +++ b/include/thread_pool/fixed_function.hpp @@ -1,5 +1,5 @@ -#ifndef FIXED_FUNCTION_HPP -#define FIXED_FUNCTION_HPP +#ifndef THREAD_POOL_FIXED_FUNCTION_HPP +#define THREAD_POOL_FIXED_FUNCTION_HPP #include #include diff --git a/include/thread_pool/future_dependencies.hpp b/include/thread_pool/future_dependencies.hpp index 138f6a3b..5c5595d6 100644 --- a/include/thread_pool/future_dependencies.hpp +++ b/include/thread_pool/future_dependencies.hpp @@ -1,6 +1,7 @@ -#pragma once +#ifndef THREAD_POOL_FUTURE_DEPENDENCIES_HPP +#define THREAD_POOL_FUTURE_DEPENDENCIES_HPP -#if defined(THREAD_POOL_USE_BOOST) +#if defined(THREAD_POOL_USE_BOOST_FUTURE) #include #else #include @@ -8,11 +9,13 @@ namespace tp { -#if defined(THREAD_POOL_USE_BOOST) +#if defined(THREAD_POOL_USE_BOOST_FUTURE) template using packaged_task_type = boost::packaged_task; #else template using packaged_task_type = std::packaged_task; #endif -} \ No newline at end of file +} + +#endif \ No newline at end of file diff --git a/include/thread_pool/mpsc_bounded_queue.hpp b/include/thread_pool/mpsc_bounded_queue.hpp index 5c443af7..71654ef0 100644 --- a/include/thread_pool/mpsc_bounded_queue.hpp +++ b/include/thread_pool/mpsc_bounded_queue.hpp @@ -34,8 +34,8 @@ // should not be interpreted as representing official policies, either expressed // or implied, of Dmitry Vyukov. -#ifndef MPSC_QUEUE_HPP -#define MPSC_QUEUE_HPP +#ifndef THREAD_POOL_MPSC_QUEUE_HPP +#define THREAD_POOL_MPSC_QUEUE_HPP #include #include @@ -44,7 +44,6 @@ namespace tp { - /** * @brief The MPMCBoundedQueue class implements bounded * multi-producers/multi-consumers lock-free queue. diff --git a/include/thread_pool/thread_pool.hpp b/include/thread_pool/thread_pool.hpp index 8a68f5c5..0044c369 100644 --- a/include/thread_pool/thread_pool.hpp +++ b/include/thread_pool/thread_pool.hpp @@ -169,4 +169,4 @@ namespace tp } } -#endif +#endif \ No newline at end of file diff --git a/include/thread_pool/worker.hpp b/include/thread_pool/worker.hpp index fc60ff9b..d8b99c95 100644 --- a/include/thread_pool/worker.hpp +++ b/include/thread_pool/worker.hpp @@ -1,5 +1,5 @@ -#ifndef WORKER_HPP -#define WORKER_HPP +#ifndef THREAD_POOL_WORKER_HPP +#define THREAD_POOL_WORKER_HPP #include #include @@ -8,7 +8,6 @@ namespace tp { - /** * @brief The Worker class owns task queue and executing thread. * In executing thread it tries to pop task from queue. If queue is empty From 78ccd719a15db3233a7d146be34a13aa3c8abf84 Mon Sep 17 00:00:00 2001 From: Vittorio Romeo Date: Sat, 27 Feb 2016 19:42:03 +0100 Subject: [PATCH 03/10] Added Find thread-pool-cpp cmake file --- cmake/Findthread-pool-cpp.cmake | 86 +++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 cmake/Findthread-pool-cpp.cmake diff --git a/cmake/Findthread-pool-cpp.cmake b/cmake/Findthread-pool-cpp.cmake new file mode 100644 index 00000000..1910b9d3 --- /dev/null +++ b/cmake/Findthread-pool-cpp.cmake @@ -0,0 +1,86 @@ +# Copyright (c) 2015-2016 Vittorio Romeo +# License: Academic Free License ("AFL") v. 3.0 +# AFL License page: http://opensource.org/licenses/AFL-3.0 +# http://vittorioromeo.info | vittorio.romeo@outlook.com + +# Adapted from Louise Dionne's FindHana.cmake file + +# Copyright Louis Dionne 2015 +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE.md or copy at http://boost.org/LICENSE_1_0.txt) + +# TODO: document variables: +# THREAD_POOL_CPP_FOUND +# THREAD_POOL_CPP_INCLUDE_DIR +# THREAD_POOL_CPP_CLONE_DIR +# THREAD_POOL_CPP_ENABLE_TESTS + +find_path( + THREAD_POOL_CPP_INCLUDE_DIR + + NAMES vrm/core.hpp + DOC "Include directory for the thread-pool-cpp library" + + PATH_SUFFIXES include/ + + PATHS + "${PROJECT_SOURCE_DIR}/../thread-pool-cpp/" + "${PROJECT_SOURCE_DIR}/../thread_pool_cpp/" + ${THREAD_POOL_CPP_ROOT} + $ENV{THREAD_POOL_CPP_ROOT} + /usr/local/ + /usr/ + /sw/ + /opt/local/ + /opt/csw/ + /opt/ + "${PROJECT_SOURCE_DIR}/extlibs/thread-pool-cpp/" + "${PROJECT_SOURCE_DIR}/extlibs/thread_pool_cpp/" + "${CMAKE_CURRENT_LIST_DIR}/../../" + + NO_DEFAULT_PATH +) + +if (NOT EXISTS "${THREAD_POOL_CPP_INCLUDE_DIR}" AND DEFINED THREAD_POOL_CPP_CLONE_DIR) + set(_build_dir "${CMAKE_CURRENT_BINARY_DIR}/thread-pool-cpp") + + if (DEFINED THREAD_POOL_CPP_ENABLE_TESTS) + set(_test_cmd ${CMAKE_COMMAND} --build ${_build_dir} --target check) + else() + set(_test_cmd "") + endif() + + include(ExternalProject) + ExternalProject_Add(thread_pool_cpp + PREFIX ${_build_dir} + STAMP_DIR ${_build_dir}/_stamps + TMP_DIR ${_build_dir}/_tmp + + # Since we don't have any files to download, we set the DOWNLOAD_DIR + # to TMP_DIR to avoid creating a useless empty directory. + DOWNLOAD_DIR ${_build_dir}/_tmp + + # Download step + GIT_REPOSITORY https://github.com/SuperV1234/thread-pool-cpp + GIT_TAG master + TIMEOUT 20 + + # Configure step + SOURCE_DIR "${THREAD_POOL_CPP_CLONE_DIR}" + CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + + BINARY_DIR "${_build_dir}" + BUILD_COMMAND "" + + # Install step (nothing to be done) + INSTALL_COMMAND "" + + # Test step + TEST_COMMAND ${_test_cmd} + ) + + set(THREAD_POOL_CPP_INCLUDE_DIR "${THREAD_POOL_CPP_CLONE_DIR}/include") +endif() + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(THREAD_POOL_CPP DEFAULT_MSG THREAD_POOL_CPP_INCLUDE_DIR) \ No newline at end of file From 7d77deb0ea84d90039bf904d21efd2a957bfb559 Mon Sep 17 00:00:00 2001 From: Vittorio Romeo Date: Sat, 27 Feb 2016 20:41:40 +0100 Subject: [PATCH 04/10] ThreadPool is now a template class (can be used independently from the underlying future or packaged_task type --- .gitignore | 3 +- CMakeLists.txt | 28 ++-- benchmark/benchmark.cpp | 122 ++++++++++-------- include/thread_pool/future_dependencies.hpp | 21 --- include/thread_pool/thread_pool.hpp | 49 +++++-- tests/thread_pool.t.cpp | 134 +++++++++++--------- 6 files changed, 204 insertions(+), 153 deletions(-) delete mode 100644 include/thread_pool/future_dependencies.hpp diff --git a/.gitignore b/.gitignore index 78e40937..b1a1883a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ boost* build* thread_pool-build* -*.pro.user \ No newline at end of file +*.pro.user +*.user \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 9425fc06..7423b2d0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,22 +1,22 @@ -# thread-poll-cpp build script +cmake_minimum_required(VERSION 3.0) -project(thread-pool-cpp) - -cmake_minimum_required(VERSION 2.8) +list(APPEND CMAKE_MODULE_PATH + "${CMAKE_CURRENT_SOURCE_DIR}/cmake" + "${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules") -ADD_DEFINITIONS( - -std=c++1y -Wall -Werror -O3 -) - -include_directories("${CMAKE_CURRENT_SOURCE_DIR}/include") +project(thread-pool-cpp) +add_definitions(-std=c++14 -Wall -Werror -O3) add_subdirectory(tests) add_subdirectory(benchmark) -# Install as header-only library +# Get all include files set(INC_DIR "${CMAKE_CURRENT_SOURCE_DIR}/include/") -file(GLOB_RECURSE INSTALL_FILES_LIST ${INC_DIR}) -set_source_files_properties(${file_list} PROPERTIES HEADER_FILE_ONLY 1) -add_library(HEADER_ONLY_TARGET STATIC ${file_list}) +include_directories("${INC_DIR}") +file(GLOB_RECURSE INSTALL_FILES_LIST "${INC_DIR}/*") + +# Install as header-only library +set_source_files_properties(${INSTALL_FILES_LIST} PROPERTIES HEADER_FILE_ONLY 1) +add_library(HEADER_ONLY_TARGET STATIC ${INSTALL_FILES_LIST}) set_target_properties(HEADER_ONLY_TARGET PROPERTIES LINKER_LANGUAGE CXX) -install(DIRECTORY ${INC_DIR} DESTINATION "include") \ No newline at end of file +install(DIRECTORY ${INC_DIR} DESTINATION "include") diff --git a/benchmark/benchmark.cpp b/benchmark/benchmark.cpp index 553b6b24..da77d332 100644 --- a/benchmark/benchmark.cpp +++ b/benchmark/benchmark.cpp @@ -14,55 +14,58 @@ using namespace tp; +using ThreadPoolStd = + ThreadPool>>; + static const size_t CONCURRENCY = 16; static const size_t REPOST_COUNT = 1000000; -struct Heavy { +struct Heavy +{ bool verbose; std::vector resource; - Heavy(bool verbose = false) - : verbose(verbose) - , resource(100*1024*1024) + Heavy(bool verbose = false) : verbose(verbose), resource(100 * 1024 * 1024) { - if (verbose) { + if(verbose) + { std::cout << "heavy default constructor" << std::endl; } } - Heavy(const Heavy &o) - : verbose(o.verbose) - , resource(o.resource) + Heavy(const Heavy& o) : verbose(o.verbose), resource(o.resource) { - if (verbose) { + if(verbose) + { std::cout << "heavy copy constructor" << std::endl; } } - Heavy(Heavy &&o) - : verbose(o.verbose) - , resource(std::move(o.resource)) + Heavy(Heavy&& o) : verbose(o.verbose), resource(std::move(o.resource)) { - if (verbose) { + if(verbose) + { std::cout << "heavy move constructor" << std::endl; } } - Heavy & operator==(const Heavy &o) + Heavy& operator==(const Heavy& o) { verbose = o.verbose; resource = o.resource; - if (verbose) { + if(verbose) + { std::cout << "heavy copy operator" << std::endl; } return *this; } - Heavy & operator==(const Heavy &&o) + Heavy& operator==(const Heavy&& o) { verbose = o.verbose; resource = std::move(o.resource); - if (verbose) { + if(verbose) + { std::cout << "heavy move operator" << std::endl; } return *this; @@ -70,72 +73,86 @@ struct Heavy { ~Heavy() { - if (verbose) { - std::cout << "heavy destructor. " << (resource.size() ? "Owns resource" : "Doesn't own resource") << std::endl; + if(verbose) + { + std::cout << "heavy destructor. " + << (resource.size() ? "Owns resource" + : "Doesn't own resource") + << std::endl; } } }; -struct RepostJob { - //Heavy heavy; +struct RepostJob +{ + // Heavy heavy; - ThreadPool *thread_pool; + ThreadPoolStd* thread_pool; #ifndef WITHOUT_ASIO - AsioThreadPool *asio_thread_pool; + AsioThreadPool* asio_thread_pool; #endif volatile size_t counter; long long int begin_count; - std::promise *waiter; + std::promise* waiter; - RepostJob(ThreadPool *thread_pool, std::promise *waiter) + RepostJob(ThreadPoolStd* thread_pool, std::promise* waiter) : thread_pool(thread_pool) #ifndef WITHOUT_ASIO - , asio_thread_pool(0) + , + asio_thread_pool(0) #endif - , counter(0) - , waiter(waiter) + , + counter(0), waiter(waiter) { - begin_count = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + begin_count = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); } #ifndef WITHOUT_ASIO - RepostJob(AsioThreadPool *asio_thread_pool, std::promise *waiter) - : thread_pool(0) - , asio_thread_pool(asio_thread_pool) - , counter(0) - , waiter(waiter) + RepostJob(AsioThreadPool* asio_thread_pool, std::promise* waiter) + : thread_pool(0), asio_thread_pool(asio_thread_pool), counter(0), + waiter(waiter) { - begin_count = std::chrono::high_resolution_clock::now().time_since_epoch().count(); + begin_count = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); } #endif void operator()() { - if (counter++ < REPOST_COUNT) { + if(counter++ < REPOST_COUNT) + { #ifndef WITHOUT_ASIO - if (asio_thread_pool) { + if(asio_thread_pool) + { asio_thread_pool->post(*this); return; } #endif - if (thread_pool) { + if(thread_pool) + { thread_pool->post(*this); return; } } - else { - long long int end_count = std::chrono::high_resolution_clock::now().time_since_epoch().count(); - std::cout << "reposted " << counter - << " in " << (double)(end_count - begin_count)/(double)1000000 << " ms" - << std::endl; + else + { + long long int end_count = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + std::cout << "reposted " << counter << " in " + << (double)(end_count - begin_count) / (double)1000000 + << " ms" << std::endl; waiter->set_value(); } } }; -int main(int, const char *[]) +int main(int, const char* []) { std::cout << "Benchmark job reposting" << std::endl; @@ -143,12 +160,14 @@ int main(int, const char *[]) std::cout << "***thread pool cpp***" << std::endl; std::promise waiters[CONCURRENCY]; - ThreadPool thread_pool; - for (auto &waiter : waiters) { + ThreadPoolStd thread_pool; + for(auto& waiter : waiters) + { thread_pool.post(RepostJob(&thread_pool, &waiter)); } - for (auto &waiter : waiters) { + for(auto& waiter : waiters) + { waiter.get_future().wait(); } } @@ -158,18 +177,21 @@ int main(int, const char *[]) std::cout << "***asio thread pool***" << std::endl; size_t workers_count = std::thread::hardware_concurrency(); - if (0 == workers_count) { + if(0 == workers_count) + { workers_count = 1; } AsioThreadPool asio_thread_pool(workers_count); std::promise waiters[CONCURRENCY]; - for (auto &waiter : waiters) { + for(auto& waiter : waiters) + { asio_thread_pool.post(RepostJob(&asio_thread_pool, &waiter)); } - for (auto &waiter : waiters) { + for(auto& waiter : waiters) + { waiter.get_future().wait(); } } diff --git a/include/thread_pool/future_dependencies.hpp b/include/thread_pool/future_dependencies.hpp deleted file mode 100644 index 5c5595d6..00000000 --- a/include/thread_pool/future_dependencies.hpp +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef THREAD_POOL_FUTURE_DEPENDENCIES_HPP -#define THREAD_POOL_FUTURE_DEPENDENCIES_HPP - -#if defined(THREAD_POOL_USE_BOOST_FUTURE) -#include -#else -#include -#endif - -namespace tp -{ -#if defined(THREAD_POOL_USE_BOOST_FUTURE) - template - using packaged_task_type = boost::packaged_task; -#else - template - using packaged_task_type = std::packaged_task; -#endif -} - -#endif \ No newline at end of file diff --git a/include/thread_pool/thread_pool.hpp b/include/thread_pool/thread_pool.hpp index 0044c369..78e86dbc 100644 --- a/include/thread_pool/thread_pool.hpp +++ b/include/thread_pool/thread_pool.hpp @@ -1,7 +1,6 @@ #ifndef THREAD_POOL_HPP #define THREAD_POOL_HPP -#include #include #include #include @@ -25,6 +24,30 @@ namespace tp size_t worker_queue_size = 1024; }; + template