Skip to content

Commit

Permalink
Merge pull request #4 from headupinclouds/pr.template.storage.size
Browse files Browse the repository at this point in the history
Pr.template.storage.size
  • Loading branch information
headupinclouds committed Feb 3, 2016
2 parents 6509fce + 01a9a5d commit d491809
Show file tree
Hide file tree
Showing 12 changed files with 550 additions and 398 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ project(thread-pool-cpp)
cmake_minimum_required(VERSION 2.8)

ADD_DEFINITIONS(
-std=c++14 -Wall -Werror -O3
-std=c++1y -Wall -Werror -O3
)

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/thread_pool)
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@ Post job to thread pool is much faster than for boost::asio based thread pool.

See benchmark/benchmark.cpp for benchmark code.

All code except [MPMCBoundedQueue](https://github.com/inkooboo/thread-pool-cpp/blob/master/thread_pool/mpsc_bounded_queue.hpp)
is under MIT license.

164 changes: 82 additions & 82 deletions benchmark/asio_thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -1,82 +1,82 @@
#ifndef ASIO_THREAD_POOL_HPP
#define ASIO_THREAD_POOL_HPP

#include <boost/asio.hpp>

#include <functional>
#include <thread>
#include <vector>
#include <memory>

class AsioThreadPool
{
public:
inline AsioThreadPool(size_t threads);

inline ~AsioThreadPool()
{
stop();
}

inline void joinThreadPool();

template <typename Handler>
inline void post(Handler &&handler)
{
m_io_svc.post(handler);
}

private:
inline void start();
inline void stop();
inline void worker_thread_func();

boost::asio::io_service m_io_svc;
std::unique_ptr<boost::asio::io_service::work> m_work;

std::vector<std::thread> m_threads;
};

inline AsioThreadPool::AsioThreadPool(size_t threads)
: m_threads(threads)
{
start();
}

inline void AsioThreadPool::start()
{
m_work.reset(new boost::asio::io_service::work(m_io_svc));

for (auto &i : m_threads)
{
i = std::thread(&AsioThreadPool::worker_thread_func, this);
}

}

inline void AsioThreadPool::stop()
{
m_work.reset();

m_io_svc.stop();

for (auto &i : m_threads)
{
if (i.joinable())
{
i.join();
}
}
}

inline void AsioThreadPool::joinThreadPool()
{
m_io_svc.run();
}

inline void AsioThreadPool::worker_thread_func()
{
joinThreadPool();
}

#endif
#ifndef ASIO_THREAD_POOL_HPP
#define ASIO_THREAD_POOL_HPP

#include <boost/asio.hpp>

#include <functional>
#include <thread>
#include <vector>
#include <memory>

class AsioThreadPool
{
public:
inline AsioThreadPool(size_t threads);

inline ~AsioThreadPool()
{
stop();
}

inline void joinThreadPool();

template <typename Handler>
inline void post(Handler &&handler)
{
m_io_svc.post(handler);
}

private:
inline void start();
inline void stop();
inline void worker_thread_func();

boost::asio::io_service m_io_svc;
std::unique_ptr<boost::asio::io_service::work> m_work;

std::vector<std::thread> m_threads;
};

inline AsioThreadPool::AsioThreadPool(size_t threads)
: m_threads(threads)
{
start();
}

inline void AsioThreadPool::start()
{
m_work.reset(new boost::asio::io_service::work(m_io_svc));

for (auto &i : m_threads)
{
i = std::thread(&AsioThreadPool::worker_thread_func, this);
}

}

inline void AsioThreadPool::stop()
{
m_work.reset();

m_io_svc.stop();

for (auto &i : m_threads)
{
if (i.joinable())
{
i.join();
}
}
}

inline void AsioThreadPool::joinThreadPool()
{
m_io_svc.run();
}

inline void AsioThreadPool::worker_thread_func()
{
joinThreadPool();
}

#endif
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ add_custom_command(
COMMAND ./fixed_function_test
)

add_executable(thread_pool_test thread_pool.t.cpp)
add_executable(thread_pool_test thread_pool.t.cpp thread_pool2.t.cpp)
target_link_libraries(thread_pool_test pthread)
add_custom_command(
TARGET thread_pool_test
Expand Down
35 changes: 27 additions & 8 deletions tests/fixed_function.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ void print_overhead() {
<< " overhead is " << float(f_s - t_s)/t_s * 100 << "%\n";
}

static std::string str_fun() {
return "123";
}

int main()
{
Expand All @@ -63,37 +66,53 @@ int main()
print_overhead<char[64]>();
print_overhead<char[128]>();

doTest("alloc/dealloc", [](){
doTest("alloc/dealloc", []() {
static size_t def = 0;
static size_t cop = 0;
static size_t mov = 0;
static size_t cop_ass = 0;
static size_t mov_ass = 0;
static size_t destroyed = 0;
struct cnt {
std::string payload = "xyz";
std::string payload;
cnt() { def++; }
cnt(const cnt&) { cop++; }
cnt(cnt&&) { mov++; }
cnt & operator=(const cnt&) { cop_ass++; return *this; }
cnt & operator=(cnt&&) { mov_ass++; return *this; }
cnt(const cnt &o) { payload = o.payload; cop++;}
cnt(cnt &&o) { payload = std::move(o.payload); mov++;}
cnt & operator=(const cnt &o) { payload = o.payload; cop_ass++; return *this; }
cnt & operator=(cnt &&o) { payload = std::move(o.payload); mov_ass++; return *this; }
~cnt() { destroyed++; }
std::string operator()() { return payload; }
};

{
cnt c1;
c1.payload = "xyz";
FixedFunction<std::string()> f1(c1);
ASSERT(std::string("xyz") == f1());

FixedFunction<std::string()> f2;
f2 = std::move(f1);
ASSERT(std::string("xyz") == f2());

FixedFunction<std::string()> f3(std::move(f2));
ASSERT(std::string("xyz") == f3());

FixedFunction<std::string()> f4(str_fun);
ASSERT(std::string("123") == f4());

f4 = std::move(f3);
ASSERT(std::string("xyz") == f4());

cnt c2;
c2.payload = "qwe";
f4 = std::move(FixedFunction<std::string()>(c2));
ASSERT(std::string("qwe") == f4());
}

ASSERT(def + cop + mov == destroyed);
ASSERT(1 == def);
ASSERT(2 == def);
ASSERT(0 == cop);
ASSERT(3 == mov);
ASSERT(6 == mov);
ASSERT(0 == cop_ass);
ASSERT(0 == mov_ass);
});
Expand Down
28 changes: 28 additions & 0 deletions tests/thread_pool.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <thread>
#include <future>
#include <functional>
#include <memory>

int main() {
std::cout << "*** Testing ThreadPool ***" << std::endl;
Expand Down Expand Up @@ -50,5 +51,32 @@ int main() {
} catch (const my_exception &e) {
}
});

doTest("post job to threadpool with onStart/onStop", []() {
std::atomic<int> someValue{0};
ThreadPoolOptions options;
options.onStart = [&someValue](){ ++someValue; };
options.onStop = [&someValue](){ --someValue; };

if (true) {
ThreadPool pool{options};

std::packaged_task<int()> t([&someValue](){
std::this_thread::sleep_for(std::chrono::milliseconds(1));
return someValue.load();
});

std::future<int> r = t.get_future();

pool.post(t);

const auto result = r.get();

ASSERT(0 < result);
ASSERT(pool.getWorkerCount() == result);
}

ASSERT(0 == someValue);
});

}
11 changes: 11 additions & 0 deletions tests/thread_pool2.t.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#include <worker.hpp>

size_t getWorkerIdForCurrentThread()
{
return *detail::thread_id();
}

size_t getWorkerIdForCurrentThread2()
{
return Worker::getWorkerIdForCurrentThread();
}
Loading

0 comments on commit d491809

Please sign in to comment.