Skip to content

Commit

Permalink
Merge pull request #86 from meilisearch/abort-indexing-process
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops authored Sep 28, 2024
2 parents 13985ba + d1b64ed commit e6dd6df
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub enum Error {
#[error("The trees have not been built after an update on index {0}")]
NeedBuild(u16),

/// Returned when the closure given to `ArroyBuilder::cancel` returned `true`.
#[error("The corresponding build process has been cancelled")]
BuildCancelled,

/// Internal error
#[error("Internal error: {mode}({item}) is missing in index `{index}`")]
MissingKey {
Expand Down
34 changes: 34 additions & 0 deletions src/tests/writer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::sync::atomic::{AtomicUsize, Ordering};

use heed::EnvOpenOptions;
use insta::assert_snapshot;
use rand::seq::SliceRandom;
use rand::Rng;

Expand Down Expand Up @@ -1042,3 +1045,34 @@ fn prepare_changing_distance() {
assert!(writer.need_build(&wtxn).unwrap(), "because an item has been updated");
writer.builder(&mut rng).build(&mut wtxn).unwrap();
}

#[test]
fn cancel_indexing_process() {
    // Checks the three cancellation scenarios of the builder: a closure that
    // cancels immediately, one that never cancels, and one that only cancels
    // after it has been polled several times.
    let handle = create_database::<Euclidean>();
    let mut rng = rng();
    let mut wtxn = handle.env.write_txn().unwrap();

    let writer = Writer::new(handle.database, 0, 2);
    writer.add_item(&mut wtxn, 0, &[0.0, 0.0]).unwrap();

    // Scenario 1: the closure asks for cancellation on its very first call.
    let immediate_err = writer.builder(&mut rng).cancel(|| true).build(&mut wtxn).unwrap_err();
    assert_snapshot!(immediate_err, @"The corresponding build process has been cancelled");

    // Scenario 2: the closure never asks for cancellation, the build must succeed.
    writer.builder(&mut rng).cancel(|| false).build(&mut wtxn).unwrap();

    // Scenario 3: the closure only asks for cancellation after a few calls.
    let writer = Writer::new(handle.database, 0, 2);
    for item_id in 0..100 {
        writer.add_item(&mut wtxn, item_id, &[item_id as f32, 1.1]).unwrap();
    }

    // Counts how many times the builder polled the closure.
    let calls = AtomicUsize::new(0);
    let delayed_err = writer
        .builder(&mut rng)
        .cancel(|| calls.fetch_add(1, Ordering::Relaxed) > 5)
        .build(&mut wtxn)
        .unwrap_err();
    assert_snapshot!(delayed_err, @"The corresponding build process has been cancelled");
}
71 changes: 68 additions & 3 deletions src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,20 @@ use crate::{
pub struct ArroyBuilder<'a, D: Distance, R: Rng + SeedableRng> {
writer: &'a Writer<D>,
rng: &'a mut R,
inner: BuildOption,
inner: BuildOption<'a>,
}

/// The options available when building the arroy database.
struct BuildOption {
struct BuildOption<'a> {
n_trees: Option<usize>,
split_after: Option<usize>,
cancel: Box<dyn Fn() -> bool + 'a + Sync + Send>,
}

impl Default for BuildOption<'_> {
    /// Creates the options used when the caller configures nothing:
    /// no explicit number of trees, no split threshold and a cancellation
    /// closure that always returns `false`.
    fn default() -> Self {
        // The default closure never requests a cancellation.
        let never_cancel = Box::new(|| false);
        Self { cancel: never_cancel, split_after: None, n_trees: None }
    }
}

impl<'a, D: Distance, R: Rng + SeedableRng> ArroyBuilder<'a, D, R> {
Expand Down Expand Up @@ -79,6 +86,41 @@ impl<'a, D: Distance, R: Rng + SeedableRng> ArroyBuilder<'a, D, R> {
self
}

/// Provide a closure that can cancel the indexing process early if needed.
/// There is no guarantee on when the process is going to cancel itself, but
/// arroy will try to stop as soon as possible once the closure returns `true`.
/// When that happens the build returns an `Error::BuildCancelled` error.
///
/// Since the closure is not mutable and will be called from multiple threads
/// at the same time it's encouraged to make it quick to execute. A common
/// way to use it is to load an `AtomicBool` inside it that can be set
/// from another thread without lock.
///
/// Calling this method again replaces the previously registered closure.
///
/// # Example
///
/// ```no_run
/// # use arroy::{Writer, distances::Euclidean};
/// # let (writer, mut wtxn): (Writer<Euclidean>, heed::RwTxn) = todo!();
/// use rand::rngs::StdRng;
/// use rand::SeedableRng;
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::sync::Arc;
///
/// let stops_after = Arc::new(AtomicBool::new(false));
///
/// // Cancel the task after one minute
/// let flag = Arc::clone(&stops_after);
/// std::thread::spawn(move || {
///     let one_minute = std::time::Duration::from_secs(60);
///     std::thread::sleep(one_minute);
///     flag.store(true, Ordering::Relaxed);
/// });
///
/// let mut rng = StdRng::seed_from_u64(92);
/// let result = writer
///     .builder(&mut rng)
///     .split_after(1000)
///     .cancel(|| stops_after.load(Ordering::Relaxed))
///     .build(&mut wtxn);
/// ```
pub fn cancel(&mut self, cancel: impl Fn() -> bool + 'a + Sync + Send) -> &mut Self {
    // Boxed so the options struct does not need to be generic over the closure type.
    self.inner.cancel = Box::new(cancel);
    self
}

/// Generates a forest of `n_trees` trees.
///
/// More trees give higher precision when querying at the cost of more disk usage.
Expand Down Expand Up @@ -334,7 +376,7 @@ impl<D: Distance> Writer<D> {

/// Returns an [`ArroyBuilder`] to configure the available options to build the database.
pub fn builder<'a, R: Rng + SeedableRng>(&'a self, rng: &'a mut R) -> ArroyBuilder<'a, D, R> {
ArroyBuilder { writer: self, rng, inner: BuildOption { n_trees: None, split_after: None } }
ArroyBuilder { writer: self, rng, inner: BuildOption::default() }
}

fn build<R: Rng + SeedableRng>(
Expand All @@ -345,6 +387,10 @@ impl<D: Distance> Writer<D> {
) -> Result<()> {
log::debug!("started preprocessing the items...");

if (options.cancel)() {
return Err(Error::BuildCancelled);
}

D::preprocess(wtxn, |wtxn| {
Ok(self
.database
Expand All @@ -353,6 +399,10 @@ impl<D: Distance> Writer<D> {
.remap_key_type::<KeyCodec>())
})?;

if (options.cancel)() {
return Err(Error::BuildCancelled);
}

let item_indices = self.item_indices(wtxn)?;
let n_items = item_indices.len();

Expand Down Expand Up @@ -415,6 +465,10 @@ impl<D: Distance> Writer<D> {

log::debug!("Getting a reference to your {} items...", n_items);

if (options.cancel)() {
return Err(Error::BuildCancelled);
}

let used_node_ids = self.used_tree_node(wtxn)?;
let nb_tree_nodes = used_node_ids.len();

Expand Down Expand Up @@ -483,6 +537,7 @@ impl<D: Distance> Writer<D> {
log::debug!("Deleting the extraneous trees if there is some...");
self.delete_extra_trees(
wtxn,
options,
&mut roots,
options.n_trees,
concurrent_node_ids.used(),
Expand Down Expand Up @@ -565,6 +620,9 @@ impl<D: Distance> Writer<D> {
to_delete: &RoaringBitmap,
tmp_nodes: &mut TmpNodes<NodeCodec<D>>,
) -> Result<(NodeId, RoaringBitmap)> {
if (opt.cancel)() {
return Err(Error::BuildCancelled);
}
match current_node.mode {
NodeMode::Item => {
// We were called on a specific item, we should create a descendants node
Expand Down Expand Up @@ -768,6 +826,9 @@ impl<D: Distance> Writer<D> {
item_indices: &RoaringBitmap,
tmp_nodes: &mut TmpNodes<NodeCodec<D>>,
) -> Result<NodeId> {
if (opt.cancel)() {
return Err(Error::BuildCancelled);
}
if item_indices.len() == 1 {
return Ok(NodeId::item(item_indices.min().unwrap()));
}
Expand Down Expand Up @@ -839,6 +900,7 @@ impl<D: Distance> Writer<D> {
fn delete_extra_trees(
&self,
wtxn: &mut RwTxn,
opt: &BuildOption,
roots: &mut Vec<ItemId>,
nb_trees: Option<usize>,
nb_tree_nodes: u64,
Expand Down Expand Up @@ -868,6 +930,9 @@ impl<D: Distance> Writer<D> {
log::debug!("Deleting {} trees", to_delete.len());

for tree in to_delete {
if (opt.cancel)() {
return Err(Error::BuildCancelled);
}
self.delete_tree(wtxn, NodeId::tree(tree))?;
}
}
Expand Down

0 comments on commit e6dd6df

Please sign in to comment.