Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3d495ef
remove related mutex with tbb hashmap
LindaSummer Jul 28, 2024
f6192d4
add mutex for erase and iteration
LindaSummer Jul 30, 2024
4416da7
remove useless code
LindaSummer Aug 1, 2024
fe89194
use tbb_parallel function rather than legacy for loop
LindaSummer Aug 4, 2024
1104241
remove useless headers
LindaSummer Aug 4, 2024
effbcf9
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 4, 2024
c0d9efb
refactor duplicated code
LindaSummer Aug 7, 2024
3c4205a
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 7, 2024
c17ae9a
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 8, 2024
9f1b707
remove reduandant concurrency protection in dtor
LindaSummer Aug 9, 2024
8385b41
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 9, 2024
14a0deb
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 9, 2024
f4bbded
fix sonar cloud analysis issue
LindaSummer Aug 9, 2024
797c9d9
Merge branch 'feature/tbb_worker_with_mutex' of github.com:LindaSumme…
LindaSummer Aug 9, 2024
4eff1e8
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 9, 2024
2027b03
make value delete after accessor erased
LindaSummer Aug 9, 2024
e3d992d
Merge branch 'feature/tbb_worker_with_mutex' of github.com:LindaSumme…
LindaSummer Aug 9, 2024
49d6792
make tbb header including use `<>`
LindaSummer Aug 9, 2024
8011d59
fix `monitor_conns` inserting behavior and refactor useless code.
LindaSummer Aug 11, 2024
20678a5
make deleting item before accessor erasing
LindaSummer Aug 11, 2024
b17dcec
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 11, 2024
0f3f8bd
fix sonar issue: `Worker::~Worker()` can keep the original code
LindaSummer Aug 12, 2024
afccc88
Merge branch 'feature/tbb_worker_with_mutex' of github.com:LindaSumme…
LindaSummer Aug 12, 2024
9714558
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 14, 2024
1665dab
fix typo and remove useless debug comment
LindaSummer Aug 15, 2024
65648bf
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Aug 20, 2024
261812d
add thread constraint for tbb parallel function.
LindaSummer Sep 2, 2024
0b64679
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Sep 2, 2024
ee88850
fix free connection issue.
LindaSummer Sep 3, 2024
1484808
Merge branch 'unstable' into feature/tbb_worker_with_mutex
LindaSummer Sep 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 172 additions & 103 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
#include <list>
#include <utility>

#include "oneapi/tbb/parallel_for.h"
#include "oneapi/tbb/parallel_reduce.h"
#include "redis_connection.h"
#include "redis_request.h"
#include "server.h"
Expand Down Expand Up @@ -76,17 +78,40 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new())
}

Worker::~Worker() {
std::vector<redis::Connection *> conns;
conns.reserve(conns_.size() + monitor_conns_.size());

for (const auto &iter : conns_) {
conns.emplace_back(iter.second);
}
for (const auto &iter : monitor_conns_) {
conns.emplace_back(iter.second);
auto conns = tbb::parallel_reduce(
conns_.range(), std::vector<redis::Connection *>{},
[](const ConnMap::range_type &range, std::vector<redis::Connection *> &&result) {
for (auto &it : range) {
result.push_back(it.second);
}
return result;
},
[](const std::vector<redis::Connection *> &lhs, const std::vector<redis::Connection *> &rhs) {
std::vector<redis::Connection *> result = lhs;
result.insert(result.end(), rhs.begin(), rhs.end());
return result;
});

auto monitor_conns = tbb::parallel_reduce(
monitor_conns_.range(), std::vector<redis::Connection *>{},
[](const ConnMap::range_type &range, std::vector<redis::Connection *> &&result) {
for (auto &it : range) {
result.push_back(it.second);
}
return result;
},
[](const std::vector<redis::Connection *> &lhs, const std::vector<redis::Connection *> &rhs) {
std::vector<redis::Connection *> result = lhs;
result.insert(result.end(), rhs.begin(), rhs.end());
return result;
});

for (auto conn : conns) {
conn->Close();
}
for (const auto &iter : conns) {
iter->Close();

for (auto conn : monitor_conns) {
conn->Close();
}

timer_.reset();
Expand Down Expand Up @@ -311,9 +336,7 @@ void Worker::Stop(uint32_t wait_seconds) {
}

Status Worker::AddConnection(redis::Connection *c) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(c->GetFD());
if (iter != conns_.end()) {
if (ConnMap::const_accessor accessor; conns_.find(accessor, c->GetFD())) {
return {Status::NotOK, "connection was exists"};
}

Expand All @@ -323,7 +346,8 @@ Status Worker::AddConnection(redis::Connection *c) {
return {Status::NotOK, "max number of clients reached"};
}

conns_.emplace(c->GetFD(), c);
ConnMap::accessor accessor;
conns_.insert(accessor, std::make_pair(c->GetFD(), c));
uint64_t id = srv->GetClientID();
c->SetID(id);

Expand All @@ -333,18 +357,17 @@ Status Worker::AddConnection(redis::Connection *c) {
redis::Connection *Worker::removeConnection(int fd) {
redis::Connection *conn = nullptr;

std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
conn = iter->second;
conns_.erase(iter);
if (ConnMap::accessor accessor; conns_.find(accessor, fd)) {
{
conn = accessor->second;
conns_.erase(accessor);
}
srv->DecrClientNum();
}

iter = monitor_conns_.find(fd);
if (iter != monitor_conns_.end()) {
conn = iter->second;
monitor_conns_.erase(iter);
if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) {
conn = accessor->second;
monitor_conns_.erase(accessor);
srv->DecrClientNum();
srv->DecrMonitorClientNum();
}
Expand Down Expand Up @@ -409,31 +432,27 @@ void Worker::FreeConnection(redis::Connection *conn) {
}

void Worker::FreeConnectionByID(int fd, uint64_t id) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end() && iter->second->GetID() == id) {
if (ConnMap::accessor accessor; conns_.find(accessor, fd)) {
if (rate_limit_group_ != nullptr) {
bufferevent_remove_from_rate_limit_group(iter->second->GetBufferEvent());
bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent());
}
delete iter->second;
conns_.erase(iter);

delete accessor->second;
conns_.erase(accessor);

srv->DecrClientNum();
}

iter = monitor_conns_.find(fd);
if (iter != monitor_conns_.end() && iter->second->GetID() == id) {
delete iter->second;
monitor_conns_.erase(iter);
if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) {
delete accessor->second;
monitor_conns_.erase(accessor);
srv->DecrClientNum();
srv->DecrMonitorClientNum();
}
}

Status Worker::EnableWriteEvent(int fd) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
auto bev = iter->second->GetBufferEvent();
if (ConnMap::const_accessor accessor; conns_.find(accessor, fd)) {
auto bev = accessor->second->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
return Status::OK();
}
Expand All @@ -442,115 +461,146 @@ Status Worker::EnableWriteEvent(int fd) {
}

Status Worker::Reply(int fd, const std::string &reply) {
std::unique_lock<std::mutex> lock(conns_mu_);
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
iter->second->SetLastInteraction();
redis::Reply(iter->second->Output(), reply);
if (ConnMap::accessor accessor; conns_.find(accessor, fd)) {
accessor->second->SetLastInteraction();
redis::Reply(accessor->second->Output(), reply);
return Status::OK();
}

return {Status::NotOK, "connection doesn't exist"};
}

void Worker::BecomeMonitorConn(redis::Connection *conn) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mapleFU ,

Sorry for delay reply.
I'd like to use this way on all accessors to mitigate potential inconsistency in monitor_conns_ and conns_.

Suggested change
void Worker::BecomeMonitorConn(redis::Connection *conn) {
void Worker::BecomeMonitorConn(redis::Connection *conn) {
ConnMap::accessor accessor;
ConnMap::accessor monitor_accessor;
bool find_conn = conns_.find(accessor, conn->GetFD());
bool find_monitor = monitor_conns_.find(monitor_accessor, conn->GetFD());
if (find_conn) {
conns_.erase(accessor);
}
if (find_monitor) {
monitor_accessor->second = conn;
} else {
monitor_conns_.insert(monitor_accessor, std::make_pair(conn->GetFD(), conn));
}
srv->IncrMonitorClientNum();
conn->EnableFlag(redis::Connection::kMonitor);
}

Best Regards,
Edward

{
std::lock_guard<std::mutex> guard(conns_mu_);
conns_.erase(conn->GetFD());
monitor_conns_[conn->GetFD()] = conn;
if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) {
conns_.erase(accessor);
accessor.release();

if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) {
accessor->second = conn;
} else {
monitor_conns_.insert(accessor, std::make_pair(conn->GetFD(), conn));
}
}
srv->IncrMonitorClientNum();
conn->EnableFlag(redis::Connection::kMonitor);
}

void Worker::QuitMonitorConn(redis::Connection *conn) {
{
std::lock_guard<std::mutex> guard(conns_mu_);
monitor_conns_.erase(conn->GetFD());
conns_[conn->GetFD()] = conn;
if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) {
{
monitor_conns_.erase(accessor);
accessor.release();
}
if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) {
accessor->second = conn;
} else {
conns_.insert(accessor, std::make_pair(conn->GetFD(), conn));
}
}
srv->DecrMonitorClientNum();
conn->DisableFlag(redis::Connection::kMonitor);
}

void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) {
std::unique_lock<std::mutex> lock(conns_mu_);

for (const auto &iter : monitor_conns_) {
if (conn == iter.second) continue; // skip the monitor command

if (conn->GetNamespace() == iter.second->GetNamespace() || iter.second->GetNamespace() == kDefaultNamespace) {
iter.second->Reply(response);
tbb::parallel_for(monitor_conns_.range(), [conn, response](const ConnMap::range_type &range) {
for (auto &it : range) {
const auto &value = it.second;
if (conn == value) continue; // skip the monitor command
if (conn->GetNamespace() == value->GetNamespace() || value->GetNamespace() == kDefaultNamespace) {
value->Reply(response);
}
}
}
});
}

std::string Worker::GetClientsStr() {
std::unique_lock<std::mutex> lock(conns_mu_);

std::string output;
for (const auto &iter : conns_) {
redis::Connection *conn = iter.second;
output.append(conn->ToString());
}

return output;
return tbb::parallel_reduce(
conns_.range(), std::string{},
[](const ConnMap::range_type &range, std::string &&result) {
for (auto &it : range) {
result.append(it.second->ToString());
}
return result;
},
[](const std::string &lhs, const std::string &rhs) {
std::string result = lhs;
result.append(rhs);
return result;
});
}

void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme,
int64_t *killed) {
std::lock_guard<std::mutex> guard(conns_mu_);

for (const auto &iter : conns_) {
redis::Connection *conn = iter.second;
if (skipme && self == conn) continue;

// no need to kill the client again if the kCloseAfterReply flag is set
if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) {
continue;
}

if ((type & conn->GetClientType()) ||
(!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) ||
(id != 0 && conn->GetID() == id)) {
conn->EnableFlag(redis::Connection::kCloseAfterReply);
// enable write event to notify worker wake up ASAP, and remove the connection
if (!conn->IsFlagEnabled(redis::Connection::kSlave)) { // don't enable any event in slave connection
auto bev = conn->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
for (const auto key : getConnFds()) {
if (ConnMap::accessor accessor; conns_.find(accessor, key)) {
auto conn = accessor->second;
if (skipme && self == conn) continue;

// no need to kill the client again if the kCloseAfterReply flag is set
if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) {
continue;
}
if ((type & conn->GetClientType()) ||
(!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) ||
(id != 0 && conn->GetID() == id)) {
conn->EnableFlag(redis::Connection::kCloseAfterReply);
// enable write event to notify worker wake up ASAP, and remove the connection
if (!conn->IsFlagEnabled(redis::Connection::kSlave)) { // don't enable any event in slave connection
auto bev = conn->GetBufferEvent();
bufferevent_enable(bev, EV_WRITE);
}
(*killed)++;
}
(*killed)++;
}
}
}

void Worker::KickoutIdleClients(int timeout) {
std::vector<std::pair<int, uint64_t>> to_be_killed_conns;

{
std::lock_guard<std::mutex> guard(conns_mu_);
if (conns_.empty()) {
return;
}
auto fd_list = getConnFds();
if (fd_list.empty()) {
return;
}

int iterations = std::min(static_cast<int>(conns_.size()), 50);
auto iter = conns_.upper_bound(last_iter_conn_fd_);
while (iterations--) {
if (iter == conns_.end()) iter = conns_.begin();
if (static_cast<int>(iter->second->GetIdleTime()) >= timeout) {
to_be_killed_conns.emplace_back(iter->first, iter->second->GetID());
}
iter++;
std::set<int> fds(fd_list.cbegin(), fd_list.cend());

int iterations = std::min(static_cast<int>(conns_.size()), 50);
auto iter = fds.upper_bound(last_iter_conn_fd_);
while (iterations--) {
if (iter == fds.end()) {
iter = fds.begin();
}
if (ConnMap::const_accessor accessor;
conns_.find(accessor, *iter) && static_cast<int>(accessor->second->GetIdleTime()) >= timeout) {
to_be_killed_conns.emplace_back(accessor->first, accessor->second->GetID());
}
iter--;
last_iter_conn_fd_ = iter->first;
iter++;
}
iter--;
last_iter_conn_fd_ = *iter;

for (const auto &conn : to_be_killed_conns) {
FreeConnectionByID(conn.first, conn.second);
}
}

std::vector<int> Worker::getConnFds() const {
return tbb::parallel_reduce(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO parallel_for looks like some cpu-bound operations, and would it occupies more threads than expected here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mapleFU ,
Thanks for your review suggestion.
I read the tbb's official document, and find the schedule uses all cores on default.
Maybe we should set a limitation for tbb schedule.
In fact, I'm not sure do we really need tbb hashmap to replace current mutex now. 🤔

Best Regards,
Edward

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, I'm not sure do we really need tbb hashmap to replace current mutex now.

I think it's worth doing if we don't traverse all conns_ frequently.

conns_.range(), std::vector<int>{},
[](const ConnMap::const_range_type &range, std::vector<int> result) {
for (const auto &fd : range) {
result.emplace_back(fd.first);
}
return result;
},
[](const std::vector<int> &lhs, const std::vector<int> &rhs) {
std::vector<int> result = lhs;
result.insert(result.end(), rhs.begin(), rhs.end());
return result;
});
}

void WorkerThread::Start() {
auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); });

Expand All @@ -564,6 +614,25 @@ void WorkerThread::Start() {
LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}

std::map<int, redis::Connection *> Worker::GetConnections() const {
std::map<int, redis::Connection *> result;
result = tbb::parallel_reduce(
conns_.range(), result,
[](const ConnMap::const_range_type &range, std::map<int, redis::Connection *> &&tmp_result) {
// std::map<int, redis::Connection *> tmp_result;
for (auto &it : range) {
tmp_result.emplace(it.first, it.second);
}
return tmp_result;
},
[](const std::map<int, redis::Connection *> &lhs, const std::map<int, redis::Connection *> &rhs) {
std::map<int, redis::Connection *> result = lhs;
result.insert(rhs.cbegin(), rhs.cend());
return result;
});
return result;
}

void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }

void WorkerThread::Join() {
Expand Down
Loading