Skip to content

Commit

Permalink
Merge remote-tracking branch 'to_merge1/master'
Browse files Browse the repository at this point in the history
Conflicts:
	tests/thread_pool.t.cpp
	thread_pool/fixed_function.hpp
	thread_pool/thread_pool.hpp
	thread_pool/worker.hpp
  • Loading branch information
inkooboo committed May 16, 2017
2 parents dd570aa + d491809 commit 5d120de
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 3 deletions.
39 changes: 36 additions & 3 deletions tests/thread_pool.t.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
#include <thread_pool/thread_pool.hpp>
#include <test.hpp>

#include <thread>
#include <future>
#include <functional>
#include <sstream>
#include <thread>
#include <tuple>
#include <memory>

using namespace tp;

Expand All @@ -31,4 +30,38 @@ int main()

ASSERT(42 == r.get());
});

try {
ASSERT(r.get() == 42 && !"should not be called, exception expected");
} catch (const my_exception &e) {
}
});

doTest("post job to threadpool with onStart/onStop", []() {
std::atomic<int> someValue{0};
ThreadPoolOptions options;
options.onStart = [&someValue](){ ++someValue; };
options.onStop = [&someValue](){ --someValue; };

if (true) {
ThreadPool pool{options};

std::packaged_task<int()> t([&someValue](){
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return someValue.load();
});

std::future<int> r = t.get_future();

pool.post(t);

const auto result = r.get();

ASSERT(0 < result);
ASSERT(pool.getWorkerCount() == result);
}

ASSERT(0 == someValue);
});

}
146 changes: 146 additions & 0 deletions thread_pool/fixed_function.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#ifndef FIXED_FUNCTION_HPP
#define FIXED_FUNCTION_HPP

#include <type_traits>
#include <cstring>
#include <stdexcept>
#include <utility>

/**
* @brief The FixedFunction<R(ARGS...), STORAGE_SIZE> class implements functional object.
* This function is analog of 'std::function' with limited capabilities:
* - It supports only move semantics.
* - The size of functional objects is limited to storage size.
* Due to limitations above it is much faster on creation and copying than std::function.
*/
template <typename SIGNATURE, size_t STORAGE_SIZE = 64>
class FixedFunction;

template <typename R, typename... ARGS, size_t STORAGE_SIZE>
class FixedFunction<R(ARGS...), STORAGE_SIZE> {

typedef R (*func_ptr_type)(ARGS...);

public:
FixedFunction()
: m_function_ptr(nullptr)
, m_method_ptr(nullptr)
, m_alloc_ptr(nullptr)
{
}

/**
* @brief FixedFunction Constructor from functional object.
* @param object Functor object will be stored in the internal storage
* using move constructor. Unmovable objects are prohibited explicitly.
*/
template <typename FUNC>
FixedFunction(FUNC &&object)
: FixedFunction()
{
typedef typename std::remove_reference<FUNC>::type unref_type;

static_assert(sizeof(unref_type) < STORAGE_SIZE,
"functional object doesn't fit into internal storage");

static_assert(std::is_move_constructible<unref_type>::value, "Should be of movable type");

m_method_ptr = [](void *object_ptr, func_ptr_type, ARGS... args) -> R {
return static_cast<unref_type *>(object_ptr)->operator()(args...);
};

m_alloc_ptr = [](void *storage_ptr, void *object_ptr) {
if (object_ptr) {
unref_type *object = static_cast<unref_type *>(object_ptr);
new (storage_ptr) unref_type(std::move(*object));
} else {
static_cast<unref_type *>(storage_ptr)->~unref_type();
}
};

m_alloc_ptr(&m_storage, &object);
}

/**
* @brief FixedFunction Constructor from free function or static member.
*/
template <typename RET, typename... PARAMS>
FixedFunction(RET(*func_ptr)(PARAMS...))
: FixedFunction()
{
m_function_ptr = func_ptr;
m_method_ptr = [](void *, func_ptr_type f_ptr, ARGS... args) -> R {
return static_cast<RET(*)(PARAMS...)>(f_ptr)(args...);
};
}

FixedFunction(FixedFunction &&o)
: FixedFunction()
{
moveFromOther(o);
}

FixedFunction & operator=(FixedFunction &&o)
{
moveFromOther(o);
return *this;
}

~FixedFunction()
{
if (m_alloc_ptr)
m_alloc_ptr(&m_storage, nullptr);
}

/**
* @brief operator () Execute stored functional object.
* @throws std::runtime_error if no functional object is stored.
*/
R operator()(ARGS... args)
{
if (!m_method_ptr)
throw std::runtime_error("call of empty functor");
return m_method_ptr(&m_storage, m_function_ptr, args...);
}

private:
FixedFunction & operator=(const FixedFunction &) = delete;
FixedFunction(const FixedFunction &) = delete;

union {
typename std::aligned_storage<STORAGE_SIZE, sizeof(size_t)>::type m_storage;
func_ptr_type m_function_ptr;
};

typedef R(*method_type)(void *object_ptr, func_ptr_type free_func_ptr, ARGS... args);
method_type m_method_ptr;

typedef void(*alloc_type)(void *storage_ptr, void *object_ptr);
alloc_type m_alloc_ptr;

void moveFromOther(FixedFunction &o)
{
if (this == &o)
return;

if (m_alloc_ptr) {
m_alloc_ptr(&m_storage, nullptr);
m_alloc_ptr = nullptr;
} else {
m_function_ptr = nullptr;
}

m_method_ptr = o.m_method_ptr;
o.m_method_ptr = nullptr;

if (o.m_alloc_ptr) {
m_alloc_ptr = o.m_alloc_ptr;
m_alloc_ptr(&m_storage, &o.m_storage);
} else {
m_function_ptr = o.m_function_ptr;
}
}
};


#endif
149 changes: 149 additions & 0 deletions thread_pool/thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#ifndef THREAD_POOL_HPP
#define THREAD_POOL_HPP

#include <worker.hpp>
#include <atomic>
#include <stdexcept>
#include <memory>
#include <vector>
#include <future>
#include <type_traits>

/**
* @brief The ThreadPoolOptions struct provides construction options for ThreadPool.
*/
struct ThreadPoolOptions {

size_t threads_count{std::thread::hardware_concurrency()};
size_t worker_queue_size = 1024;
Worker<>::OnStart onStart;
Worker<>::OnStop onStop;
};

/**
* @brief The ThreadPool class implements thread pool pattern.
* It is highly scalable and fast.
* It is header only.
* It implements both work-stealing and work-distribution balancing startegies.
* It implements cooperative scheduling strategy for tasks.
*/

template <size_t STORAGE_SIZE = 128>
class ThreadPool {
public:

typedef Worker<STORAGE_SIZE> FixedWorker;

/**
* @brief ThreadPool Construct and start new thread pool.
* @param options Creation options.
*/
explicit ThreadPool(const ThreadPoolOptions &options = ThreadPoolOptions());

/**
* @brief ~ThreadPool Stop all workers and destroy thread pool.
*/
~ThreadPool();

/**
* @brief post Post piece of job to thread pool.
* @param handler Handler to be called from thread pool worker. It has to be callable as 'handler()'.
* @throws std::overflow_error if worker's queue is full.
* @note All exceptions thrown by handler will be suppressed. Use 'process()' to get result of handler's
* execution or exception thrown.
*/
template <typename Handler>
void post(Handler &&handler)
{
if (!getWorker().post(std::forward<Handler>(handler))) {
throw std::overflow_error("worker queue is full");
}
}

/**
* @brief process Post piece of job to thread pool and get future for this job.
* @param handler Handler to be called from thread pool worker. It has to be callable as 'handler()'.
* @return Future which hold handler result or exception thrown.
* @throws std::overflow_error if worker's queue is full.
* @note This method of posting job to thread pool is much slower than 'post()' due to std::future and
* std::packaged_task construction overhead.
*/
template <typename Handler, typename R = typename std::result_of<Handler()>::type>
typename std::future<R> process(Handler &&handler)
{
std::packaged_task<R()> task([handler = std::move(handler)] () {
return handler();
});

std::future<R> result = task.get_future();

if (!getWorker().post(task)) {
throw std::overflow_error("worker queue is full");
}

return result;
}

/**
* @brief getWorkerCount Returns the number of workers created by the thread pool
* @return Worker ID.
*/
size_t getWorkerCount() const;

private:
ThreadPool(const ThreadPool&) = delete;
ThreadPool & operator=(const ThreadPool&) = delete;

FixedWorker & getWorker();

std::vector<std::unique_ptr<FixedWorker>> m_workers;
std::atomic<size_t> m_next_worker;
};


/// Implementation

template <size_t STORAGE_SIZE>
inline ThreadPool<STORAGE_SIZE>::ThreadPool(const ThreadPoolOptions &options)
: m_next_worker(0)
{
size_t workers_count = options.threads_count;

if (0 == workers_count) {
workers_count = 1;
}

m_workers.resize(workers_count);
for (auto &worker_ptr : m_workers) {
worker_ptr.reset(new FixedWorker(options.worker_queue_size));
}

for (size_t i = 0; i < m_workers.size(); ++i) {
FixedWorker *steal_donor = m_workers[(i + 1) % m_workers.size()].get();
m_workers[i]->start(i, steal_donor, options.onStart, options.onStop);
}
}

template <size_t STORAGE_SIZE>
inline ThreadPool<STORAGE_SIZE>::~ThreadPool()
{
for (auto &worker_ptr : m_workers) {
worker_ptr->stop();
}
}

template <size_t STORAGE_SIZE>
inline typename ThreadPool<STORAGE_SIZE>::FixedWorker& ThreadPool<STORAGE_SIZE>::getWorker()
{
size_t id = FixedWorker::getWorkerIdForCurrentThread();

if (id > m_workers.size()) {
id = m_next_worker.fetch_add(1, std::memory_order_relaxed) % m_workers.size();
}

return *m_workers[id];
}


#endif

7 changes: 7 additions & 0 deletions thread_pool/worker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#include <worker.hpp>

size_t * Worker::thread_id()
{
static thread_local size_t tss_id = -1u;
return &tss_id;
}

0 comments on commit 5d120de

Please sign in to comment.