Skip to content

Commit 5a68f14

Browse files
authored
feat: databend-metabench: benchmark list (#18745)
1 parent 4d00bfb commit 5a68f14

File tree

5 files changed

+157
-30
lines changed

5 files changed

+157
-30
lines changed

src/meta/binaries/metabench/main.rs

Lines changed: 112 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
use std::collections::BTreeMap;
1818
use std::fmt::Debug;
1919
use std::fmt::Display;
20+
use std::sync::atomic::AtomicU64;
21+
use std::sync::atomic::Ordering;
2022
use std::sync::Arc;
2123
use std::time::Duration;
2224
use std::time::Instant;
@@ -55,6 +57,7 @@ use databend_common_tracing::LogFormat;
5557
use databend_common_tracing::StderrConfig;
5658
use databend_common_version::BUILD_INFO;
5759
use databend_common_version::METASRV_COMMIT_VERSION;
60+
use futures::TryStreamExt;
5861
use serde::Deserialize;
5962
use serde::Serialize;
6063
use tokio::time::sleep;
@@ -84,6 +87,12 @@ struct Config {
8487
/// "get_table": single get_table() rpc;
8588
/// "table_copy_file": upsert table with copy file.
8689
/// "table_copy_file:{"file_cnt":100}": upsert table with 100 copy files. After ":" is a json config string
90+
/// "list": list all keys with default prefix pattern;
91+
/// "list:{"limit":50}": list up to 50 keys with default prefix pattern;
92+
/// "list:{"prefix":"custom_prefix"}": list all keys with custom prefix;
93+
/// "list:{"prefix":"custom_prefix","limit":50}": list up to 50 keys with custom prefix;
94+
/// "list:{"interval_ms":100}": add 100ms delay between reading each item (slow client simulation);
95+
/// "list:{"prefix":"custom_prefix","limit":50,"interval_ms":100}": combine all options;
8796
#[clap(long, default_value = "upsert_kv")]
8897
pub rpc: String,
8998
}
@@ -117,39 +126,33 @@ async fn main() {
117126
return;
118127
}
119128

129+
let client = MetaGrpcClient::try_create_with_features(
130+
vec![config.grpc_api_address.clone()],
131+
&BUILD_INFO,
132+
"root",
133+
"xxx",
134+
None,
135+
None,
136+
None,
137+
required::read_write(),
138+
)
139+
.unwrap();
140+
120141
let start = Instant::now();
121142
let mut client_num = 0;
122143
let mut handles = Vec::new();
123144
while client_num < config.client {
124145
client_num += 1;
125-
let addr = config.grpc_api_address.clone();
126146
let rpc = config.rpc.clone();
127147
let prefix = config.prefix;
128148

129149
let cmd_and_param = rpc.splitn(2, ':').collect::<Vec<_>>();
130150
let cmd = cmd_and_param[0].to_string();
131151
let param = cmd_and_param.get(1).unwrap_or(&"").to_string();
132152

133-
let handle = runtime::spawn(async move {
134-
let client = MetaGrpcClient::try_create_with_features(
135-
vec![addr.to_string()],
136-
&BUILD_INFO,
137-
"root",
138-
"xxx",
139-
None,
140-
None,
141-
None,
142-
required::read_write(),
143-
);
144-
145-
let client = match client {
146-
Ok(client) => client,
147-
Err(e) => {
148-
eprintln!("Failed to create client: {}", e);
149-
return;
150-
}
151-
};
153+
let client = client.clone();
152154

155+
let handle = runtime::spawn(async move {
153156
for i in 0..config.number {
154157
if cmd == "upsert_kv" {
155158
benchmark_upsert(&client, prefix, client_num, i).await;
@@ -161,6 +164,8 @@ async fn main() {
161164
benchmark_table_copy_file(&client, prefix, client_num, i, &param).await;
162165
} else if cmd == "semaphore" {
163166
benchmark_semaphore(&client, prefix, client_num, i, &param).await;
167+
} else if cmd == "list" {
168+
benchmark_list(&client, prefix, client_num, i, &param).await;
164169
} else {
165170
unreachable!("Invalid config.rpc: {}", rpc);
166171
}
@@ -447,6 +452,93 @@ async fn benchmark_semaphore(
447452
}
448453
}
449454

455+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Default)]
456+
struct ListConfig {
457+
/// Maximum number of keys to return in the list operation.
458+
/// If None, all matching keys are returned.
459+
limit: Option<usize>,
460+
/// The prefix to search for. If None, uses the default pattern "{prefix}-{client_num}".
461+
prefix: Option<String>,
462+
/// Interval in milliseconds to wait between reading each item from the stream.
463+
/// This simulates a slow client. If None or 0, no delay is added.
464+
interval_ms: Option<u64>,
465+
}
466+
467+
/// Benchmark listing keys with a prefix.
468+
async fn benchmark_list(
469+
client: &Arc<ClientHandle>,
470+
prefix: u64,
471+
client_num: u64,
472+
i: u64,
473+
param: &str,
474+
) {
475+
let name = format!("client[{:>05}]-{}th", client_num, i);
476+
477+
let config = if param.is_empty() {
478+
ListConfig::default()
479+
} else {
480+
serde_json::from_str(param).unwrap()
481+
};
482+
483+
let key_prefix = config
484+
.prefix
485+
.clone()
486+
.unwrap_or_else(|| format!("{}-{}", prefix, client_num));
487+
488+
if i % 100 == 0 {
489+
println!("{:>10}-th list using prefix: '{}'", name, key_prefix);
490+
}
491+
492+
let start_time = Instant::now();
493+
let stream_res = client.list(&key_prefix).await;
494+
let stream_returned_time = Instant::now();
495+
496+
static TOTAL: AtomicU64 = AtomicU64::new(0);
497+
TOTAL.fetch_add(1, Ordering::Relaxed);
498+
499+
println!(
500+
"{:>10}-th list stream returned in {:?}, err: {:?}, total streams: {}",
501+
name,
502+
stream_returned_time.duration_since(start_time),
503+
stream_res.as_ref().err(),
504+
TOTAL.load(Ordering::Relaxed)
505+
);
506+
507+
let mut strm = match stream_res {
508+
Ok(stream) => stream,
509+
Err(e) => {
510+
println!("{:>10}-th list error: {:?}", name, e);
511+
return;
512+
}
513+
};
514+
515+
let mut count = 0;
516+
517+
while let Ok(Some(_item)) = strm.try_next().await {
518+
count += 1;
519+
520+
// Apply interval delay if specified (simulate slow client)
521+
if let Some(interval_ms) = config.interval_ms {
522+
if interval_ms > 0 {
523+
sleep(Duration::from_millis(interval_ms)).await;
524+
}
525+
}
526+
527+
// Apply limit if specified
528+
if let Some(limit) = config.limit {
529+
if count >= limit {
530+
break;
531+
}
532+
}
533+
534+
if count % 10 == 0 {
535+
println!("{:>10}-th list found {} keys", name, count);
536+
}
537+
}
538+
539+
println!("{:>10}-th list found {} keys", name, count);
540+
}
541+
450542
fn print_res<D: Debug>(i: u64, typ: impl Display, res: &D) {
451543
if i % 100 == 0 {
452544
println!("{:>10}-th {} result: {:?}", i, typ, res);

src/meta/raft-store/src/leveled_store/leveled_map/impl_scoped_seq_bounded_range.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414

1515
use std::io::Error;
1616
use std::ops::RangeBounds;
17+
use std::time::Instant;
1718

1819
use futures_util::StreamExt;
20+
use log::debug;
1921
use log::warn;
2022
use map_api::mvcc;
2123
use map_api::mvcc::ViewKey;
@@ -60,7 +62,24 @@ where
6062
// writable level
6163

6264
let (vec, immutable) = {
65+
let start = Instant::now();
66+
debug!(
67+
"Level.writable::range(start={:?}, end={:?})",
68+
range.start_bound(),
69+
range.end_bound()
70+
);
71+
6372
let inner = self.data.lock().unwrap();
73+
74+
debug!(
75+
"Level.writable::range(start={:?}, end={:?}) acquired lock, took {:?}, writable: kv.len={}, expire.len={}",
76+
range.start_bound(),
77+
range.end_bound(),
78+
start.elapsed(),
79+
inner.writable.kv.inner.len(),
80+
inner.writable.expire.inner.len()
81+
);
82+
6483
let it = inner
6584
.writable
6685
.get_sub_table()

src/meta/service/src/api/grpc/grpc_service.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -333,19 +333,24 @@ impl MetaService for MetaServiceImpl {
333333
let req_str = format!("ReadRequest: {:?}", req);
334334

335335
ThreadTracker::tracking_future(async move {
336-
let _guard = InFlightRead::guard();
336+
let guard = InFlightRead::guard();
337337
let start = Instant::now();
338338
let (endpoint, strm) = self.handle_kv_read_v1(req).in_span(root).await?;
339339

340340
// Counter to track total items sent
341341
let count = Arc::new(AtomicU64::new(0));
342342
let count2 = count.clone();
343343

344-
let strm = strm.map(move |item| {
345-
network_metrics::incr_stream_sent_item(req_typ);
346-
count2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
347-
item
348-
});
344+
let strm = strm
345+
.map(move |x| {
346+
let _g = &guard; // hold the guard until the stream is done.
347+
x
348+
})
349+
.map(move |item| {
350+
network_metrics::incr_stream_sent_item(req_typ);
351+
count2.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
352+
item
353+
});
349354

350355
// Log the total time and item count when the stream is finished.
351356
let strm = OnCompleteStream::new(strm, move || {
@@ -404,7 +409,7 @@ impl MetaService for MetaServiceImpl {
404409
&self,
405410
_request: Request<databend_common_meta_types::protobuf::Empty>,
406411
) -> Result<Response<Self::ExportStream>, Status> {
407-
let _guard = InFlightRead::guard();
412+
let guard = InFlightRead::guard();
408413

409414
let meta_node = self.try_get_meta_node()?;
410415

@@ -415,6 +420,10 @@ impl MetaService for MetaServiceImpl {
415420
// - Convert Vec<String> to ExportedChunk;
416421
// - Convert TryChunkError<_, io::Error> to Status;
417422
let s = strm
423+
.map(move |x| {
424+
let _g = &guard; // hold the guard until the stream is done.
425+
x
426+
})
418427
.try_chunks(chunk_size)
419428
.map_ok(|chunk: Vec<String>| ExportedChunk { data: chunk })
420429
.map_err(|e: TryChunksError<_, io::Error>| Status::internal(e.1.to_string()));
@@ -433,7 +442,7 @@ impl MetaService for MetaServiceImpl {
433442
&self,
434443
request: Request<pb::ExportRequest>,
435444
) -> Result<Response<Self::ExportV1Stream>, Status> {
436-
let _guard = InFlightRead::guard();
445+
let guard = InFlightRead::guard();
437446

438447
let meta_node = self.try_get_meta_node()?;
439448

@@ -444,6 +453,10 @@ impl MetaService for MetaServiceImpl {
444453
// - Convert Vec<String> to ExportedChunk;
445454
// - Convert TryChunkError<_, io::Error> to Status;
446455
let s = strm
456+
.map(move |x| {
457+
let _g = &guard; // hold the guard until the stream is done.
458+
x
459+
})
447460
.try_chunks(chunk_size)
448461
.map_ok(|chunk: Vec<String>| ExportedChunk { data: chunk })
449462
.map_err(|e: TryChunksError<_, io::Error>| Status::internal(e.1.to_string()));

src/meta/service/src/meta_service/meta_node.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,10 @@ impl MetaNode {
337337
let socket_addr = ip_port.parse::<std::net::SocketAddr>()?;
338338
let node_id = meta_node.raft_store.id;
339339

340-
let srv = tonic::transport::Server::builder().add_service(raft_server);
340+
let srv = tonic::transport::Server::builder()
341+
// .concurrency_limit_per_connection()
342+
// .timeout(Duration::from_secs(60))
343+
.add_service(raft_server);
341344

342345
let h = databend_common_base::runtime::spawn(async move {
343346
srv.serve_with_shutdown(socket_addr, async move {

src/meta/service/src/store/store_inner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl RaftStoreInner {
202202

203203
let mut compactor = self.state_machine().new_compactor(compactor_permit);
204204

205-
info!("do_build_snapshot compactor created: {:?}", compactor);
205+
info!("do_build_snapshot compactor created");
206206

207207
let (mut sys_data, mut strm) = compactor
208208
.compact_into_stream()

0 commit comments

Comments
 (0)