Skip to content

Commit 1b08886

Browse files
authored
⚡ faster parallel searchsorted (#15)
1 parent 108c557 commit 1b08886

File tree

6 files changed

+107
-72
lines changed

6 files changed

+107
-72
lines changed

downsample_rs/src/m4/generic.rs

+27-28
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use ndarray::{s, Array1, ArrayView1};
33

44
use rayon::iter::IndexedParallelIterator;
55
use rayon::prelude::*;
6-
use std::sync::{Arc, Mutex};
76

87
// --------------------- WITHOUT X
98

@@ -134,7 +133,7 @@ pub(crate) fn m4_generic_with_x<T: Copy>(
134133
#[inline(always)]
135134
pub(crate) fn m4_generic_with_x_parallel<T: Copy + PartialOrd + Send + Sync>(
136135
arr: ArrayView1<T>,
137-
bin_idx_iterator: impl IndexedParallelIterator<Item = (usize, usize)>,
136+
bin_idx_iterator: impl IndexedParallelIterator<Item = impl Iterator<Item = (usize, usize)>>,
138137
n_out: usize,
139138
f_argminmax: fn(ArrayView1<T>) -> (usize, usize),
140139
) -> Array1<usize> {
@@ -143,30 +142,30 @@ pub(crate) fn m4_generic_with_x_parallel<T: Copy + PartialOrd + Send + Sync>(
143142
return Array1::from((0..arr.len()).collect::<Vec<usize>>());
144143
}
145144

146-
let sampled_indices = Arc::new(Mutex::new(Array1::<usize>::default(n_out)));
147-
148-
// Iterate over the sample_index pointers and the array chunks
149-
bin_idx_iterator
150-
.enumerate()
151-
.for_each(|(i, (start_idx, end_idx))| {
152-
let (min_index, max_index) = f_argminmax(arr.slice(s![start_idx..end_idx]));
153-
154-
sampled_indices.lock().unwrap()[4 * i] = start_idx;
155-
156-
// Add the indexes in sorted order
157-
if min_index < max_index {
158-
sampled_indices.lock().unwrap()[4 * i + 1] = min_index + start_idx;
159-
sampled_indices.lock().unwrap()[4 * i + 2] = max_index + start_idx;
160-
} else {
161-
sampled_indices.lock().unwrap()[4 * i + 1] = max_index + start_idx;
162-
sampled_indices.lock().unwrap()[4 * i + 2] = min_index + start_idx;
163-
}
164-
sampled_indices.lock().unwrap()[4 * i + 3] = end_idx - 1;
165-
});
166-
167-
// Remove the mutex and return the sampled indices
168-
Arc::try_unwrap(sampled_indices)
169-
.unwrap()
170-
.into_inner()
171-
.unwrap()
145+
Array1::from_vec(
146+
bin_idx_iterator
147+
.flat_map(|bin_idx_iterator| {
148+
bin_idx_iterator
149+
.map(|(start, end)| {
150+
let step = unsafe {
151+
ArrayView1::from_shape_ptr(end - start, arr.as_ptr().add(start))
152+
};
153+
let (min_index, max_index) = f_argminmax(step);
154+
155+
// Add the indexes in sorted order
156+
let mut sampled_index = [start, 0, 0, end - 1];
157+
if min_index < max_index {
158+
sampled_index[1] = min_index + start;
159+
sampled_index[2] = max_index + start;
160+
} else {
161+
sampled_index[1] = max_index + start;
162+
sampled_index[2] = min_index + start;
163+
}
164+
sampled_index
165+
})
166+
.collect::<Vec<[usize; 4]>>()
167+
})
168+
.flatten()
169+
.collect::<Vec<usize>>(),
170+
)
172171
}

downsample_rs/src/minmax/generic.rs

+27-24
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use ndarray::{s, Array1, ArrayView1};
33

44
use rayon::iter::IndexedParallelIterator;
55
use rayon::prelude::*;
6-
use std::sync::{Arc, Mutex};
76

87
// --------------------- WITHOUT X
98

@@ -123,7 +122,7 @@ pub(crate) fn min_max_generic_with_x<T: Copy>(
123122
#[inline(always)]
124123
pub(crate) fn min_max_generic_with_x_parallel<T: Copy + Send + Sync>(
125124
arr: ArrayView1<T>,
126-
bin_idx_iterator: impl IndexedParallelIterator<Item = (usize, usize)>,
125+
bin_idx_iterator: impl IndexedParallelIterator<Item = impl Iterator<Item = (usize, usize)>>,
127126
n_out: usize,
128127
f_argminmax: fn(ArrayView1<T>) -> (usize, usize),
129128
) -> Array1<usize> {
@@ -132,26 +131,30 @@ pub(crate) fn min_max_generic_with_x_parallel<T: Copy + Send + Sync>(
132131
return Array1::from((0..arr.len()).collect::<Vec<usize>>());
133132
}
134133

135-
// Create a mutex to store the sampled indices
136-
let sampled_indices = Arc::new(Mutex::new(Array1::<usize>::default(n_out)));
137-
138-
// Iterate over the bins
139-
bin_idx_iterator.enumerate().for_each(|(i, (start, end))| {
140-
let (min_index, max_index) = f_argminmax(arr.slice(s![start..end]));
141-
142-
// Add the indexes in sorted order
143-
if min_index < max_index {
144-
sampled_indices.lock().unwrap()[2 * i] = min_index + start;
145-
sampled_indices.lock().unwrap()[2 * i + 1] = max_index + start;
146-
} else {
147-
sampled_indices.lock().unwrap()[2 * i] = max_index + start;
148-
sampled_indices.lock().unwrap()[2 * i + 1] = min_index + start;
149-
}
150-
});
151-
152-
// Remove the mutex and return the sampled indices
153-
Arc::try_unwrap(sampled_indices)
154-
.unwrap()
155-
.into_inner()
156-
.unwrap()
134+
Array1::from_vec(
135+
bin_idx_iterator
136+
.flat_map(|bin_idx_iterator| {
137+
bin_idx_iterator
138+
.map(|(start, end)| {
139+
let step = unsafe {
140+
ArrayView1::from_shape_ptr(end - start, arr.as_ptr().add(start))
141+
};
142+
let (min_index, max_index) = f_argminmax(step);
143+
144+
// Add the indexes in sorted order
145+
let mut sampled_index = [0, 0];
146+
if min_index < max_index {
147+
sampled_index[0] = min_index + start;
148+
sampled_index[1] = max_index + start;
149+
} else {
150+
sampled_index[0] = max_index + start;
151+
sampled_index[1] = min_index + start;
152+
}
153+
sampled_index
154+
})
155+
.collect::<Vec<[usize; 2]>>()
156+
})
157+
.flatten()
158+
.collect::<Vec<usize>>(),
159+
)
157160
}

downsample_rs/src/minmax/simd.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ where
5858
Tx: Num + FromPrimitive + AsPrimitive<f64> + Send + Sync,
5959
Ty: Copy + PartialOrd + Send + Sync,
6060
{
61-
assert_eq!(n_out % 2, 0);
61+
assert_eq!(n_out % 2, 0); // TODO can be faster (check last bit)
6262
let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 2);
6363
min_max_generic_with_x_parallel(arr, bin_idx_iterator, n_out, |arr| arr.argminmax())
6464
}

downsample_rs/src/searchsorted.rs

+50-17
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,20 @@ use ndarray::ArrayView1;
22

33
use rayon::iter::IndexedParallelIterator;
44
use rayon::prelude::*;
5+
use std::thread::available_parallelism;
56

67
use super::types::Num;
78
use num_traits::{AsPrimitive, FromPrimitive};
89

910
// ---------------------- Binary search ----------------------
1011

1112
// #[inline(always)]
12-
fn binary_search<T: PartialOrd>(arr: ArrayView1<T>, value: T, left: usize, right: usize) -> usize {
13+
fn binary_search<T: Copy + PartialOrd>(
14+
arr: ArrayView1<T>,
15+
value: T,
16+
left: usize,
17+
right: usize,
18+
) -> usize {
1319
let mut size: usize = right - left;
1420
let mut left: usize = left;
1521
let mut right: usize = right;
@@ -27,7 +33,7 @@ fn binary_search<T: PartialOrd>(arr: ArrayView1<T>, value: T, left: usize, right
2733
}
2834

2935
// #[inline(always)]
30-
fn binary_search_with_mid<T: PartialOrd>(
36+
fn binary_search_with_mid<T: Copy + PartialOrd>(
3137
arr: ArrayView1<T>,
3238
value: T,
3339
left: usize,
@@ -69,17 +75,17 @@ where
6975
(arr[arr.len() - 1].as_() / nb_bins as f64) - (arr[0].as_() / nb_bins as f64);
7076
let idx_step: usize = arr.len() / nb_bins; // used to pre-guess the mid index
7177
let mut value: f64 = arr[0].as_(); // Search value
72-
let mut idx = 0; // Index of the search value
78+
let mut idx: usize = 0; // Index of the search value
7379
(0..nb_bins).map(move |_| {
74-
let start_idx = idx; // Start index of the bin (previous end index)
80+
let start_idx: usize = idx; // Start index of the bin (previous end index)
7581
value += val_step;
76-
let mid = idx + idx_step;
82+
let mid: usize = idx + idx_step;
7783
let mid = if mid < arr.len() - 1 {
7884
mid
7985
} else {
8086
arr.len() - 2 // TODO: arr.len() - 1 gives error I thought...
8187
};
82-
let search_value = T::from_f64(value).unwrap();
88+
let search_value: T = T::from_f64(value).unwrap();
8389
// Implementation WITHOUT pre-guessing mid is slower!!
8490
// idx = binary_search(arr, search_value, idx, arr.len()-1);
8591
idx = binary_search_with_mid(arr, search_value, idx, arr.len() - 1, mid); // End index of the bin
@@ -102,7 +108,7 @@ fn sequential_add_mul(start_val: f64, add_val: f64, mul: usize) -> f64 {
102108
pub(crate) fn get_equidistant_bin_idx_iterator_parallel<T>(
103109
arr: ArrayView1<T>,
104110
nb_bins: usize,
105-
) -> impl IndexedParallelIterator<Item = (usize, usize)> + '_
111+
) -> impl IndexedParallelIterator<Item = impl Iterator<Item = (usize, usize)> + '_> + '_
106112
where
107113
T: Num + FromPrimitive + AsPrimitive<f64> + Sync + Send,
108114
{
@@ -111,14 +117,35 @@ where
111117
let val_step: f64 =
112118
(arr[arr.len() - 1].as_() / nb_bins as f64) - (arr[0].as_() / nb_bins as f64);
113119
let arr0: f64 = arr[0].as_();
114-
(0..nb_bins).into_par_iter().map(move |i| {
115-
let start_value = sequential_add_mul(arr0, val_step, i);
116-
let end_value = start_value + val_step;
117-
let start_value = T::from_f64(start_value).unwrap();
118-
let end_value = T::from_f64(end_value).unwrap();
119-
let start_idx = binary_search(arr, start_value, 0, arr.len() - 1);
120-
let end_idx = binary_search(arr, end_value, start_idx, arr.len() - 1);
121-
(start_idx, end_idx)
120+
let nb_threads = available_parallelism().map(|x| x.get()).unwrap_or(1);
121+
let nb_threads = if nb_threads > nb_bins {
122+
nb_bins
123+
} else {
124+
nb_threads
125+
};
126+
let nb_bins_per_thread = nb_bins / nb_threads;
127+
let nb_bins_last_thread = nb_bins - nb_bins_per_thread * (nb_threads - 1);
128+
// Iterate over the number of threads
129+
// -> for each thread, perform the binary search with a moving left bound and
130+
// yield the indices (using the same idea as for the sequential version)
131+
(0..nb_threads).into_par_iter().map(move |i| {
132+
// Search the start of the first bin (of the thread)
133+
let mut value: f64 = sequential_add_mul(arr0, val_step, i * nb_bins_per_thread); // Search value
134+
let start_value: T = T::from_f64(value).unwrap();
135+
let mut idx: usize = binary_search(arr, start_value, 0, arr.len() - 1); // Index of the search value
136+
let nb_bins_thread = if i == nb_threads - 1 {
137+
nb_bins_last_thread
138+
} else {
139+
nb_bins_per_thread
140+
};
141+
// Perform sequential binary search for the end of the bins (of the thread)
142+
(0..nb_bins_thread).map(move |_| {
143+
let start_idx: usize = idx; // Start index of the bin (previous end index)
144+
value += val_step;
145+
let search_value: T = T::from_f64(value).unwrap();
146+
idx = binary_search(arr, search_value, idx, arr.len() - 1); // End index of the bin
147+
(start_idx, idx)
148+
})
122149
})
123150
}
124151

@@ -207,7 +234,10 @@ mod tests {
207234
let bin_idxs = bin_idxs_iter.map(|x| x.0).collect::<Vec<usize>>();
208235
assert_eq!(bin_idxs, vec![0, 3, 6]);
209236
let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(arr.view(), 3);
210-
let bin_idxs = bin_idxs_iter.map(|x| x.0).collect::<Vec<usize>>();
237+
let bin_idxs = bin_idxs_iter
238+
.map(|x| x.map(|x| x.0).collect::<Vec<usize>>())
239+
.flatten()
240+
.collect::<Vec<usize>>();
211241
assert_eq!(bin_idxs, vec![0, 3, 6]);
212242
}
213243

@@ -225,7 +255,10 @@ mod tests {
225255
let bin_idxs_iter = get_equidistant_bin_idx_iterator(arr.view(), nb_bins);
226256
let bin_idxs = bin_idxs_iter.map(|x| x.0).collect::<Vec<usize>>();
227257
let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(arr.view(), nb_bins);
228-
let bin_idxs_parallel = bin_idxs_iter.map(|x| x.0).collect::<Vec<usize>>();
258+
let bin_idxs_parallel = bin_idxs_iter
259+
.map(|x| x.map(|x| x.0).collect::<Vec<usize>>())
260+
.flatten()
261+
.collect::<Vec<usize>>();
229262
assert_eq!(bin_idxs, bin_idxs_parallel);
230263
}
231264
}

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ build-backend = "maturin"
55
[project]
66
name = "tsdownsample"
77
description = "Extremely fast time series downsampling in Rust"
8-
version = "0.1.0a7"
8+
version = "0.1.0"
99
requires-python = ">=3.7"
1010
dependencies = ["numpy"]
1111
authors = [{name = "Jeroen Van Der Donckt"}]

tsdownsample/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
MinMaxLTTBDownsampler,
99
)
1010

11-
__version__ = "0.1.0a7"
11+
__version__ = "0.1.0"
1212
__author__ = "Jeroen Van Der Donckt"
1313

1414
__all__ = [

0 commit comments

Comments
 (0)