diff --git a/muduo/net/EventLoopThreadPool.cc b/muduo/net/EventLoopThreadPool.cc index 1ee594954..8b4617f9f 100644 --- a/muduo/net/EventLoopThreadPool.cc +++ b/muduo/net/EventLoopThreadPool.cc @@ -20,8 +20,7 @@ EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& name : baseLoop_(baseLoop), name_(nameArg), started_(false), - numThreads_(0), - next_(0) + numThreads_(0) { } @@ -60,12 +59,8 @@ EventLoop* EventLoopThreadPool::getNextLoop() if (!loops_.empty()) { // round-robin - loop = loops_[next_]; - ++next_; - if (implicit_cast(next_) >= loops_.size()) - { - next_ = 0; - } + int64_t next = next_.getAndAdd(1); + loop = loops_[implicit_cast(next) % loops_.size()]; } return loop; } diff --git a/muduo/net/EventLoopThreadPool.h b/muduo/net/EventLoopThreadPool.h index 738914875..b0a4e545e 100644 --- a/muduo/net/EventLoopThreadPool.h +++ b/muduo/net/EventLoopThreadPool.h @@ -13,6 +13,7 @@ #include "muduo/base/noncopyable.h" #include "muduo/base/Types.h" +#include "muduo/base/Atomic.h" #include #include @@ -58,7 +59,7 @@ class EventLoopThreadPool : noncopyable string name_; bool started_; int numThreads_; - int next_; + AtomicInt64 next_; std::vector> threads_; std::vector loops_; }; diff --git a/muduo/net/TcpServer.cc b/muduo/net/TcpServer.cc index e4f8e249d..280b31a7e 100644 --- a/muduo/net/TcpServer.cc +++ b/muduo/net/TcpServer.cc @@ -28,6 +28,7 @@ TcpServer::TcpServer(EventLoop* loop, name_(nameArg), acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), threadPool_(new EventLoopThreadPool(loop, name_)), + sharedThreadPool_(NULL), connectionCallback_(defaultConnectionCallback), messageCallback_(defaultMessageCallback), nextConnId_(1) @@ -36,6 +37,16 @@ TcpServer::TcpServer(EventLoop* loop, std::bind(&TcpServer::newConnection, this, _1, _2)); } +TcpServer::TcpServer(EventLoop* loop, + const InetAddress& listenAddr, + const string& nameArg, + EventLoopThreadPool* sharedThreadPool, + Option option) + : TcpServer(loop, listenAddr, nameArg, option) +{ + sharedThreadPool_ = CHECK_NOTNULL(sharedThreadPool); +} + TcpServer::~TcpServer() { loop_->assertInLoopThread(); @@ -60,7 +71,15 @@ void TcpServer::start() { if (started_.getAndSet(1) == 0) { - threadPool_->start(threadInitCallback_); + if (sharedThreadPool_ != NULL) + { + // the owner must start this ThreadPool before starting TcpServer + assert(sharedThreadPool_->started()); + } + else + { + threadPool_->start(threadInitCallback_); + } assert(!acceptor_->listening()); loop_->runInLoop( @@ -71,7 +90,7 @@ void TcpServer::start() void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) { loop_->assertInLoopThread(); - EventLoop* ioLoop = threadPool_->getNextLoop(); + EventLoop* ioLoop = sharedThreadPool_ != NULL ? sharedThreadPool_->getNextLoop() : threadPool_->getNextLoop(); char buf[64]; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); ++nextConnId_; diff --git a/muduo/net/TcpServer.h b/muduo/net/TcpServer.h index 3fbfead5c..1552faa71 100644 --- a/muduo/net/TcpServer.h +++ b/muduo/net/TcpServer.h @@ -45,6 +45,11 @@ class TcpServer : noncopyable const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort); + TcpServer(EventLoop* loop, + const InetAddress& listenAddr, + const string& nameArg, + EventLoopThreadPool* sharedThreadPool, + Option option = kNoReusePort); ~TcpServer(); // force out-line dtor, for std::unique_ptr members. const string& ipPort() const { return ipPort_; } @@ -104,6 +109,7 @@ class TcpServer : noncopyable const string name_; std::unique_ptr acceptor_; // avoid revealing Acceptor std::shared_ptr threadPool_; + EventLoopThreadPool* sharedThreadPool_; ConnectionCallback connectionCallback_; MessageCallback messageCallback_; WriteCompleteCallback writeCompleteCallback_;