Skip to content

Commit

Permalink
Merge pull request #81 from fjall-rs/2.5.0
Browse files Browse the repository at this point in the history
2.5.0
  • Loading branch information
marvin-j97 authored Jan 8, 2025
2 parents c7164d2 + abddb10 commit 5d023fd
Show file tree
Hide file tree
Showing 66 changed files with 1,771 additions and 1,020 deletions.
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "lsm-tree"
description = "A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs)"
license = "MIT OR Apache-2.0"
version = "2.4.0"
version = "2.5.0"
edition = "2021"
rust-version = "1.74.0"
readme = "README.md"
Expand Down Expand Up @@ -38,7 +38,7 @@ quick_cache = { version = "0.6.5", default-features = false, features = [] }
rustc-hash = "2.0.0"
self_cell = "1.0.4"
tempfile = "3.12.0"
value-log = "1.3.0"
value-log = "1.4.1"
varint-rs = "2.2.0"
xxhash-rust = { version = "0.8.12", features = ["xxh3"] }

Expand All @@ -57,6 +57,12 @@ harness = false
path = "benches/tli.rs"
required-features = []

[[bench]]
name = "merge"
harness = false
path = "benches/merge.rs"
required-features = []

[[bench]]
name = "memtable"
harness = false
Expand Down
77 changes: 77 additions & 0 deletions benches/merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use criterion::{criterion_group, criterion_main, Criterion};
use lsm_tree::merge::{BoxedIterator, Merger};
use lsm_tree::{mvcc_stream::MvccStream, InternalValue, Memtable};
use nanoid::nanoid;

fn merger(c: &mut Criterion) {
for num in [2, 4, 8, 16, 30] {
c.bench_function(&format!("Merge {num}"), |b| {
let memtables = (0..num)
.map(|_| {
let table = Memtable::default();

for _ in 0..100 {
table.insert(InternalValue::from_components(
nanoid!(),
vec![],
0,
lsm_tree::ValueType::Value,
));
}

table
})
.collect::<Vec<_>>();

b.iter_with_large_drop(|| {
let iters = memtables
.iter()
.map(|x| x.iter().map(Ok))
.map(|x| Box::new(x) as BoxedIterator<'_>)
.collect();

let merger = Merger::new(iters);

assert_eq!(num * 100, merger.count());
})
});
}
}

fn mvcc_stream(c: &mut Criterion) {
for num in [2, 4, 8, 16, 30] {
c.bench_function(&format!("MVCC stream {num} versions"), |b| {
let memtables = (0..num)
.map(|_| {
let table = Memtable::default();

for key in 'a'..='z' {
table.insert(InternalValue::from_components(
key.to_string(),
vec![],
num,
lsm_tree::ValueType::Value,
));
}

table
})
.collect::<Vec<_>>();

b.iter_with_large_drop(|| {
let iters = memtables
.iter()
.map(|x| x.iter().map(Ok))
.map(|x| Box::new(x) as BoxedIterator<'_>)
.collect();

let merger = MvccStream::new(Merger::new(iters));

assert_eq!(26, merger.count());
})
});
}
}

criterion_group!(benches, merger, mvcc_stream);
criterion_main!(benches);
2 changes: 1 addition & 1 deletion benches/tli.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
use lsm_tree::segment::{
block_index::BlockIndex, value_block::BlockOffset, value_block::CachePolicy,
block_index::KeyedBlockIndex, value_block::BlockOffset, value_block::CachePolicy,
};

fn tli_find_item(c: &mut Criterion) {
Expand Down
11 changes: 3 additions & 8 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,16 +559,11 @@ pub trait AbstractTree {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V, seqno: SeqNo) -> (u32, u32);

/// Inserts a key-value pair.
fn raw_insert_with_lock<K: AsRef<[u8]>, V: AsRef<[u8]>>(
fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
lock: &RwLockWriteGuard<'_, Memtable>,
key: K,
value: V,
seqno: SeqNo,
r#type: ValueType,
) -> (u32, u32);

/// Removes an item from the tree.
Expand Down Expand Up @@ -598,7 +593,7 @@ pub trait AbstractTree {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn remove<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u32, u32);
fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32);

/// Removes an item from the tree.
///
Expand Down Expand Up @@ -632,5 +627,5 @@ pub trait AbstractTree {
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn remove_weak<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u32, u32);
fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32);
}
2 changes: 1 addition & 1 deletion src/blob_tree/gc/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl<'a> GcReader<'a> {
fn get_internal(&self, key: &[u8]) -> crate::Result<Option<MaybeInlineValue>> {
let Some(item) = self
.tree
.get_internal_entry_with_lock(self.memtable, key, true, None)?
.get_internal_entry_with_lock(self.memtable, key, None)?
.map(|x| x.value)
else {
return Ok(None);
Expand Down
24 changes: 4 additions & 20 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,34 +596,18 @@ impl AbstractTree for BlobTree {
)
}

fn raw_insert_with_lock<K: AsRef<[u8]>, V: AsRef<[u8]>>(
fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
lock: &RwLockWriteGuard<'_, Memtable>,
key: K,
value: V,
seqno: SeqNo,
r#type: ValueType,
) -> (u32, u32) {
use value::MaybeInlineValue;

// NOTE: Initially, we always write an inline value
// On memtable flush, depending on the values' sizes, they will be separated
// into inline or indirect values
let item = MaybeInlineValue::Inline(value.as_ref().into());

let value = item.encode_into_vec();

let value = InternalValue::from_components(key.as_ref(), value, seqno, r#type);
lock.insert(value)
}

fn insert<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V, seqno: SeqNo) -> (u32, u32) {
use value::MaybeInlineValue;

// NOTE: Initially, we always write an inline value
// On memtable flush, depending on the values' sizes, they will be separated
// into inline or indirect values
let item = MaybeInlineValue::Inline(value.as_ref().into());
let item = MaybeInlineValue::Inline(value.into());

let value = item.encode_into_vec();

Expand Down Expand Up @@ -680,11 +664,11 @@ impl AbstractTree for BlobTree {
}
}

fn remove<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
self.index.remove(key, seqno)
}

fn remove_weak<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
self.index.remove_weak(key, seqno)
}
}
12 changes: 0 additions & 12 deletions src/bloom/bit_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,6 @@ impl BitArray {
Self(bytes)
}

/// Size in bytes
#[must_use]
pub fn len(&self) -> usize {
self.0.len()
}

#[allow(unused)]
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}

#[must_use]
pub fn bytes(&self) -> &[u8] {
&self.0
Expand Down
2 changes: 1 addition & 1 deletion src/bloom/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl BloomFilter {
/// Size of bloom filter in bytes.
#[must_use]
pub fn len(&self) -> usize {
self.inner.len()
self.inner.bytes().len()
}

fn from_raw(m: usize, k: usize, bytes: Box<[u8]>) -> Self {
Expand Down
27 changes: 14 additions & 13 deletions src/compaction/fifo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl Strategy {
}

impl CompactionStrategy for Strategy {
fn get_name(&self) -> &'static str {
"FifoStrategy"
}

fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
let resolved_view = levels.resolved_view();

Expand All @@ -58,11 +62,8 @@ impl CompactionStrategy for Strategy {
let lifetime_sec = lifetime_us / 1000 / 1000;

if lifetime_sec > ttl_seconds.into() {
log::warn!(
"segment is older than configured TTL: {:?}",
segment.metadata.id,
);
segment_ids_to_delete.insert(segment.metadata.id);
log::warn!("segment is older than configured TTL: {:?}", segment.id(),);
segment_ids_to_delete.insert(segment.id());
}
}
}
Expand All @@ -86,11 +87,11 @@ impl CompactionStrategy for Strategy {

bytes_to_delete = bytes_to_delete.saturating_sub(segment.metadata.file_size);

segment_ids_to_delete.insert(segment.metadata.id);
segment_ids_to_delete.insert(segment.id());

log::debug!(
"dropping segment to reach configured size limit: {:?}",
segment.metadata.id,
segment.id(),
);
}
}
Expand Down Expand Up @@ -124,7 +125,7 @@ mod tests {
key_range::KeyRange,
level_manifest::LevelManifest,
segment::{
block_index::two_level_index::TwoLevelBlockIndex,
block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
file_offsets::FileOffsets,
meta::{Metadata, SegmentId},
value_block::BlockOffset,
Expand All @@ -136,18 +137,18 @@ mod tests {
use std::sync::Arc;
use test_log::test;

#[cfg(feature = "bloom")]
use crate::bloom::BloomFilter;

#[allow(clippy::expect_used)]
#[allow(clippy::cast_possible_truncation)]
fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

SegmentInner {
tree_id: 0,
descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
block_index: Arc::new(TwoLevelBlockIndex::new((0, id).into(), block_cache.clone())),
block_index,

offsets: FileOffsets {
bloom_ptr: BlockOffset(0),
Expand Down Expand Up @@ -180,7 +181,7 @@ mod tests {
block_cache,

#[cfg(feature = "bloom")]
bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),
bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
}
.into()
}
Expand Down
Loading

0 comments on commit 5d023fd

Please sign in to comment.