Skip to content

Commit cf7cab6

Browse files
author
tongjian
committed
address comment
Signed-off-by: tongjian <[email protected]>
1 parent 291d193 commit cf7cab6

File tree

4 files changed

+37
-15
lines changed

4 files changed

+37
-15
lines changed

grpc-sys/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ fn build_grpc(cc: &mut cc::Build, library: &str) {
300300

301301
fn figure_systemd_path(build_dir: &str) {
302302
let path = format!("{build_dir}/CMakeCache.txt");
303-
let f = BufReader::new(std::fs::File::open(&path).unwrap());
303+
let f = BufReader::new(std::fs::File::open(path).unwrap());
304304
let mut libdir: Option<String> = None;
305305
let mut libname: Option<String> = None;
306306
for l in f.lines() {

grpc-sys/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ mod bindings {
1010
mod grpc_wrap;
1111

1212
pub use bindings::*;
13+
#[allow(unused_imports)]
1314
pub use grpc_wrap::*;

src/env.rs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ use std::sync::mpsc;
55
use std::sync::Arc;
66
use std::thread::{Builder as ThreadBuilder, JoinHandle};
77

8+
use prometheus::local::LocalHistogram;
9+
use prometheus::IntCounter;
10+
811
use crate::cq::{CompletionQueue, CompletionQueueHandle, EventType, WorkQueue};
912
use crate::grpc_sys;
1013
use crate::task::CallTag;
@@ -16,39 +19,43 @@ use {
1619
GRPC_TASK_WAIT_DURATION,
1720
},
1821
crate::task::resolve,
19-
prometheus::{
20-
core::{AtomicU64, Counter},
21-
Histogram,
22-
},
2322
std::time::Instant,
2423
};
2524

25+
const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s
26+
2627
#[cfg(feature = "prometheus")]
2728
pub struct GRPCRunner {
28-
cq_next_duration_his: Histogram,
29-
execute_duration_his: Histogram,
30-
wait_duration_his: Histogram,
31-
event_counter: [Counter<AtomicU64>; 6],
29+
cq_next_duration_his: LocalHistogram,
30+
execute_duration_his: LocalHistogram,
31+
wait_duration_his: LocalHistogram,
32+
event_counter: [IntCounter; 6],
33+
last_flush_time: Instant,
3234
}
3335

3436
#[cfg(feature = "prometheus")]
3537
impl GRPCRunner {
3638
pub fn new(name: &String) -> GRPCRunner {
37-
let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION.with_label_values(&[name]);
38-
let execute_duration_his = GRPC_POOL_EXECUTE_DURATION.with_label_values(&[name]);
39-
let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]);
39+
let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION
40+
.with_label_values(&[name])
41+
.local();
42+
let execute_duration_his = GRPC_POOL_EXECUTE_DURATION
43+
.with_label_values(&[name])
44+
.local();
45+
let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]).local();
4046
let event_counter = ["batch", "request", "unary", "abort", "action", "spawn"]
4147
.map(|event| GRPC_POOL_EVENT_COUNT_VEC.with_label_values(&[name, event]));
4248
GRPCRunner {
4349
cq_next_duration_his,
4450
execute_duration_his,
4551
wait_duration_his,
4652
event_counter,
53+
last_flush_time: Instant::now(),
4754
}
4855
}
4956

5057
// event loop
51-
pub fn run(&self, tx: mpsc::Sender<CompletionQueue>) {
58+
pub fn run(&mut self, tx: mpsc::Sender<CompletionQueue>) {
5259
let cq = Arc::new(CompletionQueueHandle::new());
5360
let worker_info = Arc::new(WorkQueue::new());
5461
let cq = CompletionQueue::new(cq, worker_info);
@@ -73,7 +80,21 @@ impl GRPCRunner {
7380
}
7481
self.execute_duration_his
7582
.observe(now.elapsed().as_secs_f64());
83+
self.maybe_flush();
84+
}
85+
}
86+
87+
fn maybe_flush(&mut self) {
88+
let now = Instant::now();
89+
if now.saturating_duration_since(self.last_flush_time)
90+
< std::time::Duration::from_millis(METRICS_FLUSH_INTERVAL)
91+
{
92+
return;
7693
}
94+
self.last_flush_time = now;
95+
self.cq_next_duration_his.flush();
96+
self.execute_duration_his.flush();
97+
self.wait_duration_his.flush();
7798
}
7899

79100
fn resolve(&self, tag: Box<CallTag>, cq: &CompletionQueue, success: bool) {
@@ -193,7 +214,7 @@ impl EnvBuilder {
193214
.as_ref()
194215
.map_or(format!("grpc-pool-{i}"), |prefix| format!("{prefix}-{i}"));
195216
#[cfg(feature = "prometheus")]
196-
let runner = GRPCRunner::new(&name);
217+
let mut runner = GRPCRunner::new(&name);
197218
builder = builder.name(name);
198219
let after_start = self.after_start.clone();
199220
let before_stop = self.before_stop.clone();

src/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ lazy_static! {
2929
"grpc_pool_execute_duration",
3030
"Bucketed histogram of grpc pool execute duration for every time",
3131
&["name"],
32-
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
32+
exponential_buckets(1e-7, 2.0, 30).unwrap() // 100ns ~ 100s
3333
)
3434
.unwrap();
3535

0 commit comments

Comments
 (0)