Skip to content

Commit

Permalink
Implemented SortedSet key-value structure
Browse files Browse the repository at this point in the history
Need to design an API to expose it, but this functionality should be all
that's needed for the initial implementation.
  • Loading branch information
ecton committed Jan 26, 2022
1 parent 04ad9ef commit cb74df9
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ actionable = { git = "https://github.com/khonsulabs/actionable.git", branch = "l
# nebari = { path = "../nebari/nebari", version = "0.1" }
nebari = { git = "https://github.com/khonsulabs/nebari.git", branch = "main" }
# arc-bytes = { path = "../shared-buffer" }
arc-bytes = { git = "https://github.com/khonsulabs/arc-bytes.git", branch = "main" }

# [patch."https://github.com/khonsulabs/custodian.git"]
# custodian-password = { path = "../custodian/password" }
Expand Down
62 changes: 36 additions & 26 deletions crates/bonsaidb-core/src/keyvalue.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use arc_bytes::serde::Bytes;
use serde::{Deserialize, Serialize};

mod sorted_set;
mod timestamp;

pub use self::timestamp::Timestamp;
use crate::Error;
use crate::{keyvalue::sorted_set::SortedSet, Error};

mod implementation {
use arc_bytes::serde::Bytes;
Expand Down Expand Up @@ -286,85 +287,94 @@ pub struct SetCommand {
}

/// A value stored in a key.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum Value {
/// A value stored as a byte array.
Bytes(Bytes),
/// A numeric value.
Numeric(Numeric),
/// A set of values sorted by an associated value.
SortedSet(SortedSet),
}

impl Value {
/// Validates this value to ensure it is safe to store.
pub fn validate(self) -> Result<Self, Error> {
match self {
Self::Numeric(numeric) => numeric.validate().map(Self::Numeric),
Self::Bytes(vec) => Ok(Self::Bytes(vec)),
other => Ok(other),
}
}

/// Deserializes the bytes contained inside of this value. Returns an error
/// if this value doesn't contain bytes.
pub fn deserialize<V: for<'de> Deserialize<'de>>(&self) -> Result<V, Error> {
match self {
Self::Bytes(bytes) => Ok(pot::from_slice(bytes)?),
Self::Numeric(_) => Err(Error::Database(String::from(
"key contains numeric value, not serialized data",
))),
if let Self::Bytes(bytes) = self {
pot::from_slice(bytes).map_err(Error::from)
} else {
Err(Error::Database(String::from(
"key contains another type of data",
)))
}
}

/// Returns this value as an `i64`, allowing for precision to be lost if the type was not an `i64` originally. If `saturating` is true, the conversion will not allow overflows. Returns `None` if the value is not numeric.
#[must_use]
pub fn as_i64_lossy(&self, saturating: bool) -> Option<i64> {
match self {
Self::Bytes(_) => None,
Self::Numeric(value) => Some(value.as_i64_lossy(saturating)),
if let Self::Numeric(value) = self {
Some(value.as_i64_lossy(saturating))
} else {
None
}
}

/// Returns this value as a `u64`, allowing for precision to be lost if the type was not a `u64` originally. If `saturating` is true, the conversion will not allow overflows. Returns `None` if the value is not numeric.
#[must_use]
pub fn as_u64_lossy(&self, saturating: bool) -> Option<u64> {
match self {
Self::Bytes(_) => None,
Self::Numeric(value) => Some(value.as_u64_lossy(saturating)),
if let Self::Numeric(value) = self {
Some(value.as_u64_lossy(saturating))
} else {
None
}
}

/// Returns this value as an `f64`, allowing for precision to be lost if the type was not an `f64` originally. Returns `None` if the value is not numeric.
#[must_use]
pub const fn as_f64_lossy(&self) -> Option<f64> {
match self {
Self::Bytes(_) => None,
Self::Numeric(value) => Some(value.as_f64_lossy()),
if let Self::Numeric(value) = self {
Some(value.as_f64_lossy())
} else {
None
}
}

/// Returns this numeric as an `i64`, allowing for precision to be lost if the type was not an `i64` originally. Returns `None` if the value is not numeric.
#[must_use]
pub fn as_i64(&self) -> Option<i64> {
match self {
Self::Bytes(_) => None,
Self::Numeric(value) => value.as_i64(),
if let Self::Numeric(value) = self {
value.as_i64()
} else {
None
}
}

/// Returns this numeric as a `u64`, allowing for precision to be lost if the type was not a `u64` originally. Returns `None` if the value is not numeric.
#[must_use]
pub fn as_u64(&self) -> Option<u64> {
match self {
Self::Bytes(_) => None,
Self::Numeric(value) => value.as_u64(),
if let Self::Numeric(value) = self {
value.as_u64()
} else {
None
}
}

/// Returns this numeric as an `f64`, allowing for precision to be lost if the type was not an `f64` originally. Returns `None` if the value is not numeric.
#[must_use]
pub const fn as_f64(&self) -> Option<f64> {
match self {
Self::Bytes(_) => None,
Self::Numeric(value) => value.as_f64(),
if let Self::Numeric(value) = self {
value.as_f64()
} else {
None
}
}
}
Expand Down
220 changes: 220 additions & 0 deletions crates/bonsaidb-core/src/keyvalue/sorted_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
use std::{cmp::Ordering, collections::HashMap, ops::Deref};

use arc_bytes::{ArcBytes, OwnedBytes};
use serde::{ser::SerializeMap, Deserialize, Serialize};

/// A set of byte-string members, each paired with a [`Score`], kept in
/// ascending score order.
///
/// The same data is stored twice and both copies must stay in sync:
/// `members` answers lookups by value, while `sorted_members` provides the
/// score-ordered view.
#[derive(Default, Clone, Debug)]
pub struct SortedSet {
/// Maps each member's value to its current score.
members: HashMap<OwnedBytes, Score>,
/// Every member of the set, ordered by score (ascending).
sorted_members: Vec<Entry>,
}

impl SortedSet {
    /// Inserts `value` with the given `score`, returning the score that was
    /// previously associated with `value`, if any.
    ///
    /// When `value` is already a member, its stale entry is dropped from the
    /// ordered list and the member is re-inserted at the position dictated by
    /// the new score.
    ///
    /// # Panics
    ///
    /// Panics if the lookup map and the ordered list have fallen out of sync.
    /// Comparing scores unwraps a float `partial_cmp`, so a NaN float score
    /// can also cause a panic.
    pub fn insert(&mut self, value: OwnedBytes, score: Score) -> Option<Score> {
        let entry = Entry { value, score };
        let existing_score = self
            .members
            .insert(entry.value.clone(), entry.score.clone());

        if existing_score.is_some() {
            // `sorted_members` is ordered by score, not by value, so a binary
            // search keyed on the value would give an unspecified result.
            // Scan for the stale entry instead.
            let stale_index = self
                .sorted_members
                .iter()
                .position(|member| member.value == entry.value)
                .expect("member present in `members` but missing from `sorted_members`");
            self.sorted_members.remove(stale_index);
        }

        // The list is ordered by score, so a binary search on the score yields
        // a valid insertion point whether or not an equal score exists.
        let insert_at = self
            .sorted_members
            .binary_search_by(|member| member.score.cmp(&entry.score))
            .unwrap_or_else(|index| index);
        self.sorted_members.insert(insert_at, entry);

        existing_score
    }

    /// Returns the score associated with `value`, or `None` if `value` is not
    /// a member of this set.
    pub fn score(&self, value: &[u8]) -> Option<&Score> {
        self.members.get(value)
    }

    /// Removes `value` from the set, returning its score if it was a member.
    ///
    /// # Panics
    ///
    /// Panics if the lookup map and the ordered list have fallen out of sync.
    pub fn remove(&mut self, value: &[u8]) -> Option<Score> {
        let existing_score = self.members.remove(value);
        if existing_score.is_some() {
            let remove_index = self
                .sorted_members
                .iter()
                .position(|member| member.value == value)
                .expect("member present in `members` but missing from `sorted_members`");
            self.sorted_members.remove(remove_index);
        }
        existing_score
    }
}

/// Gives read-only access to the members as a list ordered by score.
impl Deref for SortedSet {
    type Target = Vec<Entry>;

    fn deref(&self) -> &Vec<Entry> {
        let Self { sorted_members, .. } = self;
        sorted_members
    }
}

/// A single member of a [`SortedSet`]: a value paired with its score.
#[derive(Clone, Debug)]
pub struct Entry {
/// The member's value, stored as owned bytes.
value: OwnedBytes,
/// The score that determines this member's position in the set.
score: Score,
}

/// The value that a [`SortedSet`] member is ordered by.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Score {
/// A signed 64-bit integer score.
Signed(i64),
/// An unsigned 64-bit integer score.
Unsigned(u64),
/// A 64-bit floating point score.
Float(f64),
/// A score made of arbitrary bytes.
Bytes(OwnedBytes),
}

// `Eq` requires a total equivalence relation, which floats only satisfy when
// NaN is excluded. We check that the float value on input is not a NaN.
// NOTE(review): no such check is visible in this file -- confirm that callers
// reject NaN before a `Score::Float` is constructed or deserialized.
impl Eq for Score {}

/// Equality is derived from the total ordering, so scores of different
/// variants compare equal whenever `cmp` considers them equal.
impl PartialEq for Score {
    fn eq(&self, other: &Self) -> bool {
        matches!(Ord::cmp(self, other), Ordering::Equal)
    }
}

/// Orders scores across variants.
///
/// Integers are compared exactly with each other (a negative signed value is
/// always less than any unsigned value). Whenever a float is involved, the
/// integer side is converted to `f64`, which may lose precision. Byte scores
/// sort after every numeric score and compare with each other by their bytes.
///
/// # Panics
///
/// Panics if a float comparison involves NaN, because the result of
/// `partial_cmp` is unwrapped.
#[allow(clippy::cast_precision_loss)]
impl Ord for Score {
    fn cmp(&self, other: &Self) -> Ordering {
        match self {
            Self::Signed(lhs) => match other {
                Self::Signed(rhs) => lhs.cmp(rhs),
                // A negative value cannot be converted and sorts first.
                Self::Unsigned(rhs) => {
                    u64::try_from(*lhs).map_or(Ordering::Less, |lhs| lhs.cmp(rhs))
                }
                Self::Float(rhs) => (*lhs as f64).partial_cmp(rhs).unwrap(),
                Self::Bytes(_) => Ordering::Less,
            },
            Self::Unsigned(lhs) => match other {
                // A negative value cannot be converted and sorts first.
                Self::Signed(rhs) => {
                    u64::try_from(*rhs).map_or(Ordering::Greater, |rhs| lhs.cmp(&rhs))
                }
                Self::Unsigned(rhs) => lhs.cmp(rhs),
                Self::Float(rhs) => (*lhs as f64).partial_cmp(rhs).unwrap(),
                Self::Bytes(_) => Ordering::Less,
            },
            Self::Float(lhs) => match other {
                Self::Signed(rhs) => lhs.partial_cmp(&(*rhs as f64)).unwrap(),
                Self::Unsigned(rhs) => lhs.partial_cmp(&(*rhs as f64)).unwrap(),
                Self::Float(rhs) => lhs.partial_cmp(rhs).unwrap(),
                Self::Bytes(_) => Ordering::Less,
            },
            Self::Bytes(lhs) => match other {
                Self::Bytes(rhs) => lhs.cmp(rhs),
                Self::Signed(_) | Self::Unsigned(_) | Self::Float(_) => Ordering::Greater,
            },
        }
    }
}

/// Defers to the total ordering, so the comparison always yields `Some`.
impl PartialOrd for Score {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}

impl Serialize for SortedSet {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serializer.serialize_map(Some(self.members.len()))?;
for member in &self.sorted_members {
map.serialize_entry(&member.value, &member.score)?;
}
map.end()
}
}

impl<'de> Deserialize<'de> for SortedSet {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_map(Visitor)
}
}

/// Serde visitor that builds a [`SortedSet`] from a serialized map.
struct Visitor;

impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = SortedSet;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("sorted set entries")
}

fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let (mut members, mut sorted_members) = if let Some(size) = map.size_hint() {
(HashMap::with_capacity(size), Vec::with_capacity(size))
} else {
(HashMap::default(), Vec::default())
};

while let Some((value, score)) = map.next_entry::<ArcBytes<'_>, Score>()? {
let entry = Entry {
value: OwnedBytes(value.into_owned()),
score,
};
members.insert(entry.value.clone(), entry.score.clone());
sorted_members.push(entry);
}

sorted_members.sort_by(|a, b| a.score.cmp(&b.score));

Ok(SortedSet {
members,
sorted_members,
})
}
}

/// Exercises insert, score lookup, re-scoring, ordering and removal.
#[test]
fn basics() {
    let mut sorted = SortedSet::default();

    // A brand-new member has no previous score.
    let previous = sorted.insert(OwnedBytes::from(b"first"), Score::Unsigned(2));
    assert_eq!(previous, None);
    assert_eq!(sorted.score(b"first"), Some(&Score::Unsigned(2)));

    // Re-inserting an existing member replaces its score and reports the old one.
    let previous = sorted.insert(OwnedBytes::from(b"first"), Score::Unsigned(1));
    assert_eq!(previous, Some(Score::Unsigned(2)));
    assert_eq!(sorted.score(b"first"), Some(&Score::Unsigned(1)));

    // A second member with a higher score sorts after the first.
    let previous = sorted.insert(OwnedBytes::from(b"a"), Score::Unsigned(2));
    assert_eq!(previous, None);
    assert_eq!(sorted.len(), 2);
    assert_eq!(sorted.score(b"a"), Some(&Score::Unsigned(2)));
    assert_eq!(sorted[0].value, b"first");
    assert_eq!(sorted[1].value, b"a");

    // Removing returns the score once, then nothing.
    assert_eq!(sorted.remove(b"first"), Some(Score::Unsigned(1)));
    assert_eq!(sorted.remove(b"first"), None);
}

/// Round-trips a set with mixed score types and checks scores and ordering.
#[test]
fn serialization() {
    let mut original = SortedSet::default();
    original.insert(OwnedBytes::from(b"a"), Score::Signed(2));
    original.insert(OwnedBytes::from(b"b"), Score::Unsigned(1));
    original.insert(OwnedBytes::from(b"c"), Score::Float(0.));

    let encoded = pot::to_vec(&original).unwrap();
    let decoded = pot::from_slice::<SortedSet>(&encoded).unwrap();

    // Every member keeps its score across the round trip.
    assert_eq!(decoded.score(b"a"), original.score(b"a"));
    assert_eq!(decoded.score(b"b"), original.score(b"b"));
    assert_eq!(decoded.score(b"c"), original.score(b"c"));

    // Members come back in ascending score order: 0.0, 1, 2.
    assert_eq!(decoded[0].value, b"c");
    assert_eq!(decoded[1].value, b"b");
    assert_eq!(decoded[2].value, b"a");
}
17 changes: 8 additions & 9 deletions crates/bonsaidb-core/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1630,13 +1630,12 @@ macro_rules! define_kv_test_suite {
db.get_key("akey").and_delete().into().await?,
Some(String::from("new_value"))
);
assert_eq!(db.get_key("akey").await?, None);
assert_eq!(
db.set_key("akey", &String::from("new_value"))
.returning_previous()
.await?,
None
);
assert!(db.get_key("akey").await?.is_none());
assert!(db
.set_key("akey", &String::from("new_value"))
.returning_previous()
.await?
.is_none());
assert_eq!(db.delete_key("akey").await?, KeyStatus::Deleted);
assert_eq!(db.delete_key("akey").await?, KeyStatus::NotChanged);

Expand Down Expand Up @@ -1951,10 +1950,10 @@ macro_rules! define_kv_test_suite {
continue;
}

assert_eq!(kv.get_key("b").await?, None, "b never expired");
assert!(kv.get_key("b").await?.is_none(), "b never expired");

timing.wait_until(Duration::from_secs_f32(5.)).await;
assert_eq!(kv.get_key("a").await?, None, "a never expired");
assert!(kv.get_key("a").await?.is_none(), "a never expired");
break;
}
harness.shutdown().await?;
Expand Down
Loading

0 comments on commit cb74df9

Please sign in to comment.