Skip to content

Commit

Permalink
Merge pull request #361 from holmes1412/master
Browse files Browse the repository at this point in the history
Recent update for RPCModule
  • Loading branch information
Barenboim authored Mar 4, 2024
2 parents 27e34fb + 11daf65 commit c5d27e0
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 61 deletions.
2 changes: 1 addition & 1 deletion src/module/rpc_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class RPCFilter
return true;
}

const std::string raw_var_name(const std::string& name) const
static const std::string raw_var_name(const std::string& name)
{
size_t pos = name.find("::");
if (pos != std::string::npos)
Expand Down
144 changes: 127 additions & 17 deletions src/module/rpc_metrics_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ SummaryVar *RPCMetricsFilter::summary(const std::string& name)
return RPCVarFactory::summary(name);
}

HistogramCounterVar *RPCMetricsFilter::histogram_counter(const std::string &name)
{
std::string var_name = this->get_name() + name;
return RPCVarFactory::histogram_counter(var_name);
}

GaugeVar *RPCMetricsFilter::create_gauge(const std::string& str,
const std::string& help)
{
Expand Down Expand Up @@ -249,6 +255,32 @@ SummaryVar *RPCMetricsFilter::create_summary(const std::string& str,
return summary;
}

HistogramCounterVar *RPCMetricsFilter::create_histogram_counter(const std::string &str,
const std::string &help,
const std::vector<double> &bucket)
{
if (RPCVarFactory::check_name_format(str) == false)
{
errno = EINVAL;
return NULL;
}

std::string name = this->get_name() + str;
this->mutex.lock();
const auto it = var_names.insert(name);
this->mutex.unlock();

if (!it.second)
{
errno = EEXIST;
return NULL;
}

HistogramCounterVar *hc = new HistogramCounterVar(name, help, bucket);
RPCVarLocal::get_instance()->add(name, hc);
return hc;
}

void RPCMetricsFilter::reduce(std::unordered_map<std::string, RPCVar *>& out)
{
std::unordered_map<std::string, RPCVar *>::iterator it;
Expand All @@ -274,6 +306,26 @@ void RPCMetricsFilter::reduce(std::unordered_map<std::string, RPCVar *>& out)
global_var->mutex.unlock();
}

void RPCMetricsFilter::reset()
{
std::unordered_map<std::string, RPCVar *>::iterator it;
RPCVarGlobal *global_var = RPCVarGlobal::get_instance();

global_var->mutex.lock();
for (RPCVarLocal *local : global_var->local_vars)
{
local->mutex.lock();
for (it = local->vars.begin(); it != local->vars.end(); it++)
{
if (this->var_names.find(it->first) == this->var_names.end())
continue;
it->second->reset();
}
local->mutex.unlock();
}
global_var->mutex.unlock();
}

RPCMetricsPull::RPCMetricsPull() :
collector(this->report_output),
server(std::bind(&RPCMetricsPull::pull, this, std::placeholders::_1))
Expand Down Expand Up @@ -557,6 +609,7 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
std::unordered_map<std::string, RPCVar *> tmp;
std::unordered_map<std::string, RPCVar *>::iterator it;
ScopeMetrics *metrics;
Histogram *report_histogram;
metrics = static_cast<ScopeMetrics *>(msg);

this->reduce(tmp);
Expand All @@ -569,7 +622,7 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
Metric *m = metrics->add_metrics();
google::protobuf::Message *current_var;

m->set_name(this->raw_var_name(var->get_name()));
m->set_name(RPCFilter::raw_var_name(var->get_name()));
m->set_description(var->get_help());

switch(var->get_type())
Expand All @@ -584,19 +637,25 @@ bool RPCMetricsOTel::expose(google::protobuf::Message *msg)
this->collector.collect_counter(var, current_var);
break;
case VAR_HISTOGRAM:
current_var = m->mutable_histogram();
report_histogram = m->mutable_histogram();
current_var = report_histogram->add_data_points();
this->collector.collect_histogram(var, current_var);
break;
case VAR_SUMMARY:
current_var = m->mutable_summary();
this->collector.collect_summary(var, current_var);
break;
case VAR_HISTOGRAM_COUNTER:
// add multiple metrics inside
this->collector.collect_histogram_counter(var, metrics);
break;
}
}

for (it = tmp.begin(); it != tmp.end(); it++)
delete it->second;

this->reset(); // reset by report interval
return true;
}

Expand All @@ -618,7 +677,7 @@ void RPCMetricsOTel::Collector::collect_counter(RPCVar *var,
CounterVar *counter = (CounterVar *)var;

std::unordered_map<std::string, GaugeVar *> *data;
data = (std::unordered_map<std::string, GaugeVar *> *)counter->get_data();
data = (std::unordered_map<std::string, GaugeVar *> *)counter->get_map();

for (auto it = data->begin(); it != data->end(); it++)
this->collect_counter_each(it->first, it->second->get(), msg);
Expand Down Expand Up @@ -661,22 +720,25 @@ void RPCMetricsOTel::Collector::collect_counter_each(const std::string& label,
{
Sum *report_sum = static_cast<Sum *>(msg);
NumberDataPoint *data_points = report_sum->add_data_points();
std::map<std::string, LABEL_MAP *>::iterator it = this->label_map.find(label);
std::map<std::string, LABEL_MAP *>::iterator it;
std::string key;
std::string value;

if (it == this->label_map.end())
if (!label.empty())
{
this->add_counter_label(label);
it = this->label_map.find(label);
}
if (it == this->label_map.end())
{
this->add_counter_label(label);
it = this->label_map.find(label);
}

for (const auto& kv : *(it->second))
{
KeyValue *attribute = data_points->add_attributes();
attribute->set_key(kv.first);
AnyValue *value = attribute->mutable_value();
value->set_string_value(kv.second);
for (const auto& kv : *(it->second))
{
KeyValue *attribute = data_points->add_attributes();
attribute->set_key(kv.first);
AnyValue *value = attribute->mutable_value();
value->set_string_value(kv.second);
}
}

data_points->set_as_double(data);
Expand All @@ -687,13 +749,11 @@ void RPCMetricsOTel::Collector::collect_histogram(RPCVar *var,
google::protobuf::Message *msg)
{
HistogramVar *histogram = (HistogramVar *)var;
Histogram *report_histogram = static_cast<Histogram *>(msg);
HistogramDataPoint *data_points = static_cast<HistogramDataPoint *>(msg);

const std::vector<size_t> *bucket_counts = histogram->get_bucket_counts();
const std::vector<double> *bucket_boundaries = histogram->get_bucket_boundaries();

HistogramDataPoint *data_points = report_histogram->add_data_points();

// begin
data_points->set_time_unix_nano(this->current_timestamp_nano);

Expand Down Expand Up @@ -763,5 +823,55 @@ void RPCMetricsOTel::Collector::collect_summary_each(double quantile,
vaq->set_value(quantile_out);
}

void RPCMetricsOTel::Collector::collect_histogram_counter(RPCVar *var,
google::protobuf::Message *msg)
{
HistogramCounterVar *hc = (HistogramCounterVar *)var;
ScopeMetrics *metrics = static_cast<ScopeMetrics *>(msg);

const std::unordered_map<std::string, HistogramVar *> *data;
data = static_cast<const std::unordered_map<std::string,
HistogramVar *> *>(hc->get_map());

std::map<std::string, LABEL_MAP *>::iterator m_it;
std::string key;
std::string value;

Metric *m;
Histogram *report_histogram;
HistogramDataPoint *data_points;
std::string label;

for (auto it = data->begin(); it != data->end(); it++)
{
m = metrics->add_metrics();
m->set_name(RPCFilter::raw_var_name(hc->get_name()));
m->set_description(hc->get_help());
report_histogram = m->mutable_histogram();
report_histogram->set_aggregation_temporality(
AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA);
data_points = report_histogram->add_data_points();

label = it->first;

m_it = this->label_map.find(label);
if (m_it == this->label_map.end())
{
this->add_counter_label(label);
m_it = this->label_map.find(label);
}

for (const auto &kv : *(m_it->second))
{
KeyValue *attribute = data_points->add_attributes();
attribute->set_key(kv.first);
AnyValue *value = attribute->mutable_value();
value->set_string_value(kv.second);
}

this->collect_histogram(it->second, data_points);
}
}

} // end namespace srpc

20 changes: 16 additions & 4 deletions src/module/rpc_metrics_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ class RPCMetricsFilter : public RPCFilter
const std::vector<struct Quantile>& quantile,
const std::chrono::milliseconds max_age,
int age_bucket);

HistogramCounterVar *create_histogram_counter(const std::string &name,
const std::string &help,
const std::vector<double> &bucket);
// thread local api
GaugeVar *gauge(const std::string& name);
CounterVar *counter(const std::string& name);
HistogramVar *histogram(const std::string& name);
SummaryVar *summary(const std::string& name);
HistogramCounterVar *histogram_counter(const std::string &name);

// filter api
bool client_end(SubTask *task, RPCModuleData& data) override;
Expand All @@ -73,6 +78,7 @@ class RPCMetricsFilter : public RPCFilter

protected:
void reduce(std::unordered_map<std::string, RPCVar *>& out);
void reset();

protected:
std::mutex mutex;
Expand Down Expand Up @@ -219,12 +225,16 @@ class RPCMetricsOTel : public RPCMetricsFilter
// new api : fill var into msg
void collect_gauge(RPCVar *gauge, google::protobuf::Message *msg);
void collect_counter(RPCVar *counter, google::protobuf::Message *msg);
void collect_histogram(RPCVar *histogram, google::protobuf::Message *msg);
void collect_histogram(RPCVar *histogram,
google::protobuf::Message *msg);
void collect_summary(RPCVar *summary, google::protobuf::Message *msg);
void collect_histogram_counter(RPCVar *histogram_counter,
google::protobuf::Message *msg);

void collect_counter_each(const std::string &label, double data,
google::protobuf::Message *msg);
void collect_histogram_each(double bucket_boudary, size_t current_count,
void collect_histogram_each(double bucket_boudary,
size_t current_count,
google::protobuf::Message *msg);
void collect_summary_each(double quantile, double quantile_out,
google::protobuf::Message *msg);
Expand All @@ -237,12 +247,14 @@ class RPCMetricsOTel : public RPCMetricsFilter
void collect_histogram_begin(RPCVar *histogram) override {}
void collect_histogram_each(RPCVar *histogram, double bucket_boudary,
size_t current_count) override {}
void collect_histogram_end(RPCVar *histogram, double sum, size_t count) override{}
void collect_histogram_end(RPCVar *histogram, double sum,
size_t count) override{}

void collect_summary_begin(RPCVar *summary) override {}
void collect_summary_each(RPCVar *summary, double quantile,
double quantile_out) override {}
void collect_summary_end(RPCVar *summary, double sum, size_t count) override {}
void collect_summary_end(RPCVar *summary, double sum,
size_t count) override {}

private:
void add_counter_label(const std::string& label);
Expand Down
Loading

0 comments on commit c5d27e0

Please sign in to comment.