Skip to content

Commit 1b75e7e

Browse files
authored
refactor: meta: unify FetchAddU64 into FetchIncreaseU64 (#18847)
Rename FetchAddU64 to FetchIncreaseU64 and add `max_value` field to support unified atomic operation semantics. The new operation computes: `after = max(current_value, max_value) + delta`. This unified approach supports three use cases through a single operation: - Pure ADD: max_value=0, delta=N (backward compatible) - Pure MAX: max_value=N, delta=0 (new capability) - Combined: max_value=N, delta=M (ensures minimum then adds) The refactoring maintains backward compatibility through the existing `fetch_add_u64()` helper method which automatically sets max_value=0. Two new helper methods are added: - `fetch_increase_u64(key, max_value, delta)` for combined ops - `fetch_max_u64(key, max_value)` for pure max operations
1 parent 2567265 commit 1b75e7e

File tree

17 files changed

+192
-134
lines changed

17 files changed

+192
-134
lines changed

src/meta/api/src/auto_increment_nextval_impl.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use databend_common_meta_app::tenant::ToTenant;
1919
use databend_common_meta_kvapi::kvapi;
2020
use databend_common_meta_kvapi::kvapi::Key;
2121
use databend_common_meta_types::anyerror::func_name;
22-
use databend_common_meta_types::protobuf::FetchAddU64Response;
22+
use databend_common_meta_types::protobuf::FetchIncreaseU64Response;
2323
use databend_common_meta_types::MetaError;
2424
use databend_common_meta_types::TxnOp;
2525
use databend_common_meta_types::TxnRequest;
@@ -46,7 +46,7 @@ where KV: kvapi::KVApi<Error = MetaError> + ?Sized
4646
self,
4747
tenant: impl ToTenant,
4848
count: u64,
49-
) -> Result<Result<FetchAddU64Response, AutoIncrementError>, MetaTxnError> {
49+
) -> Result<Result<FetchIncreaseU64Response, AutoIncrementError>, MetaTxnError> {
5050
debug!("{}", func_name!());
5151

5252
// Key for the sequence number value.
@@ -69,7 +69,7 @@ where KV: kvapi::KVApi<Error = MetaError> + ?Sized
6969
);
7070
debug_assert!(succ);
7171

72-
let resp = responses[0].try_as_fetch_add_u64().unwrap();
72+
let resp = responses[0].try_as_fetch_increase_u64().unwrap();
7373
let got_delta = resp.delta();
7474

7575
if got_delta < delta {

src/meta/api/src/sequence_nextval_impl.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ where KV: kvapi::KVApi<Error = MetaError> + ?Sized
127127
);
128128

129129
if succ {
130-
let resp = responses[0].try_as_fetch_add_u64().unwrap();
130+
let resp = responses[0].try_as_fetch_increase_u64().unwrap();
131131

132132
let got_delta = resp.delta();
133133

src/meta/client/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,12 @@ use semver::Version;
158158
/// - 2025-09-30: since 1.2.823
159159
/// 🖥 server: store raft-log proposing time `proposed_at_ms` in `KVMeta`.
160160
///
161-
/// - 2025-09-27: since TODO: update when merged
161+
/// - 2025-09-27: since 1.2.823
162162
/// 👥 client: require 1.2.770, remove calling RPC kv_api
163163
///
164+
/// - 2025-10-16: since TODO
165+
/// 🖥 server: rename `FetchAddU64` to `FetchIncreaseU64`, add `max_value`.
166+
///
164167
/// Server feature set:
165168
/// ```yaml
166169
/// server_features:

src/meta/kvapi-test-suite/src/kvapi_test_suite.rs

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use databend_common_meta_kvapi::kvapi::KvApiExt;
2020
use databend_common_meta_types::normalize_meta::NormalizeMeta;
2121
use databend_common_meta_types::protobuf as pb;
2222
use databend_common_meta_types::protobuf::BooleanExpression;
23-
use databend_common_meta_types::protobuf::FetchAddU64Response;
23+
use databend_common_meta_types::protobuf::FetchIncreaseU64Response;
2424
use databend_common_meta_types::protobuf::KvMeta;
2525
use databend_common_meta_types::txn_condition;
2626
use databend_common_meta_types::txn_op;
@@ -1259,8 +1259,8 @@ impl TestSuite {
12591259
let resp = kv.transaction(txn).await?;
12601260

12611261
assert_eq!(
1262-
resp.responses[0].try_as_fetch_add_u64().unwrap(),
1263-
&FetchAddU64Response {
1262+
resp.responses[0].try_as_fetch_increase_u64().unwrap(),
1263+
&FetchIncreaseU64Response {
12641264
key: "k1".to_string(),
12651265
before_seq: 0,
12661266
before: 0,
@@ -1269,8 +1269,8 @@ impl TestSuite {
12691269
}
12701270
);
12711271
assert_eq!(
1272-
resp.responses[1].try_as_fetch_add_u64().unwrap(),
1273-
&FetchAddU64Response {
1272+
resp.responses[1].try_as_fetch_increase_u64().unwrap(),
1273+
&FetchIncreaseU64Response {
12741274
key: "k2".to_string(),
12751275
before_seq: 0,
12761276
before: 0,
@@ -1290,8 +1290,8 @@ impl TestSuite {
12901290
let resp = kv.transaction(txn).await?;
12911291

12921292
assert_eq!(
1293-
resp.responses[0].try_as_fetch_add_u64().unwrap(),
1294-
&FetchAddU64Response {
1293+
resp.responses[0].try_as_fetch_increase_u64().unwrap(),
1294+
&FetchIncreaseU64Response {
12951295
key: "k1".to_string(),
12961296
before_seq: 1,
12971297
before: 2,
@@ -1300,8 +1300,8 @@ impl TestSuite {
13001300
}
13011301
);
13021302
assert_eq!(
1303-
resp.responses[1].try_as_fetch_add_u64().unwrap(),
1304-
&FetchAddU64Response {
1303+
resp.responses[1].try_as_fetch_increase_u64().unwrap(),
1304+
&FetchIncreaseU64Response {
13051305
key: "k2".to_string(),
13061306
before_seq: 2,
13071307
before: 3,
@@ -1322,8 +1322,8 @@ impl TestSuite {
13221322
let resp = kv.transaction(txn).await?;
13231323

13241324
assert_eq!(
1325-
resp.responses[0].try_as_fetch_add_u64().unwrap(),
1326-
&FetchAddU64Response {
1325+
resp.responses[0].try_as_fetch_increase_u64().unwrap(),
1326+
&FetchIncreaseU64Response {
13271327
key: "k1".to_string(),
13281328
before_seq: 3,
13291329
before: 4,
@@ -1332,8 +1332,8 @@ impl TestSuite {
13321332
}
13331333
);
13341334
assert_eq!(
1335-
resp.responses[1].try_as_fetch_add_u64().unwrap(),
1336-
&FetchAddU64Response {
1335+
resp.responses[1].try_as_fetch_increase_u64().unwrap(),
1336+
&FetchIncreaseU64Response {
13371337
key: "k2".to_string(),
13381338
before_seq: 4,
13391339
before: 6,
@@ -1342,8 +1342,8 @@ impl TestSuite {
13421342
}
13431343
);
13441344
assert_eq!(
1345-
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1346-
&FetchAddU64Response {
1345+
resp.responses[2].try_as_fetch_increase_u64().unwrap(),
1346+
&FetchIncreaseU64Response {
13471347
key: "k2".to_string(),
13481348
before_seq: 6,
13491349
before: u64::MAX / 2 + 6,
@@ -1378,8 +1378,8 @@ impl TestSuite {
13781378
let resp = kv.transaction(txn).await?;
13791379

13801380
assert_eq!(
1381-
resp.responses[0].try_as_fetch_add_u64().unwrap(),
1382-
&FetchAddU64Response {
1381+
resp.responses[0].try_as_fetch_increase_u64().unwrap(),
1382+
&FetchIncreaseU64Response {
13831383
key: "k1".to_string(),
13841384
before_seq: 0,
13851385
before: 0,
@@ -1388,8 +1388,8 @@ impl TestSuite {
13881388
}
13891389
);
13901390
assert_eq!(
1391-
resp.responses[1].try_as_fetch_add_u64().unwrap(),
1392-
&FetchAddU64Response {
1391+
resp.responses[1].try_as_fetch_increase_u64().unwrap(),
1392+
&FetchIncreaseU64Response {
13931393
key: "k1".to_string(),
13941394
before_seq: 1,
13951395
before: 2,
@@ -1398,8 +1398,8 @@ impl TestSuite {
13981398
}
13991399
);
14001400
assert_eq!(
1401-
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1402-
&FetchAddU64Response {
1401+
resp.responses[2].try_as_fetch_increase_u64().unwrap(),
1402+
&FetchIncreaseU64Response {
14031403
key: "k1".to_string(),
14041404
before_seq: 1,
14051405
before: 2,
@@ -1408,8 +1408,8 @@ impl TestSuite {
14081408
}
14091409
);
14101410
assert_eq!(
1411-
resp.responses[3].try_as_fetch_add_u64().unwrap(),
1412-
&FetchAddU64Response {
1411+
resp.responses[3].try_as_fetch_increase_u64().unwrap(),
1412+
&FetchIncreaseU64Response {
14131413
key: "k2".to_string(),
14141414
before_seq: 0,
14151415
before: 0,
@@ -1430,8 +1430,8 @@ impl TestSuite {
14301430
let resp = kv.transaction(txn).await?;
14311431

14321432
assert_eq!(
1433-
resp.responses[0].try_as_fetch_add_u64().unwrap(),
1434-
&FetchAddU64Response {
1433+
resp.responses[0].try_as_fetch_increase_u64().unwrap(),
1434+
&FetchIncreaseU64Response {
14351435
key: "k1".to_string(),
14361436
before_seq: 2,
14371437
before: 6,
@@ -1440,8 +1440,8 @@ impl TestSuite {
14401440
}
14411441
);
14421442
assert_eq!(
1443-
resp.responses[1].try_as_fetch_add_u64().unwrap(),
1444-
&FetchAddU64Response {
1443+
resp.responses[1].try_as_fetch_increase_u64().unwrap(),
1444+
&FetchIncreaseU64Response {
14451445
key: "k1".to_string(),
14461446
before_seq: 4,
14471447
before: 8,
@@ -1450,8 +1450,8 @@ impl TestSuite {
14501450
}
14511451
);
14521452
assert_eq!(
1453-
resp.responses[2].try_as_fetch_add_u64().unwrap(),
1454-
&FetchAddU64Response {
1453+
resp.responses[2].try_as_fetch_increase_u64().unwrap(),
1454+
&FetchIncreaseU64Response {
14551455
key: "k1".to_string(),
14561456
before_seq: 4,
14571457
before: 8,

src/meta/process/src/kv_processor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ where F: Fn(&str, Vec<u8>) -> Result<Vec<u8>, anyhow::Error>
210210
}
211211
Request::Delete(_) => {}
212212
Request::DeleteByPrefix(_) => {}
213-
Request::FetchAddU64(_) => {}
213+
Request::FetchIncreaseU64(_) => {}
214214
Request::PutSequential(_) => {}
215215
}
216216

src/meta/raft-store/src/applier/mod.rs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use databend_common_meta_types::node::Node;
2525
use databend_common_meta_types::protobuf as pb;
2626
use databend_common_meta_types::protobuf::boolean_expression::CombiningOperator;
2727
use databend_common_meta_types::protobuf::BooleanExpression;
28-
use databend_common_meta_types::protobuf::FetchAddU64;
28+
use databend_common_meta_types::protobuf::FetchIncreaseU64;
2929
use databend_common_meta_types::raft_types::Entry;
3030
use databend_common_meta_types::raft_types::EntryPayload;
3131
use databend_common_meta_types::raft_types::StoredMembership;
@@ -509,8 +509,10 @@ where SM: StateMachineApi<SysData> + 'static
509509
let r = self.txn_execute_delete_by_prefix(delete_by_prefix).await?;
510510
TxnOpResponse::new(r)
511511
}
512-
Request::FetchAddU64(fetch_add_u64) => {
513-
let r = self.txn_execute_fetch_add_u64(fetch_add_u64).await?;
512+
Request::FetchIncreaseU64(fetch_increase_u64) => {
513+
let r = self
514+
.txn_execute_fetch_increase_u64(fetch_increase_u64)
515+
.await?;
514516
TxnOpResponse::new(r)
515517
}
516518
Request::PutSequential(put_sequential) => {
@@ -595,10 +597,10 @@ where SM: StateMachineApi<SysData> + 'static
595597
Ok(del_resp)
596598
}
597599

598-
async fn txn_execute_fetch_add_u64(
600+
async fn txn_execute_fetch_increase_u64(
599601
&mut self,
600-
req: &FetchAddU64,
601-
) -> Result<pb::FetchAddU64Response, io::Error> {
602+
req: &FetchIncreaseU64,
603+
) -> Result<pb::FetchIncreaseU64Response, io::Error> {
602604
let before_seqv = self.sm.get_maybe_expired_kv(&req.key).await?;
603605

604606
let before_seq = before_seqv.seq();
@@ -610,7 +612,7 @@ where SM: StateMachineApi<SysData> + 'static
610612
Ok(v) => v,
611613
Err(e) => {
612614
warn!(
613-
"fetch_add_u64: failed to deserialize u64 value: {:?}, error: {}",
615+
"fetch_increase_u64: failed to deserialize u64 value: {:?}, error: {}",
614616
bytes, e
615617
);
616618
0
@@ -622,21 +624,24 @@ where SM: StateMachineApi<SysData> + 'static
622624

623625
if let Some(match_seq) = req.match_seq {
624626
if match_seq != before_seq {
625-
let response =
626-
pb::FetchAddU64Response::new_unchanged(&req.key, SeqV::new(before_seq, before));
627+
let response = pb::FetchIncreaseU64Response::new_unchanged(
628+
&req.key,
629+
SeqV::new(before_seq, before),
630+
);
627631
return Ok(response);
628632
}
629633
}
630634

631-
let after = before.saturating_add_signed(req.delta);
635+
// First take max, then add delta
636+
let after = std::cmp::max(before, req.max_value).saturating_add_signed(req.delta);
632637

633638
let (_prev, result) = {
634639
let after_value = serde_json::to_vec(&after).expect("serialize u64 to json");
635640
let upsert = UpsertKV::update(&req.key, &after_value);
636641
self.upsert_kv(&upsert).await?
637642
};
638643

639-
let response = pb::FetchAddU64Response::new(
644+
let response = pb::FetchIncreaseU64Response::new(
640645
&req.key,
641646
SeqV::new(before_seq, before),
642647
SeqV::new(result.seq(), after),
@@ -651,13 +656,16 @@ where SM: StateMachineApi<SysData> + 'static
651656
) -> Result<TxnPutResponse, io::Error> {
652657
// Step 1. Build sequential key
653658

654-
let fetch_add_u64 = FetchAddU64 {
659+
let fetch_increase_u64 = FetchIncreaseU64 {
655660
key: req.sequence_key.clone(),
656661
delta: 1,
657662
match_seq: None,
663+
max_value: 0,
658664
};
659665

660-
let fetch_add_response = self.txn_execute_fetch_add_u64(&fetch_add_u64).await?;
666+
let fetch_add_response = self
667+
.txn_execute_fetch_increase_u64(&fetch_increase_u64)
668+
.await?;
661669
let next_seq = fetch_add_response.before;
662670

663671
let key = req.build_key(next_seq);

src/meta/types/build.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,11 @@ fn build_proto() {
155155
r#"#[serde(skip_serializing_if = "Vec::is_empty")] #[serde(default)]"#,
156156
)
157157
.type_attribute(
158-
"FetchAddU64",
158+
"FetchIncreaseU64",
159159
"#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]",
160160
)
161161
.type_attribute(
162-
"FetchAddU64Response",
162+
"FetchIncreaseU64Response",
163163
"#[derive(Eq, serde::Serialize, serde::Deserialize, deepsize::DeepSizeOf)]",
164164
)
165165
.type_attribute(

src/meta/types/proto/meta.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ message TxnOp {
183183
TxnDeleteRequest delete = 3;
184184
TxnDeleteByPrefixRequest delete_by_prefix = 4;
185185

186-
FetchAddU64 fetch_add_u64 = 5;
186+
FetchIncreaseU64 fetch_increase_u64 = 5;
187187
PutSequential put_sequential = 6;
188188
}
189189
}
@@ -195,7 +195,7 @@ message TxnOpResponse {
195195
TxnDeleteResponse delete = 3;
196196
TxnDeleteByPrefixResponse delete_by_prefix = 4;
197197

198-
FetchAddU64Response fetch_add_u64 = 5;
198+
FetchIncreaseU64Response fetch_increase_u64 = 5;
199199
}
200200
}
201201

src/meta/types/proto/request.proto

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,27 +45,35 @@ message TxnGetResponse {
4545
optional SeqV value = 2;
4646
}
4747

48-
// Return the value by key, then add the delta to the key.
48+
// Fetch and increase the value: after = max(current, max_value) + delta
4949
//
50-
// This operation assume the value bytes is a json encoded `uint64`,
50+
// This operation assumes the value bytes is a json encoded `uint64`,
5151
// e.g. `1025` in bytes is `b"1025"`.
5252
// If the result is negative, it will be set to zero.
53-
message FetchAddU64 {
53+
//
54+
// Use cases:
55+
// - Pure ADD: max_value=0, delta=N -> adds N to current value
56+
// - Pure MAX: max_value=N, delta=0 -> ensures value is at least N
57+
// - Combined: max_value=N, delta=M -> ensures at least N, then adds M
58+
message FetchIncreaseU64 {
5459

55-
// The key to fetch and add the delta.
60+
// The key to fetch and increase.
5661
string key = 1;
5762

5863
// Assert the seq number of the record before update.
5964
// - If it does not match, no update will be made and the record seq number won't change.
6065
// - If it is None, the update will always be made.
6166
optional uint64 match_seq = 3;
6267

63-
// The delta to add to the value.
68+
// The delta to add to the value after taking max.
6469
int64 delta = 2;
70+
71+
// The minimum value to ensure before adding delta. Defaults to 0 for backward compatibility.
72+
uint64 max_value = 4;
6573
}
6674

67-
// Response for FetchAddU64, contains the value before and after `add`
68-
message FetchAddU64Response {
75+
// Response for FetchIncreaseU64, contains the value before and after increase
76+
message FetchIncreaseU64Response {
6977

7078
string key = 1;
7179

src/meta/types/src/proto_ext/conditional_operation_ext.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ mod tests {
115115
));
116116
assert!(matches!(
117117
cond_op.operations[3].request,
118-
Some(pb::txn_op::Request::FetchAddU64(_))
118+
Some(pb::txn_op::Request::FetchIncreaseU64(_))
119119
));
120120
}
121121

0 commit comments

Comments
 (0)