Skip to content

Commit e27e833

Browse files
authored
Merge pull request #9 from nomadiz/feat/tag-bucket
2 parents 5b6d97b + 4f0e7f1 commit e27e833

File tree

13 files changed

+180
-146
lines changed

13 files changed

+180
-146
lines changed

db/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ mod interface;
33
#[macro_use]
44
mod mac;
55
mod constant;
6-
mod model;
6+
#[macro_use]
7+
pub mod model;
78
mod storage;
89
mod util;
910

db/src/mac/tx.rs

+18-35
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,14 @@ macro_rules! impl_global_transaction {
2929
}
3030

3131
// Count number of items
32-
async fn count(&mut self, cf: CF) -> Result<usize, Error> {
32+
async fn count(&mut self, tags: TagBucket) -> Result<usize, Error> {
3333
match self {
3434
$(
3535
#[cfg(feature = $feat)]
3636
Transaction {
3737
inner: Inner::$x(ds),
3838
..
39-
} => ds.count(cf).await,
39+
} => ds.count(tags).await,
4040
)*
4141
}
4242
}
@@ -55,101 +55,84 @@ macro_rules! impl_global_transaction {
5555
}
5656

5757
// Check if a key exists
58-
async fn exi<K: Into<Key> + Send>(&self, cf: CF, key: K) -> Result<bool, Error> {
58+
async fn exi<K: Into<Key> + Send>(&self, key: K, tags: TagBucket) -> Result<bool, Error> {
5959
match self {
6060
$(
6161
#[cfg(feature = $feat)]
6262
Transaction {
6363
inner: Inner::$x(ds),
6464
..
65-
} => ds.exi(cf, key).await,
65+
} => ds.exi(key, tags).await,
6666
)*
6767
}
6868
}
6969

7070
/// Fetch a key from the database
71-
async fn get<K: Into<Key> + Send>(&self, cf: CF, key: K) -> Result<Option<Val>, Error> {
71+
async fn get<K: Into<Key> + Send>(&self, key: K, tags: TagBucket) -> Result<Option<Val>, Error> {
7272
match self {
7373
$(
7474
#[cfg(feature = $feat)]
7575
Transaction {
7676
inner: Inner::$x(ds),
7777
..
78-
} => ds.get(cf, key).await,
79-
)*
80-
}
81-
}
82-
83-
// OPTIONAL Fetch multiple keys from the database
84-
async fn multi_get<K: Into<Key> + Send + AsRef<[u8]>>(
85-
&self,
86-
cf: CF,
87-
keys: Vec<K>,
88-
) -> Result<Vec<Option<Val>>, Error> {
89-
match self {
90-
$(
91-
#[cfg(feature = $feat)]
92-
Transaction {
93-
inner: Inner::$x(ds),
94-
..
95-
} => ds.multi_get(cf, keys).await,
78+
} => ds.get(key, tags).await,
9679
)*
9780
}
9881
}
9982

10083
/// Insert or update a key in the database
10184
async fn set<K: Into<Key> + Send, V: Into<Key> + Send>(
10285
&mut self,
103-
cf: CF,
10486
key: K,
10587
val: V,
88+
tags: TagBucket
10689
) -> Result<(), Error> {
10790
match self {
10891
$(
10992
#[cfg(feature = $feat)]
11093
Transaction {
11194
inner: Inner::$x(ds),
11295
..
113-
} => ds.set(cf, key, val).await,
96+
} => ds.set(key, val, tags).await,
11497
)*
11598
}
11699
}
117100

118101
/// Insert a key if it doesn't exist in the database
119102
async fn put<K: Into<Key> + Send, V: Into<Key> + Send>(
120103
&mut self,
121-
cf: CF,
122104
key: K,
123105
val: V,
106+
tags: TagBucket
124107
) -> Result<(), Error> {
125108
match self {
126109
$(
127110
#[cfg(feature = $feat)]
128111
Transaction {
129112
inner: Inner::$x(ds),
130113
..
131-
} => ds.put(cf, key, val).await,
114+
} => ds.put(key, val, tags).await,
132115
)*
133116
}
134117
}
135118

136119
/// Delete a key
137-
async fn del<K: Into<Key> + Send>(&mut self, cf: CF, key: K) -> Result<(), Error> {
120+
async fn del<K: Into<Key> + Send>(&mut self, key: K, tags: TagBucket) -> Result<(), Error> {
138121
match self {
139122
$(
140123
#[cfg(feature = $feat)]
141124
Transaction {
142125
inner: Inner::$x(ds),
143126
..
144-
} => ds.del(cf, key ).await,
127+
} => ds.del(key, tags).await,
145128
)*
146129
}
147130
}
148131

149132
async fn prefix_iterate<P>(
150133
&self,
151-
cf: CF,
152134
prefix: P,
135+
tags: TagBucket
153136
) -> Result<Vec<Result<(Val, Val), Error>>, Error>
154137
where
155138
P: Into<Key> + Send,
@@ -160,15 +143,15 @@ macro_rules! impl_global_transaction {
160143
Transaction {
161144
inner: Inner::$x(ds),
162145
..
163-
} => ds.prefix_iterate(cf, prefix).await,
146+
} => ds.prefix_iterate(prefix, tags).await,
164147
)*
165148
}
166149
}
167150

168151
async fn suffix_iterate<S>(
169152
&self,
170-
cf: CF,
171153
suffix: S,
154+
tags: TagBucket,
172155
) -> Result<Vec<Result<(Val, Val), Error>>, Error>
173156
where
174157
S: Into<Key> + Send,
@@ -179,18 +162,18 @@ macro_rules! impl_global_transaction {
179162
Transaction {
180163
inner: Inner::$x(ds),
181164
..
182-
} => ds.suffix_iterate(cf, suffix).await,
165+
} => ds.suffix_iterate(suffix, tags).await,
183166
)*
184167
}
185168
}
186169

187-
async fn iterate(&self, cf: CF) -> Result<Vec<Result<(Val, Val), Error>>, Error> {
170+
async fn iterate(&self, tags: TagBucket) -> Result<Vec<Result<(Val, Val), Error>>, Error> {
188171
match self {
189172
$(
190173
Transaction {
191174
inner: Inner::$x(ds),
192175
..
193-
} => ds.iterate(cf).await,
176+
} => ds.iterate(tags).await,
194177
)*
195178
}
196179
}

db/src/model/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
/// Model
22
mod adapter;
3+
mod tag;
34
mod tx;
45

56
pub use adapter::*;
7+
pub use tag::*;
68
pub use tx::*;

db/src/model/tag.rs

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::collections::HashMap;
2+
3+
type TagKey = &'static str;
4+
type TagValue = String;
5+
type TagBucketInner = HashMap<TagKey, TagValue>;
6+
7+
#[derive(Clone, Default)]
8+
pub struct TagBucket(TagBucketInner);
9+
10+
#[macro_export]
11+
macro_rules! tag {
12+
($($key: expr => $value: expr),*) => {{
13+
#[allow(unused_mut)]
14+
let mut map = std::collections::HashMap::default();
15+
$(
16+
map.insert($key, $value);
17+
)*
18+
$crate::TagBucket::new(map)
19+
}};
20+
}
21+
22+
impl TagBucket {
23+
pub fn new(map: TagBucketInner) -> TagBucket {
24+
TagBucket(map)
25+
}
26+
27+
pub fn get(&self, key: TagKey) -> Option<TagValue> {
28+
self.0.get(key).cloned()
29+
}
30+
31+
pub fn unchecked_get(&self, key: TagKey) -> TagValue {
32+
self.0.get(key).unwrap().clone()
33+
}
34+
35+
pub fn get_bytes(&self, key: TagKey) -> Option<Vec<u8>> {
36+
let wrapped_value = self.0.get(key).cloned();
37+
if let Some(v) = wrapped_value {
38+
return Some(v.as_bytes().to_vec());
39+
}
40+
None
41+
}
42+
}

db/src/model/tx.rs

+11-18
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::{
55
KeyValuePair,
66
},
77
util::now,
8+
TagBucket,
89
};
910
use async_trait::async_trait;
1011
use futures::lock::Mutex;
@@ -59,59 +60,51 @@ pub trait SimpleTransaction {
5960
async fn cancel(&mut self) -> Result<(), Error>;
6061

6162
// Count number of items
62-
async fn count(&mut self, cf: CF) -> Result<usize, Error>;
63+
async fn count(&mut self, tags: TagBucket) -> Result<usize, Error>;
6364

6465
// Commit a transaction
6566
async fn commit(&mut self) -> Result<(), Error>;
6667

6768
// Check if a key exists
68-
async fn exi<K: Into<Key> + Send>(&self, cf: CF, key: K) -> Result<bool, Error>;
69+
async fn exi<K: Into<Key> + Send>(&self, key: K, tags: TagBucket) -> Result<bool, Error>;
6970

7071
/// Fetch a key from the database
71-
async fn get<K: Into<Key> + Send>(&self, cf: CF, key: K) -> Result<Option<Val>, Error>;
72+
async fn get<K: Into<Key> + Send>(&self, key: K, tags: TagBucket)
73+
-> Result<Option<Val>, Error>;
7274

7375
/// Insert or update a key in the database
7476
async fn set<K: Into<Key> + Send, V: Into<Key> + Send>(
7577
&mut self,
76-
cf: CF,
7778
key: K,
7879
val: V,
80+
tags: TagBucket,
7981
) -> Result<(), Error>;
8082

8183
/// Insert a key if it doesn't exist in the database
8284
async fn put<K: Into<Key> + Send, V: Into<Key> + Send>(
8385
&mut self,
84-
cf: CF,
8586
key: K,
8687
val: V,
88+
tags: TagBucket,
8789
) -> Result<(), Error>;
8890

8991
/// Delete a key
90-
async fn del<K: Into<Key> + Send>(&mut self, cf: CF, key: K) -> Result<(), Error>;
91-
92-
// OPTIONAL Fetch multiple keys from the database
93-
async fn multi_get<K: Into<Key> + Send + AsRef<[u8]>>(
94-
&self,
95-
_cf: CF,
96-
_keys: Vec<K>,
97-
) -> Result<Vec<Option<Val>>, Error> {
98-
todo!();
99-
}
92+
async fn del<K: Into<Key> + Send>(&mut self, key: K, tags: TagBucket) -> Result<(), Error>;
10093

10194
// Iterate elements in key value store
102-
async fn iterate(&self, cf: CF) -> Result<Vec<Result<KeyValuePair, Error>>, Error>;
95+
async fn iterate(&self, tags: TagBucket) -> Result<Vec<Result<KeyValuePair, Error>>, Error>;
10396

10497
// Iterate elements with prefixx in key value store
10598
async fn prefix_iterate<P: Into<Key> + Send>(
10699
&self,
107-
cf: CF,
108100
prefix: P,
101+
tags: TagBucket,
109102
) -> Result<Vec<Result<KeyValuePair, Error>>, Error>;
110103

111104
// Iterate elements with prefixx in key value store
112105
async fn suffix_iterate<S: Into<Key> + Send>(
113106
&self,
114-
cf: CF,
115107
suffix: S,
108+
tags: TagBucket,
116109
) -> Result<Vec<Result<KeyValuePair, Error>>, Error>;
117110
}

db/src/storage/ds.rs

+8-12
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ impl Datastore {
112112
mod test {
113113
use crate::{
114114
constant::{ColumnFamily, COLUMN_FAMILIES},
115-
SimpleTransaction,
115+
tag, SimpleTransaction,
116116
};
117117

118118
use super::Datastore;
@@ -122,9 +122,6 @@ mod test {
122122
let db = Datastore::new("redb:../temp/v1.redb");
123123
assert!(db.transaction(false).await.is_ok());
124124

125-
// Seeding database
126-
let cf = None;
127-
128125
let key1 = i32::to_be_bytes(2001);
129126
let key2 = "new key new data hehe";
130127
let key3 = "this is a key";
@@ -134,9 +131,9 @@ mod test {
134131
let val3 = "this is a new value";
135132

136133
let mut tx = db.transaction(true).await.unwrap();
137-
tx.set(cf.clone(), key1, val1).await.unwrap();
138-
tx.set(cf.clone(), key2, val2).await.unwrap();
139-
tx.set(cf.clone(), key3, val3).await.unwrap();
134+
tx.set(key1, val1, tag!()).await.unwrap();
135+
tx.set(key2, val2, tag!()).await.unwrap();
136+
tx.set(key3, val3, tag!()).await.unwrap();
140137
tx.commit().await.unwrap();
141138
}
142139

@@ -147,8 +144,6 @@ mod test {
147144

148145
// Seeding database
149146
let cf_name = COLUMN_FAMILIES.get(&ColumnFamily::TestSuite).unwrap();
150-
let cf = Some(cf_name.to_string().into());
151-
152147
let key1 = i32::to_be_bytes(2100);
153148
let key2 = "cf => hello world";
154149
let key3 = "cf => this is a key";
@@ -158,9 +153,10 @@ mod test {
158153
let val3 = "cf => this is a new value";
159154

160155
let mut tx = db.transaction(true).await.unwrap();
161-
tx.set(cf.clone(), key1, val1).await.unwrap();
162-
tx.set(cf.clone(), key2, val2).await.unwrap();
163-
tx.set(cf.clone(), key3, val3).await.unwrap();
156+
let tags = tag!("column_family" => cf_name.clone());
157+
tx.set(key1, val1, tags.clone()).await.unwrap();
158+
tx.set(key2, val2, tags.clone()).await.unwrap();
159+
tx.set(key3, val3, tags.clone()).await.unwrap();
164160
tx.commit().await.unwrap();
165161
}
166162
}

db/src/storage/kvs/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ mod redb;
33
#[cfg(feature = "kv-rocksdb")]
44
mod rocksdb;
55

6-
pub const LOG: &str = "edma::kvs";
7-
86
#[cfg(feature = "kv-redb")]
97
pub use self::redb::*;
108
#[cfg(feature = "kv-rocksdb")]

0 commit comments

Comments
 (0)