Commit
Add lru queue to etcd cache
mrwwheat7 authored and pwang7 committed Mar 1, 2021
1 parent 9729e1c commit 16cc8c6
Showing 4 changed files with 208 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -21,6 +21,7 @@ grpcio = { version = "0.7.0", default-features = false, features = ["protobuf-co
http = "0.2"
log = "0.4.11"
lockfree-cuckoohash = { git = "https://github.com/datenlord/lockfree-cuckoohash.git", rev = "be1e03c"}
priority-queue = "1.0.5"
protobuf = "2.16.2"
smol = "1.2.4"
thiserror = "1.0"
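
The new priority-queue dependency backs the LRU bookkeeping added to the cache below: keys are pushed with a priority of u64::MAX minus the current Unix timestamp, so the entry returned by peek() (the one with the greatest priority) is always the least recently touched key. A small standalone sketch of that idea, not taken from the commit:

use priority_queue::PriorityQueue;
use std::time::{SystemTime, UNIX_EPOCH};

/// Priority = u64::MAX - seconds since the epoch: an older touch has a
/// smaller timestamp, hence a larger priority, so `peek()` yields the
/// least-recently-used key.
fn lru_priority() -> u64 {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is before the Unix epoch")
        .as_secs();
    u64::MAX - secs
}

fn main() {
    let mut lru: PriorityQueue<Vec<u8>, u64> = PriorityQueue::new();
    // Pretend this key was last touched at t = 100 s after the epoch (long ago).
    lru.push(b"old".to_vec(), u64::MAX - 100);
    // This key is touched right now, so it gets a smaller priority.
    lru.push(b"new".to_vec(), lru_priority());
    // `peek()` returns the entry with the greatest priority, i.e. the LRU key.
    assert_eq!(lru.peek().map(|(k, _)| k.clone()), Some(b"old".to_vec()));
}
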
95 changes: 86 additions & 9 deletions src/kv/cache.rs
@@ -1,40 +1,117 @@
//! The implementation for Etcd cache.
use super::KeyValue;
use super::OverflowArithmetic;
use crate::watch::EtcdWatchRequest;
use lockfree_cuckoohash::{pin, LockFreeCuckooHash};
use priority_queue::PriorityQueue;
use smol::channel::Sender;
use smol::lock::Mutex;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use utilities::Cast;

/// Cache struct contains a lock-free hash table.
#[derive(Clone)]
pub struct Cache {
/// Map to store key-value pairs.
hashtable: Arc<LockFreeCuckooHash<Vec<u8>, KeyValue>>,
/// LRU queue to store the keys in the hash table.
lru_queue: Arc<Mutex<PriorityQueue<Vec<u8>, u64>>>,
/// Map to store key and watch id.
watch_id_table: Arc<LockFreeCuckooHash<Vec<u8>, i64>>,
/// Etcd watch request sender.
watch_sender: Sender<EtcdWatchRequest>,
}

impl Cache {
/// Creates a new `Cache` with the specified size.
pub fn new(size: usize) -> Self {
pub fn new(size: usize, sender: Sender<EtcdWatchRequest>) -> Self {
Self {
hashtable: Arc::new(LockFreeCuckooHash::with_capacity(size)),
lru_queue: Arc::new(Mutex::new(PriorityQueue::new())),
watch_id_table: Arc::new(LockFreeCuckooHash::with_capacity(size)),
watch_sender: sender,
}
}

/// Searches a `key` from the cache.
pub fn search(&self, key: &[u8]) -> Option<KeyValue> {
let guard = pin();
let search_result = self.hashtable.search_with_guard(&key.to_vec(), &guard);
pub async fn search(&self, key: Vec<u8>) -> Option<KeyValue> {
let search_result = {
let guard = pin();
self.hashtable.search_with_guard(&key, &guard).cloned()
};
match search_result {
Some(value) => Some(value.clone()),
Some(value) => {
self.lru_queue.lock().await.push(key, Self::get_priority());
Some(value.clone())
}
None => None,
}
}

/// Inserts a `key` into the cache.
pub fn insert(&self, key: Vec<u8>, value: KeyValue) {
self.hashtable.insert(key, value);
pub async fn insert(&self, key: Vec<u8>, value: KeyValue) {
self.hashtable.insert(key.clone(), value);
self.lru_queue.lock().await.push(key, Self::get_priority());
self.adjust_cache_size().await;
}

/// Deletes a `key` from the cache.
pub fn delete(&self, key: &[u8]) {
self.hashtable.remove(&key.to_vec());
pub async fn delete(&self, key: Vec<u8>, cancel_watch: bool) {
if cancel_watch {
let watch_id = self
.search_watch_id(&key.clone())
.unwrap_or_else(|| panic!("Fail to get watch id for a key"));
// Remove watch for this key.
self.watch_sender
.send(EtcdWatchRequest::cancel(watch_id.cast()))
.await
.unwrap_or_else(|e| panic!("Fail to send watch request, the error is {}", e));
} else {
self.delete_watch_id(&key);
}

self.hashtable.remove(&key);
self.lru_queue.lock().await.remove(&key);
}

/// Deletes a watch id from the cache.
pub fn delete_watch_id(&self, key: &[u8]) {
self.watch_id_table.remove(&key.to_vec());
}

/// Inserts a watch id into the cache.
pub fn insert_watch_id(&self, key: Vec<u8>, value: i64) {
self.watch_id_table.insert(key, value);
}

/// Searches a watch id for a specific key.
pub fn search_watch_id(&self, key: &[u8]) -> Option<i64> {
let guard = pin();
let search_result = self.watch_id_table.search_with_guard(&key.to_vec(), &guard);
match search_result {
Some(value) => Some(*value),
None => None,
}
}

/// Gets the priority of a key in the LRU queue.
fn get_priority() -> u64 {
let current_time = SystemTime::now();
let since_the_epoch = current_time
.duration_since(UNIX_EPOCH)
.unwrap_or_else(|e| panic!("Fail to get the time since the epoch, the error is {}", e));

u64::MAX.overflow_sub(since_the_epoch.as_secs())
}

/// Adjusts the cache size if the number of values in the cache exceeds the threshold (60% of capacity).
async fn adjust_cache_size(&self) {
if self.hashtable.size() > self.hashtable.capacity().overflow_mul(6).overflow_div(10) {
let queue = self.lru_queue.lock().await;
if let Some(pop_value) = queue.peek() {
self.delete(pop_value.0.to_vec().clone(), false).await;
}
}
}
}
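
Because the LRU queue sits behind a smol::lock::Mutex, search, insert, and delete are now async and must be awaited. Below is a rough usage sketch of the new Cache API, hypothetical and not part of the commit; the import paths are assumed from the pub use items in kv/mod.rs further down.

use crate::kv::{Cache, EtcdWatchRequest}; // paths assumed from kv/mod.rs below
use crate::protos::kv::KeyValue;
use smol::channel::unbounded;

fn cache_usage_sketch() {
    smol::block_on(async {
        // The receiver side would normally be drained by the watch task; ignored here.
        let (watch_tx, _watch_rx) = unbounded::<EtcdWatchRequest>();
        let cache = Cache::new(64, watch_tx);

        let kv = KeyValue {
            key: b"foo".to_vec(),
            value: b"bar".to_vec(),
            ..KeyValue::default()
        };
        cache.insert(b"foo".to_vec(), kv).await; // also pushes "foo" into the LRU queue
        assert!(cache.search(b"foo".to_vec()).await.is_some());

        // cancel_watch == false: drop the entry and its watch id locally
        // without asking the etcd server to cancel the watch.
        cache.delete(b"foo".to_vec(), false).await;
        assert!(cache.search(b"foo".to_vec()).await.is_none());
    });
}
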
137 changes: 99 additions & 38 deletions src/kv/mod.rs
@@ -10,7 +10,7 @@ mod range;
/// Etcd txn mod for transaction operations.
mod txn;

pub use super::watch::EtcdWatchRequest;
pub use super::watch::{EtcdWatchRequest, EtcdWatchResponse};
pub use cache::Cache;
pub use delete::{EtcdDeleteRequest, EtcdDeleteResponse};
pub use get::{EtcdGetRequest, EtcdGetResponse};
@@ -26,12 +26,12 @@ use futures::stream::StreamExt;
use grpcio::WriteFlags;
use log::warn;
use protobuf::RepeatedField;
use smol::channel::{bounded, unbounded, Sender};
use std::str;

use crate::protos::kv::KeyValue;
use crate::Result as Res;
use futures::SinkExt;
use std::sync::Arc;
use utilities::Cast;

/// Key-Value client.
@@ -40,9 +40,11 @@ pub struct Kv {
/// Etcd Key-Value client.
client: KvClient,
/// Etcd client cache.
cache: Arc<Cache>,
cache: Cache,
/// Enable etcd client cache.
cache_enable: bool,
/// Etcd watch request sender.
watch_sender: Sender<EtcdWatchRequest>,
}

/// Etcd client cache default size.
@@ -63,39 +65,78 @@ impl Kv {
} else {
cache_size
};
let cache = Arc::new(Cache::new(etcd_cache_size));

let (watch_req_sender, watch_req_receiver) = unbounded::<EtcdWatchRequest>();
let (watch_id_sender, watch_id_receiver) = bounded::<i64>(1);

let cache = Cache::new(etcd_cache_size, watch_req_sender.clone());
let (mut client_req_sender, mut client_resp_receiver) = watch_client
.watch()
.unwrap_or_else(|e| panic!("failed to send watch commend, the response is: {}", e));

let cache_inner = Arc::<Cache>::clone(&cache);
let cache_clone = cache.clone();
let cache_inner = cache.clone();

// Task that handles all the pending watch requests.
smol::spawn(async move {
let watch_request = EtcdWatchRequest::create(KeyRange::all());
client_req_sender
.send((watch_request.into(), WriteFlags::default()))
.await
.unwrap_or_else(|e| panic!("Fail to send watch request, the error is: {}", e));
while let Ok(watch_req) = watch_req_receiver.recv().await {
let processing_key = watch_req.get_key();
client_req_sender
.send((watch_req.clone().into(), WriteFlags::default()))
.await
.unwrap_or_else(|e| panic!("Fail to send watch request, the error is: {}", e));
// Wait until etcd server returns watch id.
let watch_id = watch_id_receiver.recv().await.unwrap_or_else(|e| {
panic!("Fail to receive watch id from channel, the error is {}", e)
});
// Watch request can only be create or cancel.
if watch_req.is_create() {
cache_clone.insert_watch_id(processing_key.clone(), watch_id);
} else {
cache_clone.delete_watch_id(&processing_key);
}
}
})
.detach();

// Task that handles the watch responses from the Etcd server.
smol::spawn(async move {
while let Some(watch_resp) = client_resp_receiver.next().await {
match watch_resp {
Ok(resp) => {
let events = resp.get_events().to_vec();
events.iter().for_each(|event| {
if event.get_field_type() == Event_EventType::PUT {
let value = cache_inner.search(event.get_kv().get_key());
if let Some(valid_value) = value {
if valid_value.version >= event.get_kv().get_version() {
return;
// TODO: Check if need to spawn new task here.
if resp.get_created() || resp.get_canceled() {
watch_id_sender
.send(resp.get_watch_id())
.await
.unwrap_or_else(|e| {
panic!("Fail to send watch id, the error is {}", e)
});
} else {
let events = resp.get_events().to_vec();
for event in events {
if event.get_field_type() == Event_EventType::PUT {
if let Some(valid_value) =
cache_inner.search(event.get_kv().get_key().to_vec()).await
{
// Only update the cache if the event's version is larger than
// the version of the existing value.
if valid_value.version < event.get_kv().get_version() {
cache_inner
.insert(
event.get_kv().get_key().to_vec(),
event.get_kv().clone(),
)
.await;
}
}
} else {
cache_inner
.delete(event.get_kv().get_key().to_vec(), true)
.await;
}
cache_inner.insert(
event.get_kv().get_key().to_vec(),
event.get_kv().clone(),
);
} else {
cache_inner.delete(event.get_kv().get_key());
}
})
}
}
Err(e) => {
warn!("Watch response contains error, the error is: {}", e);
@@ -108,6 +149,7 @@ impl Kv {
client,
cache,
cache_enable,
watch_sender: watch_req_sender,
}
}

@@ -118,8 +160,24 @@
/// Will return `Err` if RPC call is failed.
#[inline]
pub async fn put(&mut self, req: EtcdPutRequest) -> Res<EtcdPutResponse> {
let resp = self.client.put_async(&req.into())?;
Ok(From::from(resp.await?))
let key = req.get_key();
let value = req.get_value();
let resp = self.client.put_async(&req.into())?.await?;
if self.cache.search(key.clone()).await == None {
let key_value = KeyValue {
key: key.clone(),
value,
..KeyValue::default()
};
self.cache.insert(key.clone(), key_value).await;
// Creates a new watch request and adds it to the send queue.
let watch_request = EtcdWatchRequest::create(KeyRange::key(key));
self.watch_sender
.send(watch_request)
.await
.unwrap_or_else(|e| panic!("Fail to send watch request, the error is {}", e));
}
Ok(From::from(resp))
}

/// Performs a single key-value fetching operation.
@@ -130,7 +188,7 @@
#[inline]
pub async fn get(&mut self, req: EtcdGetRequest) -> Res<EtcdGetResponse> {
if self.cache_enable {
if let Some(value) = self.cache.search(&req.get_key()) {
if let Some(value) = self.cache.search(req.get_key()).await {
let mut response = RangeResponse::new();
response.set_count(1);
response.set_kvs(RepeatedField::from_vec(vec![value]));
@@ -141,9 +199,19 @@
let resp = self.client.range_async(&req.into())?.await?;
if self.cache_enable {
let kvs = resp.get_kvs();
kvs.iter().for_each(|kv| {
self.cache.insert(kv.get_key().to_vec(), kv.clone());
});
for kv in kvs {
if self.cache.search(kv.get_key().to_vec()).await == None {
// Creates a new watch request and adds it to the send queue.
let watch_request = EtcdWatchRequest::create(KeyRange::key(kv.get_key()));
self.watch_sender
.send(watch_request)
.await
.unwrap_or_else(|e| {
panic!("Fail to send watch request, the error is {}", e)
});
}
self.cache.insert(kv.get_key().to_vec(), kv.clone()).await;
}
}
Ok(From::from(resp))
}
@@ -155,14 +223,7 @@
/// Will return `Err` if RPC call is failed.
#[inline]
pub async fn range(&mut self, req: EtcdRangeRequest) -> Res<EtcdRangeResponse> {
let resp = self.client.range_async(&req.into())?.await?;
if self.cache_enable {
let kvs = resp.get_kvs();
kvs.iter().for_each(|kv| {
self.cache.insert(kv.get_key().to_vec(), kv.clone());
});
}
Ok(From::from(resp))
Ok(From::from(self.client.range_async(&req.into())?.await?))
}

/// Performs a key-value deleting operation.
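
The two tasks spawned in Kv::new above coordinate over channels: the request task forwards each pending EtcdWatchRequest to the etcd stream and then blocks on a bounded(1) channel until the response task passes back the watch id the server assigned (a response with created or canceled set), after which the key-to-watch-id mapping is stored in or removed from the cache. Here is a stripped-down sketch of that request/reply handshake, with a stand-in task in place of the real etcd stream; all names are illustrative and not taken from the commit.

use smol::channel::{bounded, unbounded};

fn main() {
    smol::block_on(async {
        let (req_tx, req_rx) = unbounded::<Vec<u8>>(); // pending watch keys
        let (id_tx, id_rx) = bounded::<i64>(1);        // one watch id per request

        // Stand-in for the etcd response stream: answers every request with a watch id.
        smol::spawn(async move {
            while req_rx.recv().await.is_ok() {
                id_tx.send(42).await.expect("Fail to send watch id");
            }
        })
        .detach();

        req_tx
            .send(b"foo".to_vec())
            .await
            .expect("Fail to send watch request");
        // The request task waits here until the server-assigned watch id arrives,
        // then records the key -> watch id mapping in the cache.
        let watch_id = id_rx.recv().await.expect("Fail to receive watch id");
        assert_eq!(watch_id, 42);
    });
}
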
23 changes: 22 additions & 1 deletion src/watch/watch_impl.rs
@@ -8,7 +8,7 @@ use crate::ResponseHeader;
use utilities::Cast;

/// Request for creating or canceling watch.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct EtcdWatchRequest {
/// Etcd watch key request.
proto: WatchRequest,
@@ -85,6 +85,27 @@ impl EtcdWatchRequest {
req.prev_kv = prev_kv
}
}

/// Gets the watch key.
/// It only takes effect when the request is a create (subscribe) request.
#[inline]
pub fn get_key(&self) -> Vec<u8> {
if let Some(WatchRequest_oneof_request_union::create_request(req)) =
self.proto.request_union.clone()
{
return req.get_key().to_vec();
}
vec![]
}

/// Returns whether the watch request is a create watch request.
#[inline]
pub fn is_create(&self) -> bool {
if self.proto.has_create_request() {
return true;
}
false
}
}

impl Into<WatchRequest> for EtcdWatchRequest {
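
A quick, hypothetical sketch of the two new helpers; the import paths and the cancel watch id are assumptions, not taken from the commit.

use crate::kv::EtcdWatchRequest; // re-exported in kv/mod.rs above
use crate::KeyRange;             // path assumed

fn watch_request_helpers_sketch() {
    let create = EtcdWatchRequest::create(KeyRange::key(b"foo".to_vec()));
    assert!(create.is_create());
    assert_eq!(create.get_key(), b"foo".to_vec());

    // A cancel request carries no create_request payload,
    // so get_key() falls back to the empty vector.
    let cancel = EtcdWatchRequest::cancel(1);
    assert!(!cancel.is_create());
    assert!(cancel.get_key().is_empty());
}
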
