- 
                Notifications
    You must be signed in to change notification settings 
- Fork 576
          feat: replace worker connection's mutex with tbb::concurrent_hash_map
          #2461
        
          New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: unstable
Are you sure you want to change the base?
Changes from 15 commits
3d495ef
              f6192d4
              4416da7
              fe89194
              1104241
              effbcf9
              c0d9efb
              3c4205a
              c17ae9a
              9f1b707
              8385b41
              14a0deb
              f4bbded
              797c9d9
              4eff1e8
              2027b03
              e3d992d
              49d6792
              8011d59
              20678a5
              b17dcec
              0f3f8bd
              afccc88
              9714558
              1665dab
              65648bf
              261812d
              0b64679
              ee88850
              1484808
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|  | @@ -47,6 +47,8 @@ | |||||||||||||||||||||||||||||||||||
| #include <list> | ||||||||||||||||||||||||||||||||||||
| #include <utility> | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| #include "oneapi/tbb/parallel_for.h" | ||||||||||||||||||||||||||||||||||||
| #include "oneapi/tbb/parallel_reduce.h" | ||||||||||||||||||||||||||||||||||||
| #include "redis_connection.h" | ||||||||||||||||||||||||||||||||||||
| #include "redis_request.h" | ||||||||||||||||||||||||||||||||||||
| #include "server.h" | ||||||||||||||||||||||||||||||||||||
|  | @@ -79,14 +81,15 @@ Worker::~Worker() { | |||||||||||||||||||||||||||||||||||
| std::vector<redis::Connection *> conns; | ||||||||||||||||||||||||||||||||||||
| conns.reserve(conns_.size() + monitor_conns_.size()); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| for (const auto &iter : conns_) { | ||||||||||||||||||||||||||||||||||||
| conns.emplace_back(iter.second); | ||||||||||||||||||||||||||||||||||||
| for (const auto &[fd, conn] : conns_) { | ||||||||||||||||||||||||||||||||||||
| conns.emplace_back(conn); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| for (const auto &iter : monitor_conns_) { | ||||||||||||||||||||||||||||||||||||
| conns.emplace_back(iter.second); | ||||||||||||||||||||||||||||||||||||
| for (const auto &[fd, conn] : monitor_conns_) { | ||||||||||||||||||||||||||||||||||||
| conns.emplace_back(conn); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| for (const auto &iter : conns) { | ||||||||||||||||||||||||||||||||||||
| iter->Close(); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| for (auto conn : conns) { | ||||||||||||||||||||||||||||||||||||
| conn->Close(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| timer_.reset(); | ||||||||||||||||||||||||||||||||||||
|  | @@ -311,9 +314,7 @@ void Worker::Stop(uint32_t wait_seconds) { | |||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| Status Worker::AddConnection(redis::Connection *c) { | ||||||||||||||||||||||||||||||||||||
| std::unique_lock<std::mutex> lock(conns_mu_); | ||||||||||||||||||||||||||||||||||||
| auto iter = conns_.find(c->GetFD()); | ||||||||||||||||||||||||||||||||||||
| if (iter != conns_.end()) { | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::const_accessor accessor; conns_.find(accessor, c->GetFD())) { | ||||||||||||||||||||||||||||||||||||
| return {Status::NotOK, "connection was exists"}; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
|  | @@ -323,7 +324,8 @@ Status Worker::AddConnection(redis::Connection *c) { | |||||||||||||||||||||||||||||||||||
| return {Status::NotOK, "max number of clients reached"}; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| conns_.emplace(c->GetFD(), c); | ||||||||||||||||||||||||||||||||||||
| ConnMap::accessor accessor; | ||||||||||||||||||||||||||||||||||||
| conns_.insert(accessor, std::make_pair(c->GetFD(), c)); | ||||||||||||||||||||||||||||||||||||
| uint64_t id = srv->GetClientID(); | ||||||||||||||||||||||||||||||||||||
| c->SetID(id); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
|  | @@ -333,18 +335,17 @@ Status Worker::AddConnection(redis::Connection *c) { | |||||||||||||||||||||||||||||||||||
| redis::Connection *Worker::removeConnection(int fd) { | ||||||||||||||||||||||||||||||||||||
| redis::Connection *conn = nullptr; | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| std::unique_lock<std::mutex> lock(conns_mu_); | ||||||||||||||||||||||||||||||||||||
| auto iter = conns_.find(fd); | ||||||||||||||||||||||||||||||||||||
| if (iter != conns_.end()) { | ||||||||||||||||||||||||||||||||||||
| conn = iter->second; | ||||||||||||||||||||||||||||||||||||
| conns_.erase(iter); | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { | ||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||
| conn = accessor->second; | ||||||||||||||||||||||||||||||||||||
| conns_.erase(accessor); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| srv->DecrClientNum(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| iter = monitor_conns_.find(fd); | ||||||||||||||||||||||||||||||||||||
| if (iter != monitor_conns_.end()) { | ||||||||||||||||||||||||||||||||||||
| conn = iter->second; | ||||||||||||||||||||||||||||||||||||
| monitor_conns_.erase(iter); | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) { | ||||||||||||||||||||||||||||||||||||
| conn = accessor->second; | ||||||||||||||||||||||||||||||||||||
| monitor_conns_.erase(accessor); | ||||||||||||||||||||||||||||||||||||
| srv->DecrClientNum(); | ||||||||||||||||||||||||||||||||||||
| srv->DecrMonitorClientNum(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | @@ -409,31 +410,27 @@ void Worker::FreeConnection(redis::Connection *conn) { | |||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void Worker::FreeConnectionByID(int fd, uint64_t id) { | ||||||||||||||||||||||||||||||||||||
| std::unique_lock<std::mutex> lock(conns_mu_); | ||||||||||||||||||||||||||||||||||||
| auto iter = conns_.find(fd); | ||||||||||||||||||||||||||||||||||||
| if (iter != conns_.end() && iter->second->GetID() == id) { | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { | ||||||||||||||||||||||||||||||||||||
| if (rate_limit_group_ != nullptr) { | ||||||||||||||||||||||||||||||||||||
| bufferevent_remove_from_rate_limit_group(iter->second->GetBufferEvent()); | ||||||||||||||||||||||||||||||||||||
| bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent()); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| delete iter->second; | ||||||||||||||||||||||||||||||||||||
| conns_.erase(iter); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| delete accessor->second; | ||||||||||||||||||||||||||||||||||||
| conns_.erase(accessor); | ||||||||||||||||||||||||||||||||||||
|         
                  mapleFU marked this conversation as resolved.
              Show resolved
            Hide resolved | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| srv->DecrClientNum(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| iter = monitor_conns_.find(fd); | ||||||||||||||||||||||||||||||||||||
| if (iter != monitor_conns_.end() && iter->second->GetID() == id) { | ||||||||||||||||||||||||||||||||||||
| delete iter->second; | ||||||||||||||||||||||||||||||||||||
| monitor_conns_.erase(iter); | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) { | ||||||||||||||||||||||||||||||||||||
| delete accessor->second; | ||||||||||||||||||||||||||||||||||||
|         
                  mapleFU marked this conversation as resolved.
              Show resolved
            Hide resolved | ||||||||||||||||||||||||||||||||||||
| monitor_conns_.erase(accessor); | ||||||||||||||||||||||||||||||||||||
| srv->DecrClientNum(); | ||||||||||||||||||||||||||||||||||||
| srv->DecrMonitorClientNum(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| Status Worker::EnableWriteEvent(int fd) { | ||||||||||||||||||||||||||||||||||||
| std::unique_lock<std::mutex> lock(conns_mu_); | ||||||||||||||||||||||||||||||||||||
| auto iter = conns_.find(fd); | ||||||||||||||||||||||||||||||||||||
| if (iter != conns_.end()) { | ||||||||||||||||||||||||||||||||||||
| auto bev = iter->second->GetBufferEvent(); | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::const_accessor accessor; conns_.find(accessor, fd)) { | ||||||||||||||||||||||||||||||||||||
| auto bev = accessor->second->GetBufferEvent(); | ||||||||||||||||||||||||||||||||||||
| bufferevent_enable(bev, EV_WRITE); | ||||||||||||||||||||||||||||||||||||
| return Status::OK(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | @@ -442,115 +439,146 @@ Status Worker::EnableWriteEvent(int fd) { | |||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| Status Worker::Reply(int fd, const std::string &reply) { | ||||||||||||||||||||||||||||||||||||
| std::unique_lock<std::mutex> lock(conns_mu_); | ||||||||||||||||||||||||||||||||||||
| auto iter = conns_.find(fd); | ||||||||||||||||||||||||||||||||||||
| if (iter != conns_.end()) { | ||||||||||||||||||||||||||||||||||||
| iter->second->SetLastInteraction(); | ||||||||||||||||||||||||||||||||||||
| redis::Reply(iter->second->Output(), reply); | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { | ||||||||||||||||||||||||||||||||||||
| accessor->second->SetLastInteraction(); | ||||||||||||||||||||||||||||||||||||
| redis::Reply(accessor->second->Output(), reply); | ||||||||||||||||||||||||||||||||||||
| return Status::OK(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| return {Status::NotOK, "connection doesn't exist"}; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void Worker::BecomeMonitorConn(redis::Connection *conn) { | ||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @mapleFU , Sorry for delay reply. 
        Suggested change
       
 Best Regards, | ||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||
| std::lock_guard<std::mutex> guard(conns_mu_); | ||||||||||||||||||||||||||||||||||||
| conns_.erase(conn->GetFD()); | ||||||||||||||||||||||||||||||||||||
| monitor_conns_[conn->GetFD()] = conn; | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) { | ||||||||||||||||||||||||||||||||||||
|         
                  mapleFU marked this conversation as resolved.
              Show resolved
            Hide resolved | ||||||||||||||||||||||||||||||||||||
| conns_.erase(accessor); | ||||||||||||||||||||||||||||||||||||
| accessor.release(); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { | ||||||||||||||||||||||||||||||||||||
| accessor->second = conn; | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| monitor_conns_.insert(accessor, std::make_pair(conn->GetFD(), conn)); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| srv->IncrMonitorClientNum(); | ||||||||||||||||||||||||||||||||||||
| conn->EnableFlag(redis::Connection::kMonitor); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void Worker::QuitMonitorConn(redis::Connection *conn) { | ||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||
| std::lock_guard<std::mutex> guard(conns_mu_); | ||||||||||||||||||||||||||||||||||||
| monitor_conns_.erase(conn->GetFD()); | ||||||||||||||||||||||||||||||||||||
| conns_[conn->GetFD()] = conn; | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { | ||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||
| monitor_conns_.erase(accessor); | ||||||||||||||||||||||||||||||||||||
| accessor.release(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) { | ||||||||||||||||||||||||||||||||||||
| accessor->second = conn; | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| conns_.insert(accessor, std::make_pair(conn->GetFD(), conn)); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| srv->DecrMonitorClientNum(); | ||||||||||||||||||||||||||||||||||||
| conn->DisableFlag(redis::Connection::kMonitor); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) { | ||||||||||||||||||||||||||||||||||||
| std::unique_lock<std::mutex> lock(conns_mu_); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| for (const auto &iter : monitor_conns_) { | ||||||||||||||||||||||||||||||||||||
| if (conn == iter.second) continue; // skip the monitor command | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| if (conn->GetNamespace() == iter.second->GetNamespace() || iter.second->GetNamespace() == kDefaultNamespace) { | ||||||||||||||||||||||||||||||||||||
| iter.second->Reply(response); | ||||||||||||||||||||||||||||||||||||
| tbb::parallel_for(monitor_conns_.range(), [conn, response](const ConnMap::range_type &range) { | ||||||||||||||||||||||||||||||||||||
| for (auto &it : range) { | ||||||||||||||||||||||||||||||||||||
| const auto &value = it.second; | ||||||||||||||||||||||||||||||||||||
| if (conn == value) continue; // skip the monitor command | ||||||||||||||||||||||||||||||||||||
| if (conn->GetNamespace() == value->GetNamespace() || value->GetNamespace() == kDefaultNamespace) { | ||||||||||||||||||||||||||||||||||||
| value->Reply(response); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| std::string Worker::GetClientsStr() { | ||||||||||||||||||||||||||||||||||||
| std::unique_lock<std::mutex> lock(conns_mu_); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| std::string output; | ||||||||||||||||||||||||||||||||||||
| for (const auto &iter : conns_) { | ||||||||||||||||||||||||||||||||||||
| redis::Connection *conn = iter.second; | ||||||||||||||||||||||||||||||||||||
| output.append(conn->ToString()); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| return output; | ||||||||||||||||||||||||||||||||||||
| return tbb::parallel_reduce( | ||||||||||||||||||||||||||||||||||||
| conns_.range(), std::string{}, | ||||||||||||||||||||||||||||||||||||
| [](const ConnMap::range_type &range, std::string &&result) { | ||||||||||||||||||||||||||||||||||||
|         
                  mapleFU marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||||||||||||||||||||||||||||||||||||
| for (auto &it : range) { | ||||||||||||||||||||||||||||||||||||
| result.append(it.second->ToString()); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||
| [](const std::string &lhs, const std::string &rhs) { | ||||||||||||||||||||||||||||||||||||
| std::string result = lhs; | ||||||||||||||||||||||||||||||||||||
| result.append(rhs); | ||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme, | ||||||||||||||||||||||||||||||||||||
| int64_t *killed) { | ||||||||||||||||||||||||||||||||||||
| std::lock_guard<std::mutex> guard(conns_mu_); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| for (const auto &iter : conns_) { | ||||||||||||||||||||||||||||||||||||
| redis::Connection *conn = iter.second; | ||||||||||||||||||||||||||||||||||||
| if (skipme && self == conn) continue; | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| // no need to kill the client again if the kCloseAfterReply flag is set | ||||||||||||||||||||||||||||||||||||
| if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) { | ||||||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| if ((type & conn->GetClientType()) || | ||||||||||||||||||||||||||||||||||||
| (!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) || | ||||||||||||||||||||||||||||||||||||
| (id != 0 && conn->GetID() == id)) { | ||||||||||||||||||||||||||||||||||||
| conn->EnableFlag(redis::Connection::kCloseAfterReply); | ||||||||||||||||||||||||||||||||||||
| // enable write event to notify worker wake up ASAP, and remove the connection | ||||||||||||||||||||||||||||||||||||
| if (!conn->IsFlagEnabled(redis::Connection::kSlave)) { // don't enable any event in slave connection | ||||||||||||||||||||||||||||||||||||
| auto bev = conn->GetBufferEvent(); | ||||||||||||||||||||||||||||||||||||
| bufferevent_enable(bev, EV_WRITE); | ||||||||||||||||||||||||||||||||||||
| for (const auto key : getConnFds()) { | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::accessor accessor; conns_.find(accessor, key)) { | ||||||||||||||||||||||||||||||||||||
| auto conn = accessor->second; | ||||||||||||||||||||||||||||||||||||
| if (skipme && self == conn) continue; | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| // no need to kill the client again if the kCloseAfterReply flag is set | ||||||||||||||||||||||||||||||||||||
| if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) { | ||||||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| if ((type & conn->GetClientType()) || | ||||||||||||||||||||||||||||||||||||
| (!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) || | ||||||||||||||||||||||||||||||||||||
| (id != 0 && conn->GetID() == id)) { | ||||||||||||||||||||||||||||||||||||
| conn->EnableFlag(redis::Connection::kCloseAfterReply); | ||||||||||||||||||||||||||||||||||||
| // enable write event to notify worker wake up ASAP, and remove the connection | ||||||||||||||||||||||||||||||||||||
| if (!conn->IsFlagEnabled(redis::Connection::kSlave)) { // don't enable any event in slave connection | ||||||||||||||||||||||||||||||||||||
| auto bev = conn->GetBufferEvent(); | ||||||||||||||||||||||||||||||||||||
| bufferevent_enable(bev, EV_WRITE); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| (*killed)++; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| (*killed)++; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void Worker::KickoutIdleClients(int timeout) { | ||||||||||||||||||||||||||||||||||||
| std::vector<std::pair<int, uint64_t>> to_be_killed_conns; | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||
| std::lock_guard<std::mutex> guard(conns_mu_); | ||||||||||||||||||||||||||||||||||||
| if (conns_.empty()) { | ||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| auto fd_list = getConnFds(); | ||||||||||||||||||||||||||||||||||||
| if (fd_list.empty()) { | ||||||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| int iterations = std::min(static_cast<int>(conns_.size()), 50); | ||||||||||||||||||||||||||||||||||||
| auto iter = conns_.upper_bound(last_iter_conn_fd_); | ||||||||||||||||||||||||||||||||||||
| while (iterations--) { | ||||||||||||||||||||||||||||||||||||
| if (iter == conns_.end()) iter = conns_.begin(); | ||||||||||||||||||||||||||||||||||||
| if (static_cast<int>(iter->second->GetIdleTime()) >= timeout) { | ||||||||||||||||||||||||||||||||||||
| to_be_killed_conns.emplace_back(iter->first, iter->second->GetID()); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| iter++; | ||||||||||||||||||||||||||||||||||||
| std::set<int> fds(fd_list.cbegin(), fd_list.cend()); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| int iterations = std::min(static_cast<int>(conns_.size()), 50); | ||||||||||||||||||||||||||||||||||||
| auto iter = fds.upper_bound(last_iter_conn_fd_); | ||||||||||||||||||||||||||||||||||||
| while (iterations--) { | ||||||||||||||||||||||||||||||||||||
| if (iter == fds.end()) { | ||||||||||||||||||||||||||||||||||||
| iter = fds.begin(); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| if (ConnMap::const_accessor accessor; | ||||||||||||||||||||||||||||||||||||
| conns_.find(accessor, *iter) && static_cast<int>(accessor->second->GetIdleTime()) >= timeout) { | ||||||||||||||||||||||||||||||||||||
| to_be_killed_conns.emplace_back(accessor->first, accessor->second->GetID()); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| iter--; | ||||||||||||||||||||||||||||||||||||
| last_iter_conn_fd_ = iter->first; | ||||||||||||||||||||||||||||||||||||
| iter++; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| iter--; | ||||||||||||||||||||||||||||||||||||
| last_iter_conn_fd_ = *iter; | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| for (const auto &conn : to_be_killed_conns) { | ||||||||||||||||||||||||||||||||||||
| FreeConnectionByID(conn.first, conn.second); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| std::vector<int> Worker::getConnFds() const { | ||||||||||||||||||||||||||||||||||||
| return tbb::parallel_reduce( | ||||||||||||||||||||||||||||||||||||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO parallel_for looks like some cpu-bound operations, and would it occupies more threads than expected here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @mapleFU , Best Regards, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 
 I think it's worth doing if we don't traverse all  | ||||||||||||||||||||||||||||||||||||
| conns_.range(), std::vector<int>{}, | ||||||||||||||||||||||||||||||||||||
| [](const ConnMap::const_range_type &range, std::vector<int> result) { | ||||||||||||||||||||||||||||||||||||
| for (const auto &fd : range) { | ||||||||||||||||||||||||||||||||||||
| result.emplace_back(fd.first); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||
| [](const std::vector<int> &lhs, const std::vector<int> &rhs) { | ||||||||||||||||||||||||||||||||||||
| std::vector<int> result = lhs; | ||||||||||||||||||||||||||||||||||||
| result.insert(result.end(), rhs.begin(), rhs.end()); | ||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void WorkerThread::Start() { | ||||||||||||||||||||||||||||||||||||
| auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); }); | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
|  | @@ -564,6 +592,25 @@ void WorkerThread::Start() { | |||||||||||||||||||||||||||||||||||
| LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started"; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| std::map<int, redis::Connection *> Worker::GetConnections() const { | ||||||||||||||||||||||||||||||||||||
| std::map<int, redis::Connection *> result; | ||||||||||||||||||||||||||||||||||||
| result = tbb::parallel_reduce( | ||||||||||||||||||||||||||||||||||||
| conns_.range(), result, | ||||||||||||||||||||||||||||||||||||
| [](const ConnMap::const_range_type &range, std::map<int, redis::Connection *> &&tmp_result) { | ||||||||||||||||||||||||||||||||||||
| // std::map<int, redis::Connection *> tmp_result; | ||||||||||||||||||||||||||||||||||||
| for (auto &it : range) { | ||||||||||||||||||||||||||||||||||||
| tmp_result.emplace(it.first, it.second); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| return tmp_result; | ||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||
| [](const std::map<int, redis::Connection *> &lhs, const std::map<int, redis::Connection *> &rhs) { | ||||||||||||||||||||||||||||||||||||
| std::map<int, redis::Connection *> result = lhs; | ||||||||||||||||||||||||||||||||||||
| result.insert(rhs.cbegin(), rhs.cend()); | ||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||
| return result; | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); } | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
| void WorkerThread::Join() { | ||||||||||||||||||||||||||||||||||||
|  | ||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.