Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use incremental upgrade to better control the RAM usage #116

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub enum Node<'a, D: Distance> {
SplitPlaneNormal(SplitPlaneNormal<'a, D>),
}

const LEAF_TAG: u8 = 0;
const DESCENDANTS_TAG: u8 = 1;
const SPLIT_PLANE_NORMAL_TAG: u8 = 2;
pub const LEAF_TAG: u8 = 0;
pub const DESCENDANTS_TAG: u8 = 1;
pub const SPLIT_PLANE_NORMAL_TAG: u8 = 2;

impl<'a, D: Distance> Node<'a, D> {
pub fn leaf(self) -> Option<Leaf<'a, D>> {
Expand Down
142 changes: 109 additions & 33 deletions src/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use heed::types::Bytes;
use heed::{BytesDecode, BytesEncode, RoTxn};
use memmap2::Mmap;
use nohash::{BuildNoHashHasher, IntMap};
use nohash::{BuildNoHashHasher, IntMap, IntSet};
use rand::seq::index;
use rand::Rng;
use roaring::{RoaringBitmap, RoaringTreemap};

use crate::internals::{KeyCodec, Leaf, NodeCodec};
use crate::key::{Prefix, PrefixCodec};
use crate::node::Node;
use crate::key::{Key, Prefix, PrefixCodec};
use crate::node::{Node, SplitPlaneNormal};
use crate::node_id::NodeMode;
use crate::{Database, Distance, Error, ItemId, Result};

/// A structure to store the tree nodes out of the heed database.
Expand All @@ -24,6 +25,7 @@ pub struct TmpNodes<DE> {
ids: Vec<ItemId>,
bounds: Vec<usize>,
deleted: RoaringBitmap,
remap_ids: IntMap<ItemId, ItemId>,
_marker: marker::PhantomData<DE>,
}

Expand All @@ -35,6 +37,7 @@ impl<'a, DE: BytesEncode<'a>> TmpNodes<DE> {
ids: Vec::new(),
bounds: vec![0],
deleted: RoaringBitmap::new(),
remap_ids: IntMap::default(),
_marker: marker::PhantomData,
})
}
Expand All @@ -46,6 +49,7 @@ impl<'a, DE: BytesEncode<'a>> TmpNodes<DE> {
ids: Vec::new(),
bounds: vec![0],
deleted: RoaringBitmap::new(),
remap_ids: IntMap::default(),
_marker: marker::PhantomData,
})
}
Expand All @@ -71,10 +75,16 @@ impl<'a, DE: BytesEncode<'a>> TmpNodes<DE> {
Ok(())
}

/// Remap the item id of an already inserted node to another node.
///
/// The mapping is applied lazily when the nodes are read back through
/// `TmpNodesReader`: it only affects the nodes queued for insertion and
/// never interacts with the `to_delete` set. Calling this twice with the
/// same `current` keeps only the latest `new` id (map insert overwrites).
pub fn remap(&mut self, current: ItemId, new: ItemId) {
self.remap_ids.insert(current, new);
}

/// Delete the tmp_nodes and the node in the database.
pub fn remove(&mut self, item: ItemId) {
let deleted = self.deleted.insert(item);
debug_assert!(deleted);
debug_assert!(deleted, "Removed the same item with id {item} twice");
}

/// Converts it into a readers to read the nodes.
Expand All @@ -84,7 +94,13 @@ impl<'a, DE: BytesEncode<'a>> TmpNodes<DE> {
let mmap = unsafe { Mmap::map(&file)? };
#[cfg(unix)]
mmap.advise(memmap2::Advice::Sequential)?;
Ok(TmpNodesReader { mmap, ids: self.ids, bounds: self.bounds, deleted: self.deleted })
Ok(TmpNodesReader {
mmap,
ids: self.ids,
bounds: self.bounds,
deleted: self.deleted,
remap_ids: self.remap_ids,
})
}
}

Expand All @@ -94,14 +110,10 @@ pub struct TmpNodesReader {
ids: Vec<ItemId>,
bounds: Vec<usize>,
deleted: RoaringBitmap,
remap_ids: IntMap<ItemId, ItemId>,
}

impl TmpNodesReader {
/// The number of nodes stored in this file.
pub fn len(&self) -> usize {
self.ids.len()
}

/// Returns an iterator over the ids of the nodes that were flagged for
/// deletion from the database (see `TmpNodes::remove`).
pub fn to_delete(&self) -> impl Iterator<Item = ItemId> + '_ {
self.deleted.iter()
}
Expand All @@ -112,6 +124,10 @@ impl TmpNodesReader {
.iter()
.zip(self.bounds.windows(2))
.filter(|(&id, _)| !self.deleted.contains(id))
.map(|(id, bounds)| match self.remap_ids.get(id) {
Some(new_id) => (new_id, bounds),
None => (id, bounds),
})
.map(|(id, bounds)| {
let [start, end] = [bounds[0], bounds[1]];
(*id, &self.mmap[start..end])
Expand Down Expand Up @@ -168,18 +184,14 @@ impl ConcurrentNodeIds {
Ok(self.current.fetch_add(1, Ordering::Relaxed))
}
}

/// Returns the number of used ids in total.
pub fn used(&self) -> u64 {
self.used.load(Ordering::Relaxed)
}
}

/// A structure used to keep a list of the leaf nodes in the tree.
///
/// It is safe to share between threads as the pointers point
/// into the mmapped file and the transaction is kept here and therefore
/// no longer touches the database.
// TODO: Rename to immutable Items since it doesn't contains descendants
pub struct ImmutableLeafs<'t, D> {
leafs: IntMap<ItemId, *const u8>,
constant_length: Option<usize>,
Expand All @@ -189,31 +201,60 @@ pub struct ImmutableLeafs<'t, D> {
impl<'t, D: Distance> ImmutableLeafs<'t, D> {
/// Creates the structure by fetching all the leaf pointers
/// and keeping the transaction making the pointers valid.
/// Do not take more items than memory allows.
/// Remove from the list of candidates all the items that were selected and return them.
pub fn new(
rtxn: &'t RoTxn,
database: Database<D>,
index: u16,
nb_leafs: u64,
progress: &AtomicU32,
) -> heed::Result<Self> {
let mut leafs =
IntMap::with_capacity_and_hasher(nb_leafs as usize, BuildNoHashHasher::default());
let mut constant_length = None;
candidates: &mut RoaringBitmap,
memory: usize,
) -> heed::Result<(Self, RoaringBitmap)> {
let page_size = page_size::get();
let nb_page_allowed = (memory as f64 / page_size as f64).floor() as usize;

let iter = database
.remap_types::<PrefixCodec, Bytes>()
.prefix_iter(rtxn, &Prefix::item(index))?
.remap_key_type::<KeyCodec>();
let mut leafs = IntMap::with_capacity_and_hasher(
candidates.len() as usize, // TODO: could be reduced a lot
BuildNoHashHasher::default(),
);
let mut pages_used = IntSet::with_capacity_and_hasher(
nb_page_allowed.min(candidates.len() as usize),
BuildNoHashHasher::default(),
);
let mut selected_items = RoaringBitmap::new();
let mut constant_length = None;

for result in iter {
let (key, bytes) = result?;
let item_id = key.node.unwrap_item();
while let Some(item_id) = candidates.select(0) {
let bytes = database
.remap_types::<KeyCodec, Bytes>()
.get(rtxn, &Key::item(index, item_id))?
.unwrap();
assert_eq!(*constant_length.get_or_insert(bytes.len()), bytes.len());
leafs.insert(item_id, bytes.as_ptr());
progress.fetch_add(1, Ordering::Relaxed);

let ptr = bytes.as_ptr();
let addr = ptr as usize;
let start = addr / page_size;
let end = (addr + bytes.len()) / page_size;

pages_used.insert(start);
if start != end {
pages_used.insert(end);
}

if pages_used.len() >= nb_page_allowed && leafs.len() >= 200 {
break;
}

// Safe because the items come from another roaring bitmap
selected_items.push(item_id);
candidates.remove_smallest(1);
leafs.insert(item_id, ptr);
}

Ok(ImmutableLeafs { leafs, constant_length, _marker: marker::PhantomData })
Ok((
ImmutableLeafs { leafs, constant_length, _marker: marker::PhantomData },
selected_items,
))
}

/// Returns the leafs identified by the given ID.
Expand Down Expand Up @@ -409,7 +450,6 @@ impl<'t, D: Distance> ImmutableTrees<'t, D> {
database: Database<D>,
index: u16,
nb_trees: u64,
progress: &AtomicU32,
) -> heed::Result<Self> {
let mut trees =
IntMap::with_capacity_and_hasher(nb_trees as usize, BuildNoHashHasher::default());
Expand All @@ -423,12 +463,48 @@ impl<'t, D: Distance> ImmutableTrees<'t, D> {
let (key, bytes) = result?;
let tree_id = key.node.unwrap_tree();
trees.insert(tree_id, (bytes.len(), bytes.as_ptr()));
progress.fetch_add(1, Ordering::Relaxed);
}

Ok(ImmutableTrees { trees, _marker: marker::PhantomData })
}

/// Collects all the tree nodes reachable from `start` with a depth-first
/// traversal, storing for each node id the length and pointer of its raw
/// bytes inside the mmapped database.
///
/// Only `Descendants` and `SplitPlaneNormal` nodes are gathered: links whose
/// mode is not `NodeMode::Tree` (i.e. links to items) are never followed,
/// which is why decoding a `Node::Leaf` here is unreachable.
///
/// # Panics
///
/// Panics (via `unwrap`) if a referenced tree node is missing from the
/// database or if its bytes cannot be decoded.
pub fn sub_tree_from_id(
rtxn: &'t RoTxn,
database: Database<D>,
index: u16,
start: ItemId,
) -> Result<Self> {
let mut trees = IntMap::default();
// Stack of tree-node ids still to visit (depth-first).
let mut explore = vec![start];
while let Some(current) = explore.pop() {
let bytes =
database.remap_data_type::<Bytes>().get(rtxn, &Key::tree(index, current))?.unwrap();
let node: Node<'_, D> = NodeCodec::bytes_decode(bytes).unwrap();
match node {
// Tree keys never decode to leaves; only tree links are pushed below.
Node::Leaf(_leaf) => unreachable!(),
Node::Descendants(_descendants) => {
trees.insert(current, (bytes.len(), bytes.as_ptr()));
}
Node::SplitPlaneNormal(SplitPlaneNormal { left, right, normal: _ }) => {
trees.insert(current, (bytes.len(), bytes.as_ptr()));
// We must avoid the items and only push the tree nodes.
if left.mode == NodeMode::Tree {
explore.push(left.item);
}
if right.mode == NodeMode::Tree {
explore.push(right.item);
}
}
}
}

Ok(Self { trees, _marker: marker::PhantomData })
}

/// Builds an `ImmutableTrees` that holds no tree node at all.
pub fn empty() -> Self {
let trees = IntMap::default();
Self { trees, _marker: marker::PhantomData }
}

/// Returns the tree node identified by the given ID.
pub fn get(&self, item_id: ItemId) -> heed::Result<Option<Node<'t, D>>> {
let (ptr, len) = match self.trees.get(&item_id) {
Expand Down
6 changes: 5 additions & 1 deletion src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,11 @@ impl<'t, D: Distance> Reader<'t, D> {
rtxn: &RoTxn,
node_id: NodeId,
) -> Result<(RoaringBitmap, RoaringBitmap)> {
match self.database.get(rtxn, &Key::new(self.index, node_id))?.unwrap() {
match self
.database
.get(rtxn, &Key::new(self.index, node_id))?
.unwrap_or_else(|| panic!("Could not find {node_id:?} in index {}", self.index))
{
Node::Leaf(_) => Ok((
RoaringBitmap::new(),
RoaringBitmap::from_sorted_iter(Some(node_id.item)).unwrap(),
Expand Down
5 changes: 1 addition & 4 deletions src/tests/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ fn two_dimension_on_a_line() {

// if we can't look into enough nodes we find some random points
let ret = reader.nns(5).search_k(NonZeroUsize::new(1).unwrap()).by_item(&rtxn, 1).unwrap();
insta::assert_snapshot!(NnsRes(ret), @r###"
id(48): distance(47)
id(92): distance(91)
"###);
insta::assert_snapshot!(NnsRes(ret), @"id(63): distance(62)");

// if we can look into all the nodes there is no infinite loop and it works
let ret =
Expand Down
Loading
Loading