diff --git a/.gitignore b/.gitignore index b1a1883a..cdf1ed2c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1 @@ -boost* build* -thread_pool-build* -*.pro.user -*.user \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 50c8f2d9..92f1233d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,23 +1,22 @@ cmake_minimum_required(VERSION 3.0) +project(thread-pool-cpp CXX) + list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake" "${CMAKE_CURRENT_SOURCE_DIR}/cmake/modules") -project(thread-pool-cpp) -add_definitions(-std=c++14 -Wall -Werror -O3) - +add_definitions(-std=c++14 -Wall -Werror) # Get all include files -set(THREAD_POOL_CPP_INC_DIR "${CMAKE_CURRENT_SOURCE_DIR}/include/") -include_directories("${THREAD_POOL_CPP_INC_DIR}") -file(GLOB_RECURSE INSTALL_FILES_LIST "${THREAD_POOL_CPP_INC_DIR}/*") +include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include) add_subdirectory(tests) add_subdirectory(benchmark) # Install as header-only library +file(GLOB_RECURSE INSTALL_FILES_LIST "${CMAKE_CURRENT_SOURCE_DIR}/include/*") set_source_files_properties(${INSTALL_FILES_LIST} PROPERTIES HEADER_FILE_ONLY 1) add_library(HEADER_ONLY_TARGET STATIC ${INSTALL_FILES_LIST}) set_target_properties(HEADER_ONLY_TARGET PROPERTIES LINKER_LANGUAGE CXX) -install(DIRECTORY ${THREAD_POOL_CPP_INC_DIR} DESTINATION "include") +install(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/include/" DESTINATION "include") diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index 8d3377bd..a5e1773f 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -1,11 +1,7 @@ #benchmark -include_directories("${THREAD_POOL_CPP_INC_DIR}") - -find_package(Boost REQUIRED COMPONENTS system) - include_directories(${CMAKE_CURRENT_SOURCE_DIR}) -include_directories(${Boost_INCLUDE_DIR}) + add_executable(benchmark benchmark.cpp) -target_link_libraries(benchmark ${Boost_LIBRARIES} pthread) +target_link_libraries(benchmark pthread) diff --git a/benchmark/benchmark.cpp b/benchmark/benchmark.cpp index a6df324b..eae093bf 100644 --- a/benchmark/benchmark.cpp +++ b/benchmark/benchmark.cpp @@ -1,8 +1,6 @@ -//#define WITHOUT_ASIO 1 - #include -#ifndef WITHOUT_ASIO +#ifdef WITH_ASIO #include #endif @@ -14,8 +12,6 @@ using namespace tp; -using ThreadPoolStd = ThreadPool<>; - static const size_t CONCURRENCY = 16; static const size_t REPOST_COUNT = 1000000; @@ -85,10 +81,10 @@ struct Heavy struct RepostJob { - // Heavy heavy; + //Heavy heavy; - ThreadPoolStd* thread_pool; -#ifndef WITHOUT_ASIO + ThreadPool* thread_pool; +#ifdef WITH_ASIO AsioThreadPool* asio_thread_pool; #endif @@ -96,9 +92,9 @@ struct RepostJob long long int begin_count; std::promise* waiter; - RepostJob(ThreadPoolStd* thread_pool, std::promise* waiter) + RepostJob(ThreadPool* thread_pool, std::promise* waiter) : thread_pool(thread_pool) -#ifndef WITHOUT_ASIO +#ifdef WITH_ASIO , asio_thread_pool(0) #endif @@ -110,7 +106,7 @@ struct RepostJob .count(); } -#ifndef WITHOUT_ASIO +#ifdef WITH_ASIO RepostJob(AsioThreadPool* asio_thread_pool, std::promise* waiter) : thread_pool(0), asio_thread_pool(asio_thread_pool), counter(0), waiter(waiter) @@ -123,9 +119,9 @@ struct RepostJob void operator()() { - if(counter++ < REPOST_COUNT) + if(++counter < REPOST_COUNT) { -#ifndef WITHOUT_ASIO +#ifdef WITH_ASIO if(asio_thread_pool) { asio_thread_pool->post(*this); @@ -159,7 +155,7 @@ int main(int, const char* []) std::cout << "***thread pool cpp***" << std::endl; std::promise waiters[CONCURRENCY]; - ThreadPoolStd thread_pool; + ThreadPool thread_pool; for(auto& waiter : waiters) { thread_pool.post(RepostJob(&thread_pool, &waiter)); @@ -171,7 +167,7 @@ int main(int, const char* []) } } -#ifndef WITHOUT_ASIO +#ifdef WITH_ASIO { std::cout << "***asio thread pool***" << std::endl; diff --git a/cmake/Findthread-pool-cpp.cmake b/cmake/Findthread-pool-cpp.cmake index 1910b9d3..6205b2fe 100644 --- a/cmake/Findthread-pool-cpp.cmake +++ b/cmake/Findthread-pool-cpp.cmake @@ -83,4 +83,4 @@ if (NOT EXISTS "${THREAD_POOL_CPP_INCLUDE_DIR}" AND DEFINED THREAD_POOL_CPP_CLON endif() include(FindPackageHandleStandardArgs) -find_package_handle_standard_args(THREAD_POOL_CPP DEFAULT_MSG THREAD_POOL_CPP_INCLUDE_DIR) \ No newline at end of file +find_package_handle_standard_args(THREAD_POOL_CPP DEFAULT_MSG THREAD_POOL_CPP_INCLUDE_DIR) diff --git a/include/thread_pool.hpp b/include/thread_pool.hpp index 162fcc84..d8b0bc4b 100644 --- a/include/thread_pool.hpp +++ b/include/thread_pool.hpp @@ -1,3 +1,3 @@ #pragma once -#include "./thread_pool/thread_pool.hpp" \ No newline at end of file +#include diff --git a/include/thread_pool/fixed_function.hpp b/include/thread_pool/fixed_function.hpp index 1f469cd1..5237052a 100644 --- a/include/thread_pool/fixed_function.hpp +++ b/include/thread_pool/fixed_function.hpp @@ -8,155 +8,156 @@ namespace tp { +/** + * @brief The FixedFunction class implements + * functional object. + * This function is analog of 'std::function' with limited capabilities: + * - It supports only move semantics. + * - The size of functional objects is limited to storage size. + * Due to limitations above it is much faster on creation and copying than + * std::function. + */ +template +class FixedFunction; + +template +class FixedFunction +{ + + typedef R (*func_ptr_type)(ARGS...); + +public: + FixedFunction() + : m_function_ptr(nullptr), m_method_ptr(nullptr), + m_alloc_ptr(nullptr) + { + } + /** - * @brief The FixedFunction class implements - * functional object. - * This function is analog of 'std::function' with limited capabilities: - * - It supports only move semantics. - * - The size of functional objects is limited to storage size. - * Due to limitations above it is much faster on creation and copying than - * std::function. + * @brief FixedFunction Constructor from functional object. + * @param object Functor object will be stored in the internal storage + * using move constructor. Unmovable objects are prohibited explicitly. */ - template - class FixedFunction; - - template - class FixedFunction + template + FixedFunction(FUNC&& object) + : FixedFunction() { + typedef typename std::remove_reference::type unref_type; - typedef R (*func_ptr_type)(ARGS...); + static_assert(sizeof(unref_type) < STORAGE_SIZE, + "functional object doesn't fit into internal storage"); + static_assert(std::is_move_constructible::value, + "Should be of movable type"); - public: - FixedFunction() - : m_function_ptr(nullptr), m_method_ptr(nullptr), - m_alloc_ptr(nullptr) + m_method_ptr = []( + void* object_ptr, func_ptr_type, ARGS... args) -> R { - } + return static_cast(object_ptr) + -> + operator()(args...); + }; - /** - * @brief FixedFunction Constructor from functional object. - * @param object Functor object will be stored in the internal storage - * using move constructor. Unmovable objects are prohibited explicitly. - */ - template - FixedFunction(FUNC&& object) - : FixedFunction() + m_alloc_ptr = [](void* storage_ptr, void* object_ptr) { - typedef typename std::remove_reference::type unref_type; - - static_assert(sizeof(unref_type) < STORAGE_SIZE, - "functional object doesn't fit into internal storage"); - static_assert(std::is_move_constructible::value, - "Should be of movable type"); - - m_method_ptr = []( - void* object_ptr, func_ptr_type, ARGS... args) -> R + if(object_ptr) { - return static_cast(object_ptr) - -> - operator()(args...); - }; - - m_alloc_ptr = [](void* storage_ptr, void* object_ptr) + unref_type* x_object = static_cast(object_ptr); + new(storage_ptr) unref_type(std::move(*x_object)); + } + else { - if(object_ptr) - { - unref_type* x_object = static_cast(object_ptr); - new(storage_ptr) unref_type(std::move(*x_object)); - } - else - { - static_cast(storage_ptr)->~unref_type(); - } - }; - - m_alloc_ptr(&m_storage, &object); - } + static_cast(storage_ptr)->~unref_type(); + } + }; - /** - * @brief FixedFunction Constructor from free function or static member. - */ - template - FixedFunction(RET (*func_ptr)(PARAMS...)) - : FixedFunction() - { - m_function_ptr = func_ptr; - m_method_ptr = [](void*, func_ptr_type f_ptr, ARGS... args) -> R - { - return static_cast(f_ptr)(args...); - }; - } + m_alloc_ptr(&m_storage, &object); + } - FixedFunction(FixedFunction&& o) : FixedFunction() + /** + * @brief FixedFunction Constructor from free function or static member. + */ + template + FixedFunction(RET (*func_ptr)(PARAMS...)) + : FixedFunction() + { + m_function_ptr = func_ptr; + m_method_ptr = [](void*, func_ptr_type f_ptr, ARGS... args) -> R { - moveFromOther(o); - } + return static_cast(f_ptr)(args...); + }; + } - FixedFunction& operator=(FixedFunction&& o) - { - moveFromOther(o); - return *this; - } + FixedFunction(FixedFunction&& o) : FixedFunction() + { + moveFromOther(o); + } - ~FixedFunction() - { - if(m_alloc_ptr) m_alloc_ptr(&m_storage, nullptr); - } + FixedFunction& operator=(FixedFunction&& o) + { + moveFromOther(o); + return *this; + } - /** - * @brief operator () Execute stored functional object. - * @throws std::runtime_error if no functional object is stored. - */ - R operator()(ARGS... args) - { - if(!m_method_ptr) throw std::runtime_error("call of empty functor"); - return m_method_ptr(&m_storage, m_function_ptr, args...); - } + ~FixedFunction() + { + if(m_alloc_ptr) m_alloc_ptr(&m_storage, nullptr); + } - private: - FixedFunction& operator=(const FixedFunction&) = delete; - FixedFunction(const FixedFunction&) = delete; + /** + * @brief operator () Execute stored functional object. + * @throws std::runtime_error if no functional object is stored. + */ + R operator()(ARGS... args) + { + if(!m_method_ptr) throw std::runtime_error("call of empty functor"); + return m_method_ptr(&m_storage, m_function_ptr, args...); + } - union - { - typename std::aligned_storage::type - m_storage; - func_ptr_type m_function_ptr; - }; +private: + FixedFunction& operator=(const FixedFunction&) = delete; + FixedFunction(const FixedFunction&) = delete; - typedef R (*method_type)( - void* object_ptr, func_ptr_type free_func_ptr, ARGS... args); - method_type m_method_ptr; + union + { + typename std::aligned_storage::type + m_storage; + func_ptr_type m_function_ptr; + }; - typedef void (*alloc_type)(void* storage_ptr, void* object_ptr); - alloc_type m_alloc_ptr; + typedef R (*method_type)( + void* object_ptr, func_ptr_type free_func_ptr, ARGS... args); + method_type m_method_ptr; - void moveFromOther(FixedFunction& o) - { - if(this == &o) return; + typedef void (*alloc_type)(void* storage_ptr, void* object_ptr); + alloc_type m_alloc_ptr; - if(m_alloc_ptr) - { - m_alloc_ptr(&m_storage, nullptr); - m_alloc_ptr = nullptr; - } - else - { - m_function_ptr = nullptr; - } + void moveFromOther(FixedFunction& o) + { + if(this == &o) return; - m_method_ptr = o.m_method_ptr; - o.m_method_ptr = nullptr; + if(m_alloc_ptr) + { + m_alloc_ptr(&m_storage, nullptr); + m_alloc_ptr = nullptr; + } + else + { + m_function_ptr = nullptr; + } - if(o.m_alloc_ptr) - { - m_alloc_ptr = o.m_alloc_ptr; - m_alloc_ptr(&m_storage, &o.m_storage); - } - else - { - m_function_ptr = o.m_function_ptr; - } + m_method_ptr = o.m_method_ptr; + o.m_method_ptr = nullptr; + + if(o.m_alloc_ptr) + { + m_alloc_ptr = o.m_alloc_ptr; + m_alloc_ptr(&m_storage, &o.m_storage); } - }; + else + { + m_function_ptr = o.m_function_ptr; + } + } +}; + } diff --git a/include/thread_pool/mpsc_bounded_queue.hpp b/include/thread_pool/mpsc_bounded_queue.hpp index 1d9c0c95..b06ee083 100644 --- a/include/thread_pool/mpsc_bounded_queue.hpp +++ b/include/thread_pool/mpsc_bounded_queue.hpp @@ -43,191 +43,200 @@ namespace tp { + +/** + * @brief The MPMCBoundedQueue class implements bounded + * multi-producers/multi-consumers lock-free queue. + * Doesn't accept non-movable types as T. + * Inspired by Dmitry Vyukov's mpmc queue. + * http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + */ +template +class MPMCBoundedQueue +{ + static_assert( + std::is_move_constructible::value, "Should be of movable type"); + +public: /** - * @brief The MPMCBoundedQueue class implements bounded - * multi-producers/multi-consumers lock-free queue. - * Doesn't accept non-movable types as T. - * Inspired by Dmitry Vyukov's mpmc queue. - * http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue + * @brief MPMCBoundedQueue Constructor. + * @param size Power of 2 number - queue length. + * @throws std::invalid_argument if size is bad. */ - template - class MPMCBoundedQueue - { - static_assert( - std::is_move_constructible::value, "Should be of movable type"); - - public: - /** - * @brief MPMCBoundedQueue Constructor. - * @param size Power of 2 number - queue length. - * @throws std::invalid_argument if size is bad. - */ - explicit MPMCBoundedQueue(size_t size); - - /** - * @brief push Push data to queue. - * @param data Data to be pushed. - * @return true on success. - */ - template - bool push(U&& data); - - /** - * @brief pop Pop data from queue. - * @param data Place to store popped data. - * @return true on sucess. - */ - bool pop(T& data); - - private: - MPMCBoundedQueue(const MPMCBoundedQueue&) = delete; - MPMCBoundedQueue& operator=(const MPMCBoundedQueue&) = delete; - - struct Cell - { - std::atomic sequence; - T data; + explicit MPMCBoundedQueue(size_t size); - Cell() = default; + /** + * @brief Move ctor implementation. + */ + MPMCBoundedQueue(MPMCBoundedQueue&& rhs) noexcept; - Cell(const Cell&) = delete; - Cell& operator=(const Cell&) = delete; + /** + * @brief Move assignment implementaion. + */ + MPMCBoundedQueue& operator=(MPMCBoundedQueue&& rhs) noexcept; - Cell(Cell&& rhs) - : sequence(rhs.sequence.load()), data(std::move(rhs.data)) - { - } + /** + * @brief push Push data to queue. + * @param data Data to be pushed. + * @return true on success. + */ + template + bool push(U&& data); - Cell& operator=(Cell&& rhs) - { - sequence = rhs.sequence.load(); - data = std::move(rhs.data); + /** + * @brief pop Pop data from queue. + * @param data Place to store popped data. + * @return true on sucess. + */ + bool pop(T& data); - return *this; - } - }; - - public: - MPMCBoundedQueue(MPMCBoundedQueue&& rhs) - : m_buffer(std::move(rhs.m_buffer)), - m_buffer_mask(std::move(rhs.m_buffer_mask)), - m_enqueue_pos(rhs.m_enqueue_pos.load()), - m_dequeue_pos(rhs.m_dequeue_pos.load()) +private: + struct Cell + { + std::atomic sequence; + T data; + + Cell() = default; + + Cell(const Cell&) = delete; + Cell& operator=(const Cell&) = delete; + + Cell(Cell&& rhs) + : sequence(rhs.sequence.load()), data(std::move(rhs.data)) { } - MPMCBoundedQueue& operator=(MPMCBoundedQueue&& rhs) + Cell& operator=(Cell&& rhs) { - m_buffer = std::move(rhs.m_buffer); - m_buffer_mask = std::move(rhs.m_buffer_mask); - m_enqueue_pos = rhs.m_enqueue_pos.load(); - m_dequeue_pos = rhs.m_dequeue_pos.load(); + sequence = rhs.sequence.load(); + data = std::move(rhs.data); return *this; } - - private: - typedef char Cacheline[64]; - - Cacheline pad0; - std::vector m_buffer; - /* const */ size_t m_buffer_mask; - Cacheline pad1; - std::atomic m_enqueue_pos; - Cacheline pad2; - std::atomic m_dequeue_pos; - Cacheline pad3; }; +private: + typedef char Cacheline[64]; + + Cacheline pad0; + std::vector m_buffer; + /* const */ size_t m_buffer_mask; + Cacheline pad1; + std::atomic m_enqueue_pos; + Cacheline pad2; + std::atomic m_dequeue_pos; + Cacheline pad3; +}; - /// Implementation - template - inline MPMCBoundedQueue::MPMCBoundedQueue(size_t size) - : m_buffer(size), m_buffer_mask(size - 1), m_enqueue_pos(0), - m_dequeue_pos(0) +/// Implementation + +template +inline MPMCBoundedQueue::MPMCBoundedQueue(size_t size) + : m_buffer(size), m_buffer_mask(size - 1), m_enqueue_pos(0), + m_dequeue_pos(0) +{ + bool size_is_power_of_2 = (size >= 2) && ((size & (size - 1)) == 0); + if(!size_is_power_of_2) { - bool size_is_power_of_2 = (size >= 2) && ((size & (size - 1)) == 0); - if(!size_is_power_of_2) - { - throw std::invalid_argument("buffer size should be a power of 2"); - } + throw std::invalid_argument("buffer size should be a power of 2"); + } - for(size_t i = 0; i < size; ++i) - { - m_buffer[i].sequence = i; - } + for(size_t i = 0; i < size; ++i) + { + m_buffer[i].sequence = i; } +} - template - template - inline bool MPMCBoundedQueue::push(U&& data) +template +inline MPMCBoundedQueue::MPMCBoundedQueue(MPMCBoundedQueue&& rhs) noexcept +{ + *this = rhs; +} + +template +inline MPMCBoundedQueue& MPMCBoundedQueue::operator=(MPMCBoundedQueue&& rhs) noexcept +{ + if (this != &rhs) + { + m_buffer = std::move(rhs.m_buffer); + m_buffer_mask = std::move(rhs.m_buffer_mask); + m_enqueue_pos = rhs.m_enqueue_pos.load(); + m_dequeue_pos = rhs.m_dequeue_pos.load(); + } + return *this; +} + +template +template +inline bool MPMCBoundedQueue::push(U&& data) +{ + Cell* cell; + size_t pos = m_enqueue_pos.load(std::memory_order_relaxed); + for(;;) { - Cell* cell; - size_t pos = m_enqueue_pos.load(std::memory_order_relaxed); - for(;;) + cell = &m_buffer[pos & m_buffer_mask]; + size_t seq = cell->sequence.load(std::memory_order_acquire); + intptr_t dif = (intptr_t)seq - (intptr_t)pos; + if(dif == 0) { - cell = &m_buffer[pos & m_buffer_mask]; - size_t seq = cell->sequence.load(std::memory_order_acquire); - intptr_t dif = (intptr_t)seq - (intptr_t)pos; - if(dif == 0) + if(m_enqueue_pos.compare_exchange_weak( + pos, pos + 1, std::memory_order_relaxed)) { - if(m_enqueue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed)) - { - break; - } - } - else if(dif < 0) - { - return false; - } - else - { - pos = m_enqueue_pos.load(std::memory_order_relaxed); + break; } } + else if(dif < 0) + { + return false; + } + else + { + pos = m_enqueue_pos.load(std::memory_order_relaxed); + } + } - cell->data = std::forward(data); + cell->data = std::forward(data); - cell->sequence.store(pos + 1, std::memory_order_release); + cell->sequence.store(pos + 1, std::memory_order_release); - return true; - } + return true; +} - template - inline bool MPMCBoundedQueue::pop(T& data) +template +inline bool MPMCBoundedQueue::pop(T& data) +{ + Cell* cell; + size_t pos = m_dequeue_pos.load(std::memory_order_relaxed); + for(;;) { - Cell* cell; - size_t pos = m_dequeue_pos.load(std::memory_order_relaxed); - for(;;) + cell = &m_buffer[pos & m_buffer_mask]; + size_t seq = cell->sequence.load(std::memory_order_acquire); + intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1); + if(dif == 0) { - cell = &m_buffer[pos & m_buffer_mask]; - size_t seq = cell->sequence.load(std::memory_order_acquire); - intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1); - if(dif == 0) - { - if(m_dequeue_pos.compare_exchange_weak( - pos, pos + 1, std::memory_order_relaxed)) - { - break; - } - } - else if(dif < 0) - { - return false; - } - else + if(m_dequeue_pos.compare_exchange_weak( + pos, pos + 1, std::memory_order_relaxed)) { - pos = m_dequeue_pos.load(std::memory_order_relaxed); + break; } } + else if(dif < 0) + { + return false; + } + else + { + pos = m_dequeue_pos.load(std::memory_order_relaxed); + } + } - data = std::move(cell->data); + data = std::move(cell->data); - cell->sequence.store( - pos + m_buffer_mask + 1, std::memory_order_release); + cell->sequence.store( + pos + m_buffer_mask + 1, std::memory_order_release); + + return true; +} - return true; - } } diff --git a/include/thread_pool/thread_pool.hpp b/include/thread_pool/thread_pool.hpp index e854eb3a..2016f9a4 100644 --- a/include/thread_pool/thread_pool.hpp +++ b/include/thread_pool/thread_pool.hpp @@ -1,168 +1,182 @@ -#pragma once +#pragma once + +#include -#include #include -#include #include +#include #include -#include -#include -#include "./worker.hpp" namespace tp { - /** - * @brief The ThreadPoolOptions struct provides construction options for - * ThreadPool. - */ - struct ThreadPoolOptions + +/** + * @brief The ThreadPoolOptions struct provides construction options for + * ThreadPool. + */ +struct ThreadPoolOptions +{ + enum { - enum - { - AUTODETECT = 0 - }; - - std::size_t threads_count = AUTODETECT; - std::size_t worker_queue_size = 1024; + AUTODETECT = 0 }; + size_t threads_count = AUTODETECT; + size_t worker_queue_size = 1024; +}; + +/** + * @brief The ThreadPool class implements thread pool pattern. + * It is highly scalable and fast. + * It is header only. + * It implements both work-stealing and work-distribution balancing + * startegies. + * It implements cooperative scheduling strategy for tasks. + */ +template +class ThreadPoolImpl { +public: /** - * @brief The ThreadPool class implements thread pool pattern. - * It is highly scalable and fast. - * It is header only. - * It implements both work-stealing and work-distribution balancing - * startegies. - * It implements cooperative scheduling strategy for tasks. + * @brief ThreadPool Construct and start new thread pool. + * @param options Creation options. */ - template - class ThreadPool - { - public: - /** - * @brief ThreadPool Construct and start new thread pool. - * @param options Creation options. - */ - explicit ThreadPool( - const ThreadPoolOptions& options = ThreadPoolOptions()); - - /** - * @brief ~ThreadPool Stop all workers and destroy thread pool. - */ - ~ThreadPool(); - - private: - // TODO: - template - bool try_post(Handler&& handler); - - public: - /** - * @brief post Post piece of job to thread pool. - * @param handler Handler to be called from thread pool worker. It has - * to be - * callable as 'handler()'. - * @throws std::overflow_error if worker's queue is full. - * @note All exceptions thrown by handler will be suppressed. Use - * 'process()' to get result of handler's - * execution or exception thrown. - */ - template - void post(Handler&& handler); - - - private: - ThreadPool(const ThreadPool&) = delete; - ThreadPool& operator=(const ThreadPool&) = delete; - - public: - ThreadPool(ThreadPool&& rhs) - : m_workers(std::move(rhs.m_workers)), - m_next_worker(rhs.m_next_worker.load()) - { - } - - ThreadPool& operator=(ThreadPool&& rhs) - { - m_workers = std::move(rhs.m_workers); - m_next_worker = rhs.m_next_worker.load(); - - return *this; - } - - private: - Worker& getWorker(); - - std::vector> m_workers; - std::atomic m_next_worker; - }; + explicit ThreadPoolImpl( + const ThreadPoolOptions& options = ThreadPoolOptions()); + + /** + * @brief Move ctor implementation. + */ + ThreadPoolImpl(ThreadPoolImpl&& rhs) noexcept; + + /** + * @brief ~ThreadPool Stop all workers and destroy thread pool. + */ + ~ThreadPoolImpl(); + + /** + * @brief Move assignment implementaion. + */ + ThreadPoolImpl& operator=(ThreadPoolImpl&& rhs) noexcept; + + /** + * @brief post Try post job to thread pool. + * @param handler Handler to be called from thread pool worker. It has + * to be callable as 'handler()'. + * @return 'true' on success, false otherwise. + * @note All exceptions thrown by handler will be suppressed. + */ + template + bool tryPost(Handler&& handler); + + /** + * @brief post Post job to thread pool. + * @param handler Handler to be called from thread pool worker. It has + * to be callable as 'handler()'. + * @throw std::overflow_error if worker's queue is full. + * @note All exceptions thrown by handler will be suppressed. + */ + template + void post(Handler&& handler); + +private: + Worker& getWorker(); + + std::vector>> m_workers; + std::atomic m_next_worker; +}; + +using ThreadPool = ThreadPoolImpl<128>; - /// Implementation +/// Implementation - template - inline ThreadPool::ThreadPool(const ThreadPoolOptions& options) - : m_next_worker(0) +template +inline ThreadPoolImpl::ThreadPoolImpl( + const ThreadPoolOptions& options) + : m_next_worker(0) +{ + size_t workers_count = options.threads_count; + + if(ThreadPoolOptions::AUTODETECT == options.threads_count) { - size_t workers_count = options.threads_count; - - if(ThreadPoolOptions::AUTODETECT == options.threads_count) - { - workers_count = std::thread::hardware_concurrency(); - } - - if(0 == workers_count) - { - workers_count = 1; - } - - m_workers.resize(workers_count); - for(auto& worker_ptr : m_workers) - { - worker_ptr.reset(new Worker(options.worker_queue_size)); - } - - for(size_t i = 0; i < m_workers.size(); ++i) - { - Worker* steal_donor = m_workers[(i + 1) % m_workers.size()].get(); - m_workers[i]->start(i, steal_donor); - } + workers_count = std::thread::hardware_concurrency(); } - template - inline ThreadPool::~ThreadPool() + if(0 == workers_count) { - for(auto& worker_ptr : m_workers) - { - worker_ptr->stop(); - } + workers_count = 1; } - template - template - inline bool ThreadPool::try_post(Handler&& handler) + m_workers.resize(workers_count); + for(auto& worker_ptr : m_workers) { - return getWorker().post(std::forward(handler)); + worker_ptr.reset(new Worker(options.worker_queue_size)); } - template - template - inline void ThreadPool::post(Handler&& handler) + for(size_t i = 0; i < m_workers.size(); ++i) + { + Worker* steal_donor = + m_workers[(i + 1) % m_workers.size()].get(); + m_workers[i]->start(i, steal_donor); + } +} + +template +inline ThreadPoolImpl::ThreadPoolImpl(ThreadPoolImpl&& rhs) noexcept +{ + *this = rhs; +} + +template +inline ThreadPoolImpl::~ThreadPoolImpl() +{ + for (auto& worker_ptr : m_workers) + { + worker_ptr->stop(); + } +} + +template +inline ThreadPoolImpl& +ThreadPoolImpl::operator=(ThreadPoolImpl&& rhs) noexcept +{ + if (this != &rhs) { - const auto ok = try_post(std::forward(handler)); - assert(ok); + m_workers = std::move(rhs.m_workers); + m_next_worker = rhs.m_next_worker.load(); } + return *this; +} + +template +template +inline bool ThreadPoolImpl::tryPost(Handler&& handler) +{ + return getWorker().post(std::forward(handler)); +} - template - inline Worker& ThreadPool::getWorker() +template +template +inline void ThreadPoolImpl::post(Handler&& handler) +{ + const auto ok = tryPost(std::forward(handler)); + if (!ok) { - auto id = Worker::getWorkerIdForCurrentThread(); + throw std::runtime_error("thread pool queue is full"); + } +} - if(id > m_workers.size()) - { - id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % - m_workers.size(); - } +template +inline Worker& ThreadPoolImpl::getWorker() +{ + auto id = Worker::getWorkerIdForCurrentThread(); - return *m_workers[id]; + if (id > m_workers.size()) + { + id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % + m_workers.size(); } + + return *m_workers[id]; +} } diff --git a/include/thread_pool/worker.hpp b/include/thread_pool/worker.hpp index 7b6a6a43..ab2e8117 100644 --- a/include/thread_pool/worker.hpp +++ b/include/thread_pool/worker.hpp @@ -1,163 +1,184 @@ #pragma once +#include +#include + #include #include -#include "./fixed_function.hpp" -#include "./mpsc_bounded_queue.hpp" namespace tp { + +/** + * @brief The Worker class owns task queue and executing thread. + * In executing thread it tries to pop task from queue. If queue is empty + * then it tries to steal task from the sibling worker. If stealing was + * unsuccessful + * then spins with one millisecond delay. + */ +template +class Worker +{ +public: + using Task = FixedFunction; + /** - * @brief The Worker class owns task queue and executing thread. - * In executing thread it tries to pop task from queue. If queue is empty - * then it tries to steal task from the sibling worker. If stealing was - * unsuccessful - * then spins with one millisecond delay. + * @brief Worker Constructor. + * @param queue_size Length of undelaying task queue. */ - class Worker - { - public: - using Task = FixedFunction; - - /** - * @brief Worker Constructor. - * @param queue_size Length of undelaying task queue. - */ - explicit Worker(size_t queue_size); - - /** - * @brief start Create the executing thread and start tasks execution. - * @param id Worker ID. - * @param steal_donor Sibling worker to steal task from it. - */ - void start(size_t id, Worker* steal_donor); - - /** - * @brief stop Stop all worker's thread and stealing activity. - * Waits until the executing thread became finished. - */ - void stop(); - - /** - * @brief post Post task to queue. - * @param handler Handler to be executed in executing thread. - * @return true on success. - */ - template - bool post(Handler&& handler); - - /** - * @brief steal Steal one task from this worker queue. - * @param task Place for stealed task to be stored. - * @return true on success. - */ - bool steal(Task& task); - - /** - * @brief getWorkerIdForCurrentThread Return worker ID associated with - * current thread if exists. - * @return Worker ID. - */ - static size_t getWorkerIdForCurrentThread(); - - private: - Worker(const Worker&) = delete; - Worker& operator=(const Worker&) = delete; - - public: - Worker(Worker&& rhs) - : m_queue(std::move(rhs.m_queue)), - m_running_flag(rhs.m_running_flag.load()), - m_thread(std::move(rhs.m_thread)) - { - } + explicit Worker(size_t queue_size); - Worker& operator=(Worker&& rhs) - { - m_queue = std::move(rhs.m_queue); - m_running_flag = rhs.m_running_flag.load(); - m_thread = std::move(rhs.m_thread); + /** + * @brief Move ctor implementation. + */ + Worker(Worker&& rhs) noexcept; - return *this; - } + /** + * @brief Move assignment implementaion. + */ + Worker& operator=(Worker&& rhs) noexcept; - private: - /** - * @brief threadFunc Executing thread function. - * @param id Worker ID to be associated with this thread. - * @param steal_donor Sibling worker to steal task from it. - */ - void threadFunc(size_t id, Worker* steal_donor); + /** + * @brief start Create the executing thread and start tasks execution. + * @param id Worker ID. + * @param steal_donor Sibling worker to steal task from it. + */ + void start(size_t id, Worker* steal_donor); - MPMCBoundedQueue m_queue; - std::atomic m_running_flag; - std::thread m_thread; - }; + /** + * @brief stop Stop all worker's thread and stealing activity. + * Waits until the executing thread became finished. + */ + void stop(); + /** + * @brief post Post task to queue. + * @param handler Handler to be executed in executing thread. + * @return true on success. + */ + template + bool post(Handler&& handler); - /// Implementation + /** + * @brief steal Steal one task from this worker queue. + * @param task Place for stealed task to be stored. + * @return true on success. + */ + bool steal(Task& task); - namespace detail - { - inline size_t* thread_id() - { - static thread_local size_t tss_id = -1u; - return &tss_id; - } - } + /** + * @brief getWorkerIdForCurrentThread Return worker ID associated with + * current thread if exists. + * @return Worker ID. + */ + static size_t getWorkerIdForCurrentThread(); - inline Worker::Worker(size_t queue_size) - : m_queue(queue_size), m_running_flag(true) - { - } +private: + /** + * @brief threadFunc Executing thread function. + * @param id Worker ID to be associated with this thread. + * @param steal_donor Sibling worker to steal task from it. + */ + void threadFunc(size_t id, Worker* steal_donor); - inline void Worker::stop() - { - m_running_flag.store(false, std::memory_order_relaxed); - m_thread.join(); - } + MPMCBoundedQueue m_queue; + std::atomic m_running_flag; + std::thread m_thread; +}; - inline void Worker::start(size_t id, Worker* steal_donor) - { - m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor); - } - inline size_t Worker::getWorkerIdForCurrentThread() - { - return *detail::thread_id(); - } +/// Implementation - template - inline bool Worker::post(Handler&& handler) +namespace detail +{ + inline size_t* thread_id() { - return m_queue.push(std::forward(handler)); + static thread_local size_t tss_id = -1u; + return &tss_id; } +} + +template +inline Worker::Worker(size_t queue_size) + : m_queue(queue_size), m_running_flag(true) +{ +} + +template +inline Worker::Worker(Worker&& rhs) noexcept +{ + *this = rhs; +} - inline bool Worker::steal(Task& task) +template +inline Worker& Worker::operator=(Worker&& rhs) noexcept +{ + if (this != &rhs) { - return m_queue.pop(task); + m_queue = std::move(rhs.m_queue); + m_running_flag = rhs.m_running_flag.load(); + m_thread = std::move(rhs.m_thread); } + return *this; +} - inline void Worker::threadFunc(size_t id, Worker* steal_donor) - { - *detail::thread_id() = id; +template +inline void Worker::stop() +{ + m_running_flag.store(false, std::memory_order_relaxed); + m_thread.join(); +} + +template +inline void Worker::start(size_t id, Worker* steal_donor) +{ + m_thread = std::thread(&Worker::threadFunc, this, id, steal_donor); +} - Task handler; +template +inline size_t Worker::getWorkerIdForCurrentThread() +{ + return *detail::thread_id(); +} + +template +template +inline bool Worker::post(Handler&& handler) +{ + return m_queue.push(std::forward(handler)); +} + +template +inline bool Worker::steal(Task& task) +{ + return m_queue.pop(task); +} + +template +inline void Worker::threadFunc(size_t id, Worker* steal_donor) +{ + *detail::thread_id() = id; + + Task handler; - while(m_running_flag.load(std::memory_order_relaxed)) - if(m_queue.pop(handler) || steal_donor->steal(handler)) + while (m_running_flag.load(std::memory_order_relaxed)) + { + if (m_queue.pop(handler) || steal_donor->steal(handler)) + { + try { - try - { - handler(); - } - catch(...) - { - } + handler(); } - else + catch(...) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + // supress all exceptions } + } + else + { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } } + +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a1a62930..488787e4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -1,7 +1,6 @@ #tests -include_directories("${THREAD_POOL_CPP_INC_DIR}") -include_directories("${CMAKE_CURRENT_SOURCE_DIR}") +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) add_executable(fixed_function_test fixed_function.t.cpp) diff --git a/tests/thread_pool.t.cpp b/tests/thread_pool.t.cpp index e94aa6df..1dc1c85e 100644 --- a/tests/thread_pool.t.cpp +++ b/tests/thread_pool.t.cpp @@ -8,15 +8,13 @@ using namespace tp; -using ThreadPoolStd = ThreadPool<>; - int main() { std::cout << "*** Testing TP ***" << std::endl; doTest("post job", []() { - ThreadPoolStd pool; + ThreadPool pool; std::packaged_task t([]() { @@ -31,37 +29,4 @@ int main() ASSERT(42 == r.get()); }); - try { - ASSERT(r.get() == 42 && !"should not be called, exception expected"); - } catch (const my_exception &e) { - } - }); - - doTest("post job to threadpool with onStart/onStop", []() { - std::atomic someValue{0}; - ThreadPoolOptions options; - options.onStart = [&someValue](){ ++someValue; }; - options.onStop = [&someValue](){ --someValue; }; - - if (true) { - ThreadPool pool{options}; - - std::packaged_task t([&someValue](){ - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - return someValue.load(); - }); - - std::future r = t.get_future(); - - pool.post(t); - - const auto result = r.get(); - - ASSERT(0 < result); - ASSERT(pool.getWorkerCount() == result); - } - - ASSERT(0 == someValue); - }); - } diff --git a/tests/thread_pool2.t.cpp b/tests/thread_pool2.t.cpp index ac45eaed..c3b094f4 100644 --- a/tests/thread_pool2.t.cpp +++ b/tests/thread_pool2.t.cpp @@ -9,5 +9,5 @@ size_t getWorkerIdForCurrentThread() size_t getWorkerIdForCurrentThread2() { - return Worker::getWorkerIdForCurrentThread(); + return Worker<128>::getWorkerIdForCurrentThread(); }