Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

写了个 半成品 ..... #6

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk
.idea
xlv/kvstore/target
xlv/kvstore/.idea
2 changes: 2 additions & 0 deletions xlv/kvstore/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
.idea
12 changes: 12 additions & 0 deletions xlv/kvstore/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "kvstore"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
rand = {version = "0.8", features = ["small_rng"] }
crossbeam = "0.8.1"
serde = { version = "1.0.89", features = ["derive"] }
bincode = "1.3.3"
21 changes: 21 additions & 0 deletions xlv/kvstore/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
### 请求流程
如图所示
![img_1.png](img_1.png)

### 开发过程
由于一开始准备通过atomic实现cas insert数据的,但是后面因为rust语言不熟悉放弃了这个思路,改成通过request请求进入一个lockfree_queue,然后queue pull出来的时候通过单线程写入skiplist,类似简易的存算分离
。思路是写queue时是多线程写入,存储时是单线程顺序写存储,顺序写存储的单线程直接cpu绑核,防止线程上下文切换开销。

#### 已完成
- 简易的skiplist(没有经过并发测试)
- 无锁的queue(也没有经过并发测试,借助`crossbeam::epoch`完成的)
- 实现了trait接口

#### 未完成
- 当kv数据封装成entry存储进入queue时没有写`serde`压缩
- 没有考虑好在queue中读取时如何range读取
- 从queue中消费出来写skiplist时,由于加了额外的trait bound导致写入报错 目前还没怎么解决
#### 总结
1. 虽然项目没有完成仅仅是个半成品 但是我俩还是在中学到很多东西,也了解到了rust的难度
2. 后续还是继续深入学习rust吧 这次开发 思路有,语言层面给我们拦住了,只能怪太菜了..
3. 开发人员:@uran0SH(主导项目) @azhsmesos
Binary file added xlv/kvstore/img_1.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
22 changes: 22 additions & 0 deletions xlv/kvstore/src/entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};

#[derive(Serialize, Deserialize, Debug)]
pub struct Entry<K: Ord, V> {
pub key: K,

pub value: V,
}

impl<K: Ord, V> Entry<K, V> {
pub fn new(key: K, value: V) -> Entry<K, V> {
Self { key, value }
}
}

// impl<K: Ord, V> Display for Entry<K, V> {
// fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
// println!("{}-{}", self.key, self.value);
// Ok(())
// }
// }
91 changes: 91 additions & 0 deletions xlv/kvstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
mod entry;
mod lockfree_queue;
mod skip;
mod atomic;

use std::{collections::BTreeMap, sync::Mutex};
use std::collections::LinkedList;
use std::iter::Skip;
use crate::entry::Entry;
use crate::lockfree_queue::Queue;
use crate::skip::SkipList;


/// Operations of Index
pub(crate) trait IndexOperate<K: Ord, V> {
/// Get a range of keys in [key, range_end]
fn get(&self, key: &K, range_end: &K) -> Vec<&V>;
/// delete a range of keys in [key, range_end]
fn delete(&self, key: &K, range_end: &K) -> Vec<V>;
/// insert of update a key
fn insert_or_update(&self, key: K, value: V) -> Option<V>;
}

pub struct KVStore<K: Ord, V> {
map: Mutex<BTreeMap<K, V>>,
}

pub struct KVQueue<K: Ord, V> {
queue: Queue<Entry<K, V>>,
}

impl <K: Ord, V> KVQueue<K, V> {
pub fn new() -> Self {
Self {
queue: Queue::new(),
}
}
}


impl<K: Ord, V> KVStore<K, V> {
pub fn new() -> Self {
Self {
map: Mutex::new(BTreeMap::new()),
}
}
}

impl<K: Ord, V> IndexOperate<K, V> for KVQueue<K, V> {
fn get(&self, key: &K, range_end: &K) -> Vec<&V> {
todo!()
}

fn delete(&self, key: &K, range_end: &K) -> Vec<V> {
todo!()
}

fn insert_or_update(&self, key: K, value: V) -> Option<V> {
let entry = Entry::new(key, value);
self.queue.push(entry);
// thread::spawn( move || {
// let entry = self.queue.pop().unwrap();
// self.list.insert_or_update(entry.key, entry.value);
//
// });

None
}
}

impl<K: Ord, V> IndexOperate<K, V> for KVStore<K, V> {
fn get(&self, key: &K, range_end: &K) -> Vec<&V> {
todo!()
}

fn delete(&self, key: &K, range_end: &K) -> Vec<V> {
todo!()
}

fn insert_or_update(&self, key: K, value: V) -> Option<V> {
let mut lock = self.map.lock().unwrap();
lock.insert(key, value);
None
}
}

#[cfg(test)]
mod tests {
#[test]
fn test_get() {}
}
181 changes: 181 additions & 0 deletions xlv/kvstore/src/lockfree_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::{
io::Write,
sync::atomic::{AtomicUsize, Ordering},
};

use crossbeam::epoch;
use epoch::{Atomic, Owned, Shared};

type NodePtr<T> = Atomic<Node<T>>;
struct Node<T> {
pub item: Option<T>,
pub next: NodePtr<T>,
}

impl<T> Node<T> {
pub fn new_empty() -> Self {
Self {
item: None,
next: Atomic::null(),
}
}

pub fn new(data: T) -> Self {
Self {
item: Some(data),
next: Atomic::null(),
}
}
}

pub struct Queue<T> {
len: AtomicUsize,
head: NodePtr<T>,
tail: NodePtr<T>,
}

impl<T> Default for Queue<T> {
fn default() -> Self {
let head = Atomic::new(Node::new_empty());
let tail = head.clone();
Self {
len: AtomicUsize::new(0),
head,
tail,
}
}
}

impl<T> Queue<T> {
pub fn new() -> Self {
Self::default()
}

pub fn size(&self) -> usize {
self.len.load(Ordering::SeqCst)
}

pub fn is_empty(&self) -> bool {
0 == self.len.load(Ordering::SeqCst)
}

pub fn push(&self, data: T) {
let guard = epoch::pin();

let new_node = Owned::new(Node::new(data)).into_shared(&guard);

let mut tail;
unsafe {
let null = Shared::null();
loop {
tail = self.tail.load(Ordering::Acquire, &guard);
let tail_next = &(*tail.as_raw()).next;
if tail_next
.compare_exchange(null, new_node, Ordering::AcqRel, Ordering::Relaxed, &guard)
.is_ok()
{
break;
}
let tail_next = tail_next.load(Ordering::Acquire, &guard);
let _ = self.tail.compare_exchange(
tail,
tail_next,
Ordering::AcqRel,
Ordering::Relaxed,
&guard,
);
}
}
let _ = self.tail.compare_exchange(
tail,
new_node,
Ordering::Release,
Ordering::Relaxed,
&guard,
);

self.len.fetch_add(1, Ordering::SeqCst);
}

pub fn pop(&self) -> Option<T> {
let mut data = None;
if self.is_empty() {
return data;
}
let guard = &epoch::pin();
unsafe {
loop {
let head = self.head.load(Ordering::Acquire, guard);
let mut next = (*head.as_raw()).next.load(Ordering::Acquire, guard);

if next.is_null() {
return None;
}

if self
.head
.compare_exchange(head, next, Ordering::Release, Ordering::Relaxed, guard)
.is_ok()
{
data = next.deref_mut().item.take();
guard.defer_destroy(head);
break;
}
}
}
self.len.fetch_sub(1, Ordering::SeqCst);
data
}
}

impl<T> Drop for Queue<T> {
fn drop(&mut self) {
while self.pop().is_some() {}
let guard = &epoch::pin();
unsafe {
let h = self.head.load_consume(guard);
guard.defer_destroy(h);
}
}
}

impl<T> Queue<T> {
// 用来打印元素
pub fn print_queue(&self) {
let _ = std::io::stdout().flush();
let guard = epoch::pin();
let mut start = self.head.load(Ordering::Acquire, &guard);

let mut actual_len = 0;
while !start.is_null() {
unsafe {
print!("-> {:?}", start.as_raw());
let _ = std::io::stdout().flush();
start = (*start.as_raw()).next.load(Ordering::Acquire, &guard);
}
actual_len += 1;
}
println!(" size:{} actual: {}", self.size(), actual_len - 1);
}
}

#[cfg(test)]
mod lockfree_queue_test {
use crate::entry::Entry;
use crate::lockfree_queue::Queue;
use std::{
sync::{
atomic::{AtomicI32, Ordering},
Arc, Barrier,
},
thread,
};

#[test]
fn test_single() {
let q = Queue::new();
let entry = Entry::new(1, 2);
q.push(entry);
q.print_queue();
}
}
Loading