Skip to content

Commit

Permalink
Merge pull request #357 from holmes1412/master
Browse files Browse the repository at this point in the history
rpc_var : fix concurrent bug when CounterVar will loop data
  • Loading branch information
Barenboim authored Jan 15, 2024
2 parents dfecdae + 709d3ce commit 1add506
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 8 deletions.
8 changes: 4 additions & 4 deletions src/module/rpc_metrics_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ RPCMetricsFilter::RPCMetricsFilter() :
bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
{
this->gauge(METRICS_REQUEST_COUNT)->increase();
this->counter(METRICS_REQUEST_METHOD)->add(
this->counter(METRICS_REQUEST_METHOD)->increase(
{{"service", data[OTLP_SERVICE_NAME]},
{"method", data[OTLP_METHOD_NAME] }})->increase();
{"method", data[OTLP_METHOD_NAME] }});
this->summary(METRICS_REQUEST_LATENCY)->observe(atoll(data[SRPC_DURATION].data()));

return true;
Expand All @@ -75,9 +75,9 @@ bool RPCMetricsFilter::client_end(SubTask *task, RPCModuleData& data)
bool RPCMetricsFilter::server_end(SubTask *task, RPCModuleData& data)
{
this->gauge(METRICS_REQUEST_COUNT)->increase();
this->counter(METRICS_REQUEST_METHOD)->add(
this->counter(METRICS_REQUEST_METHOD)->increase(
{{"service", data[OTLP_SERVICE_NAME]},
{"method", data[OTLP_METHOD_NAME] }})->increase();
{"method", data[OTLP_METHOD_NAME] }});
this->summary(METRICS_REQUEST_LATENCY)->observe(atoll(data[SRPC_DURATION].data()));

return true;
Expand Down
42 changes: 38 additions & 4 deletions src/var/rpc_var.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,14 @@ RPCVar *RPCVarGlobal::find(const std::string& name)
for (size_t i = 0; i < global_var->local_vars.size() && !ret; i++)
{
local = global_var->local_vars[i];
for (it = local->vars.begin(); it != local->vars.end(); it++)
local->mutex.lock();

for (it = local->vars.begin(); it != local->vars.end() && !ret; it++)
{
if (!name.compare(it->second->get_name()))
{
ret = it->second;
break;
}
}
local->mutex.unlock();
}

global_var->mutex.unlock();
Expand All @@ -203,6 +203,8 @@ CounterVar::~CounterVar()
delete it->second;
}

// Caution :
// make sure local->mutex.lock() before CounterVar::create(true)
RPCVar *CounterVar::create(bool with_data)
{
CounterVar *var = new CounterVar(this->name, this->help);
Expand Down Expand Up @@ -232,6 +234,9 @@ bool CounterVar::label_to_str(const LABEL_MAP& labels, std::string& str)
return true;
}

// [deprecate]
// This cannot guarantee the GaugeVar still exists
// because global will counter->reset() and delete the internal GaugeVar
GaugeVar *CounterVar::add(const LABEL_MAP& labels)
{
std::string label_str;
Expand All @@ -253,6 +258,35 @@ GaugeVar *CounterVar::add(const LABEL_MAP& labels)
return var;
}

void CounterVar::increase(const LABEL_MAP& labels)
{
std::string label_str;
GaugeVar *var;

if (!this->label_to_str(labels, label_str))
return;

RPCVarLocal *local = RPCVarLocal::get_instance();
local->mutex.lock(); // against reset() and delete GaugeVar

auto it = this->data.find(label_str);

if (it == this->data.end())
{
var = new GaugeVar(label_str, "");
this->data.insert(std::make_pair(label_str, var));
}
else
var = it->second;

var->increase();
local->mutex.unlock();

return;
}

// Caution :
// make sure local->mutex.lock() before CounterVar::reduce()
bool CounterVar::reduce(const void *ptr, size_t)
{
std::unordered_map<std::string, GaugeVar *> *data;
Expand Down
1 change: 1 addition & 0 deletions src/var/rpc_var.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class CounterVar : public RPCVar
public:
using LABEL_MAP = std::map<std::string, std::string>;
GaugeVar *add(const LABEL_MAP& labels);
void increase(const LABEL_MAP& labels);

RPCVar *create(bool with_data) override;
bool reduce(const void *ptr, size_t sz) override;
Expand Down

0 comments on commit 1add506

Please sign in to comment.