diff --git a/.github/workflows/ci-tsdownsample.yml b/.github/workflows/ci-tsdownsample.yml index 8261300..9a13c06 100644 --- a/.github/workflows/ci-tsdownsample.yml +++ b/.github/workflows/ci-tsdownsample.yml @@ -46,7 +46,7 @@ jobs: matrix: os: ['windows-latest', 'macOS-latest', 'ubuntu-latest'] rust: ['nightly'] # ['stable', 'beta'] - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12'] env: PYTHON: ${{ matrix.python-version }} @@ -155,7 +155,7 @@ jobs: target: ${{ matrix.target }} manylinux: ${{ matrix.manylinux || 'auto' }} container: ${{ matrix.container }} - args: --release --out dist --interpreter ${{ matrix.interpreter || '3.7 3.8 3.9 3.10 3.11' }} + args: --release --out dist --interpreter ${{ matrix.interpreter || '3.7 3.8 3.9 3.10 3.11 3.12' }} - run: ${{ matrix.ls || 'ls -lh' }} dist/ diff --git a/.github/workflows/codspeed.yml b/.github/workflows/codspeed.yml index c9e31cb..7475832 100644 --- a/.github/workflows/codspeed.yml +++ b/.github/workflows/codspeed.yml @@ -43,6 +43,6 @@ jobs: # - run: rm tests/__init__.py - name: Run CodSpeed benchmarks - uses: CodSpeedHQ/action@v1 + uses: CodSpeedHQ/action@v2 with: run: pytest tests/benchmarks/ --codspeed diff --git a/Cargo.toml b/Cargo.toml index bc18a23..184e8c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,10 @@ license = "MIT" [dependencies] downsample_rs = { path = "downsample_rs", features = ["half"]} -pyo3 = { version = "0.19", features = ["extension-module"] } -numpy = { version = "0.19", features = ["half"] } -half = { version = "2.1", default-features = false } -paste = { version = "1.0.9", default-features = false } +pyo3 = { version = "0.20", features = ["extension-module"] } +numpy = { version = "0.20", features = ["half"] } +half = { version = "2.3.1", default-features = false } +paste = { version = "1.0.14", default-features = false } [lib] name = "tsdownsample" diff --git a/README.md b/README.md index 5b484b4..e3e55f2 100644 --- a/README.md +++ b/README.md @@ -89,9 +89,10 @@ downsample([x], y, n_out, **kwargs) -> ndarray[uint64] - `x` is optional - `x` and `y` are both positional arguments -- `n_out` is a mandatory keyword argument that defines the number of output values\* -- `**kwargs` are optional keyword arguments _(see [table below](#downsampling-algorithms-📈))_: - - `n_threads`: how many threads to use for multi-threading (default `1`, so no multi-threading) +- `n_out` is a mandatory keyword argument that defines the number of output values* +- `**kwargs` are optional keyword arguments *(see [table below](#downsampling-algorithms-📈))*: + - `parallel`: whether to use multi-threading (default: `False`) + ❗ The max number of threads can be configured with the `TSDOWNSAMPLE_MAX_THREADS` ENV var (e.g. `os.environ["TSDOWNSAMPLE_MAX_THREADS"] = "4"`) - ... **Returns**: a `ndarray[uint64]` of indices that can be used to index the original data. @@ -102,13 +103,12 @@ downsample([x], y, n_out, **kwargs) -> ndarray[uint64] The following downsampling algorithms (classes) are implemented: -| Downsampler | Description | `**kwargs` | -| ----------------------: | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------------------------- | -| `MinMaxDownsampler` | selects the **min and max** value in each bin | `n_threads` | -| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `n_threads` | -| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `n_threads` | -| `MinMaxLTTBDownsampler` | (_new two-step algorithm 🎉_) first selects `n_out` \* `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `n_threads`, `minmax_ratio`\* | - +| Downsampler | Description | `**kwargs` | +| ---:| --- |--- | +| `MinMaxDownsampler` | selects the **min and max** value in each bin | `parallel` | +| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `parallel` | +| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `parallel` | +| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `parallel`, `minmax_ratio`* | *Default value for `minmax_ratio` is 4, which is empirically proven to be a good default. More details here: https://arxiv.org/abs/2305.00332 diff --git a/downsample_rs/Cargo.toml b/downsample_rs/Cargo.toml index 9c52eb8..107469b 100644 --- a/downsample_rs/Cargo.toml +++ b/downsample_rs/Cargo.toml @@ -9,15 +9,15 @@ license = "MIT" [dependencies] # TODO: perhaps use polars? argminmax = { version = "0.6.1", features = ["half"] } -# argminmax = { path = "../../argminmax" , features = ["half", "ndarray"] } -half = { version = "2.1", default-features = false , features=["num-traits"], optional = true} -num-traits = { version = "0.2.15", default-features = false } -rayon = { version = "1.6.0", default-features = false } +half = { version = "2.3.1", default-features = false , features=["num-traits"], optional = true} +num-traits = { version = "0.2.17", default-features = false } +once_cell = "1" +rayon = { version = "1.8.0", default-features = false } [dev-dependencies] -rstest = { version = "0.18.1", default-features = false } +rstest = { version = "0.18.2", default-features = false } rstest_reuse = { version = "0.6", default-features = false } -criterion = "0.4.0" +criterion = "0.5.1" dev_utils = { path = "dev_utils" } [[bench]] diff --git a/downsample_rs/benches/bench_m4.rs b/downsample_rs/benches/bench_m4.rs index c40df6a..2aa1732 100644 --- a/downsample_rs/benches/bench_m4.rs +++ b/downsample_rs/benches/bench_m4.rs @@ -14,15 +14,8 @@ fn m4_f32_random_array_long_single_core(c: &mut Criterion) { fn m4_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("m4_p_f32", |b| { - b.iter(|| { - m4_mod::m4_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) - }) + b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000))) }); } @@ -48,15 +41,8 @@ fn m4_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); let x = (0..n).map(|i| i as i32).collect::>(); - let all_threads: usize = utils::get_all_threads(); c.bench_function("m4_p_50M_f32", |b| { - b.iter(|| { - m4_mod::m4_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) - }) + b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000))) }); c.bench_function("m4_x_p_50M_f32", |b| { b.iter(|| { @@ -64,7 +50,6 @@ fn m4_f32_random_array_50M_multi_core(c: &mut Criterion) { black_box(x.as_slice()), black_box(data.as_slice()), black_box(2_000), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/benches/bench_minmax.rs b/downsample_rs/benches/bench_minmax.rs index 599cdbd..97bf677 100644 --- a/downsample_rs/benches/bench_minmax.rs +++ b/downsample_rs/benches/bench_minmax.rs @@ -14,14 +14,9 @@ fn minmax_f32_random_array_long_single_core(c: &mut Criterion) { fn minmax_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("minmax_p_f32", |b| { b.iter(|| { - minmax_mod::min_max_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) + minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000)) }) }); } @@ -55,14 +50,9 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) { let n = 50_000_000; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); let x = (0..n).map(|i| i as i32).collect::>(); - let all_threads: usize = utils::get_all_threads(); c.bench_function("minmax_p_50M_f32", |b| { b.iter(|| { - minmax_mod::min_max_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) + minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000)) }) }); c.bench_function("minmax_x_p_50M_f32", |b| { @@ -71,7 +61,6 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) { black_box(x.as_slice()), black_box(data.as_slice()), black_box(2_000), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/benches/bench_minmaxlttb.rs b/downsample_rs/benches/bench_minmaxlttb.rs index a0241de..bb78e09 100644 --- a/downsample_rs/benches/bench_minmaxlttb.rs +++ b/downsample_rs/benches/bench_minmaxlttb.rs @@ -25,7 +25,6 @@ fn minmaxlttb_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let x = (0..n).map(|i| i as i32).collect::>(); let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_x_p_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_with_x_parallel( @@ -33,7 +32,6 @@ fn minmaxlttb_f32_random_array_long_multi_core(c: &mut Criterion) { black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); @@ -59,7 +57,6 @@ fn minmaxlttb_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let x = (0..n).map(|i| i as i32).collect::>(); let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_x_p_50M_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_with_x_parallel( @@ -67,7 +64,6 @@ fn minmaxlttb_f32_random_array_50M_multi_core(c: &mut Criterion) { black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); @@ -90,14 +86,12 @@ fn minmaxlttb_without_x_f32_random_array_50M_single_core(c: &mut Criterion) { fn minmaxlttb_without_x_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_p_50M_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_without_x_parallel( black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/dev_utils/src/utils.rs b/downsample_rs/dev_utils/src/utils.rs index f3413ae..975fa65 100644 --- a/downsample_rs/dev_utils/src/utils.rs +++ b/downsample_rs/dev_utils/src/utils.rs @@ -33,11 +33,3 @@ where } arr } - -// ------------- Multi-threading ------------- - -use std::thread::available_parallelism; - -pub fn get_all_threads() -> usize { - available_parallelism().map(|x| x.get()).unwrap_or(1) -} diff --git a/downsample_rs/src/lib.rs b/downsample_rs/src/lib.rs index f678a45..409294d 100644 --- a/downsample_rs/src/lib.rs +++ b/downsample_rs/src/lib.rs @@ -14,3 +14,22 @@ pub use m4::*; pub(crate) mod helpers; pub(crate) mod searchsorted; pub(crate) mod types; + +use once_cell::sync::Lazy; +use rayon::{ThreadPool, ThreadPoolBuilder}; + +// Inspired by: https://github.com/pola-rs/polars/blob/9a69062aa0beb2a1bc5d57294cac49961fc91058/crates/polars-core/src/lib.rs#L49 +pub static POOL: Lazy = Lazy::new(|| { + ThreadPoolBuilder::new() + .num_threads( + std::env::var("TSDOWNSAMPLE_MAX_THREADS") + .map(|s| s.parse::().expect("integer")) + .unwrap_or_else(|_| { + std::thread::available_parallelism() + .unwrap_or(std::num::NonZeroUsize::new(1).unwrap()) + .get() + }), + ) + .build() + .expect("could not spawn threads") +}); diff --git a/downsample_rs/src/m4.rs b/downsample_rs/src/m4.rs index d64e33b..71d5689 100644 --- a/downsample_rs/src/m4.rs +++ b/downsample_rs/src/m4.rs @@ -3,10 +3,11 @@ use num_traits::{AsPrimitive, FromPrimitive}; use rayon::iter::IndexedParallelIterator; use rayon::prelude::*; -use crate::searchsorted::{ +use super::searchsorted::{ get_equidistant_bin_idx_iterator, get_equidistant_bin_idx_iterator_parallel, }; -use crate::types::Num; +use super::types::Num; +use super::POOL; // ----------------------------------- NON-PARALLEL ------------------------------------ @@ -51,25 +52,15 @@ m4_without_x!(m4_without_x_nan, NaNArgMinMax, |arr| arr.nanargminmax()); // ----------- WITH X -macro_rules! m4_with_x_parallel { - ($func_name:ident, $trait:path, $func:expr) => { - pub fn $func_name( - x: &[Tx], - arr: &[Ty], - n_out: usize, - n_threads: usize, - ) -> Vec - where - for<'a> &'a [Ty]: $trait, - Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, - Ty: Copy + PartialOrd + Send + Sync, - { - assert_eq!(n_out % 4, 0); - let bin_idx_iterator = - get_equidistant_bin_idx_iterator_parallel(x, n_out / 4, n_threads); - m4_generic_with_x_parallel(arr, bin_idx_iterator, n_out, n_threads, $func) - } - }; +pub fn m4_with_x_parallel(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec +where + for<'a> &'a [Ty]: ArgMinMax, + Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, + Ty: Copy + PartialOrd + Send + Sync, +{ + assert_eq!(n_out % 4, 0); + let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 4); + m4_generic_with_x_parallel(arr, bin_idx_iterator, n_out, |arr| arr.argminmax()) } m4_with_x_parallel!(m4_with_x_parallel, ArgMinMax, |arr| arr.argminmax()); @@ -78,20 +69,15 @@ m4_with_x_parallel!(m4_with_x_parallel_nan, NaNArgMinMax, |arr| arr // ----------- WITHOUT X -macro_rules! m4_without_x_parallel { - ($func_name:ident, $trait:path, $func:expr) => { - pub fn $func_name( - arr: &[T], - n_out: usize, - n_threads: usize, - ) -> Vec - where - for<'a> &'a [T]: $trait, - { - assert_eq!(n_out % 4, 0); - m4_generic_parallel(arr, n_out, n_threads, $func) - } - }; +pub fn m4_without_x_parallel( + arr: &[T], + n_out: usize, +) -> Vec +where + for<'a> &'a [T]: ArgMinMax, +{ + assert_eq!(n_out % 4, 0); + m4_generic_parallel(arr, n_out, |arr| arr.argminmax()) } m4_without_x_parallel!(m4_without_x_parallel, ArgMinMax, |arr| arr.argminmax()); @@ -154,7 +140,6 @@ pub(crate) fn m4_generic( pub(crate) fn m4_generic_parallel( arr: &[T], n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 4 @@ -166,39 +151,34 @@ pub(crate) fn m4_generic_parallel( let block_size: f64 = (arr.len() - 1) as f64 / (n_out / 4) as f64; // Store the enumerated indexes in the output array + // These indexes are used to calculate the start and end indexes of each bin in + // the multi-threaded execution let mut sampled_indices: Vec = (0..n_out).collect::>(); - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let func = || { - for chunk in sampled_indices.chunks_exact_mut(4) { - let i: f64 = unsafe { *chunk.get_unchecked(0) >> 2 } as f64; - let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; - let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - - let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - - chunk[0] = start_idx; - // Add the indexes in sorted order - if min_index < max_index { - chunk[1] = min_index + start_idx; - chunk[2] = max_index + start_idx; - } else { - chunk[1] = max_index + start_idx; - chunk[2] = min_index + start_idx; - } - chunk[3] = end_idx - 1; - } - }; + POOL.install(|| { + sampled_indices + .par_chunks_exact_mut(4) + .for_each(|sampled_index_chunk| { + let i: f64 = unsafe { *sampled_index_chunk.get_unchecked(0) >> 2 } as f64; + let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; + let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - pool.unwrap().install(func); // allow panic if pool could not be created + let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - sampled_indices.to_vec() + sampled_index_chunk[0] = start_idx; + // Add the indexes in sorted order + if min_index < max_index { + sampled_index_chunk[1] = min_index + start_idx; + sampled_index_chunk[2] = max_index + start_idx; + } else { + sampled_index_chunk[1] = max_index + start_idx; + sampled_index_chunk[2] = min_index + start_idx; + } + sampled_index_chunk[3] = end_idx - 1; + }) + }); + + sampled_indices } // --------------------- WITH X @@ -253,7 +233,6 @@ pub(crate) fn m4_generic_with_x_parallel( arr: &[T], bin_idx_iterator: impl IndexedParallelIterator>>, n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 4 @@ -261,14 +240,7 @@ pub(crate) fn m4_generic_with_x_parallel( return (0..arr.len()).collect::>(); } - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let iter_func = || { + POOL.install(|| { bin_idx_iterator .flat_map(|bin_idx_iterator| { bin_idx_iterator @@ -304,10 +276,7 @@ pub(crate) fn m4_generic_with_x_parallel( }) .flatten() .collect::>() - }; - - let result = pool.unwrap().install(iter_func); // allow panic if pool could not be created - result + }) } #[cfg(test)] @@ -325,14 +294,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(196)] + #[case(200)] + #[case(204)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_m4_scalar_without_x_correct() { @@ -354,11 +322,11 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_m4_scalar_without_x_parallel_correct(n_threads: usize) { + #[test] + fn test_m4_scalar_without_x_parallel_correct() { let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_without_x_parallel(&arr, 12, n_threads); + let sampled_indices = m4_without_x_parallel(&arr, 12); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -395,12 +363,12 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_m4_scalar_with_x_parallel_correct(n_threads: usize) { + #[test] + fn test_m4_scalar_with_x_parallel_correct() { let x: [i32; 100] = core::array::from_fn(|i| i.as_()); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_with_x_parallel(&x, &arr, 12, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 12); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -439,14 +407,14 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_m4_scalar_with_x_gap_parallel(n_threads: usize) { + #[test] + fn test_m4_scalar_with_x_gap_parallel() { // We will create a gap in the middle of the array // Increment the second half of the array by 50 let x: [i32; 100] = core::array::from_fn(|i| if i > 50 { (i + 50).as_() } else { i.as_() }); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_with_x_parallel(&x, &arr, 20, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 20); assert_eq!(sampled_indices.len(), 16); // One full gap let expected_indices = vec![0, 0, 29, 29, 30, 30, 50, 50, 51, 51, 69, 69, 70, 70, 99, 99]; assert_eq!(sampled_indices, expected_indices); @@ -454,7 +422,7 @@ mod tests { // Increment the second half of the array by 50 again let x = x.map(|x| if x > 101 { x + 50 } else { x }); - let sampled_indices = m4_with_x_parallel(&x, &arr, 20, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 20); assert_eq!(sampled_indices.len(), 17); // Gap with 1 value let expected_indices = vec![ 0, 0, 39, 39, 40, 40, 50, 50, 51, 52, 52, 59, 59, 60, 60, 99, 99, @@ -462,20 +430,19 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_many_random_runs_correct(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_correct(n_out: usize) { const N: usize = 20_003; - const N_OUT: usize = 204; let x: [i32; N] = core::array::from_fn(|i| i.as_()); for _ in 0..100 { let arr = get_array_f32(N); - let idxs1 = m4_without_x(arr.as_slice(), N_OUT); - let idxs2 = m4_with_x(&x, arr.as_slice(), N_OUT); + let idxs1 = m4_without_x(arr.as_slice(), n_out); + let idxs2 = m4_with_x(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs2); - let idxs3 = m4_without_x_parallel(arr.as_slice(), N_OUT, n_threads); - let idxs4 = m4_with_x_parallel(&x, arr.as_slice(), N_OUT, n_threads); + let idxs3 = m4_without_x_parallel(arr.as_slice(), n_out); + let idxs4 = m4_with_x_parallel(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs3); - assert_eq!(idxs1, idxs4); // TODO: this should not fail when n_threads = 16 + assert_eq!(idxs1, idxs4); // TODO: this fails when nb. of threads = 16 } } } diff --git a/downsample_rs/src/minmax.rs b/downsample_rs/src/minmax.rs index 840e26f..dc50849 100644 --- a/downsample_rs/src/minmax.rs +++ b/downsample_rs/src/minmax.rs @@ -8,6 +8,7 @@ use super::searchsorted::{ get_equidistant_bin_idx_iterator, get_equidistant_bin_idx_iterator_parallel, }; use super::types::Num; +use super::POOL; // ----------------------------------- NON-PARALLEL ------------------------------------ @@ -53,25 +54,15 @@ min_max_without_x!(min_max_without_x_nan, NaNArgMinMax, |arr| arr // ----------- WITH X -macro_rules! min_max_with_x_parallel { - ($func_name:ident, $trait:path, $func:expr) => { - pub fn $func_name( - x: &[Tx], - arr: &[Ty], - n_out: usize, - n_threads: usize, - ) -> Vec - where - for<'a> &'a [Ty]: $trait, - Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, - Ty: Copy + PartialOrd + Send + Sync, - { - assert_eq!(n_out % 2, 0); - let bin_idx_iterator = - get_equidistant_bin_idx_iterator_parallel(x, n_out / 2, n_threads); - min_max_generic_with_x_parallel(arr, bin_idx_iterator, n_out, n_threads, $func) - } - }; +pub fn min_max_with_x_parallel(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec +where + for<'a> &'a [Ty]: ArgMinMax, + Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, + Ty: Copy + PartialOrd + Send + Sync, +{ + assert_eq!(n_out % 2, 0); + let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 2); + min_max_generic_with_x_parallel(arr, bin_idx_iterator, n_out, |arr| arr.argminmax()) } min_max_with_x_parallel!(min_max_with_x_parallel, ArgMinMax, |arr| arr.argminmax()); @@ -80,20 +71,15 @@ min_max_with_x_parallel!(min_max_with_x_parallel_nan, NaNArgMinMax, |arr| arr // ----------- WITHOUT X -macro_rules! min_max_without_x_parallel { - ($func_name:ident, $trait:path, $func:expr) => { - pub fn $func_name( - arr: &[T], - n_out: usize, - n_threads: usize, - ) -> Vec - where - for<'a> &'a [T]: $trait, - { - assert_eq!(n_out % 2, 0); - min_max_generic_parallel(arr, n_out, n_threads, $func) - } - }; +pub fn min_max_without_x_parallel( + arr: &[T], + n_out: usize, +) -> Vec +where + for<'a> &'a [T]: ArgMinMax, +{ + assert_eq!(n_out % 2, 0); + min_max_generic_parallel(arr, n_out, |arr| arr.argminmax()) } min_max_without_x_parallel!(min_max_without_x_parallel, ArgMinMax, |arr| arr.argminmax()); @@ -148,7 +134,6 @@ pub(crate) fn min_max_generic( pub(crate) fn min_max_generic_parallel( arr: &[T], n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 2 @@ -160,38 +145,32 @@ pub(crate) fn min_max_generic_parallel( let block_size: f64 = (arr.len() - 1) as f64 / (n_out / 2) as f64; // Store the enumerated indexes in the output array - // let mut sampled_indices: Array1 = Array1::from_vec((0..n_out).collect::>()); + // These indexes are used to calculate the start and end indexes of each bin in + // the multi-threaded execution let mut sampled_indices: Vec = (0..n_out).collect::>(); - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let func = || { - for chunk in sampled_indices.chunks_exact_mut(2) { - let i: f64 = unsafe { *chunk.get_unchecked(0) >> 1 } as f64; - let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; - let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - - let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - - // Add the indexes in sorted order - if min_index < max_index { - chunk[0] = min_index + start_idx; - chunk[1] = max_index + start_idx; - } else { - chunk[0] = max_index + start_idx; - chunk[1] = min_index + start_idx; - } - } - }; + POOL.install(|| { + sampled_indices + .par_chunks_exact_mut(2) + .for_each(|sampled_index_chunk| { + let i: f64 = unsafe { *sampled_index_chunk.get_unchecked(0) >> 1 } as f64; + let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; + let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - pool.unwrap().install(func); // allow panic if pool could not be created + let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - sampled_indices.to_vec() + // Add the indexes in sorted order + if min_index < max_index { + sampled_index_chunk[0] = min_index + start_idx; + sampled_index_chunk[1] = max_index + start_idx; + } else { + sampled_index_chunk[0] = max_index + start_idx; + sampled_index_chunk[1] = min_index + start_idx; + } + }) + }); + + sampled_indices } // --------------------- WITH X @@ -242,7 +221,6 @@ pub(crate) fn min_max_generic_with_x_parallel( arr: &[T], bin_idx_iterator: impl IndexedParallelIterator>>, n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 2 @@ -250,14 +228,7 @@ pub(crate) fn min_max_generic_with_x_parallel( return (0..arr.len()).collect::>(); } - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let iter_func = || { + POOL.install(|| { bin_idx_iterator .flat_map(|bin_idx_iterator| { bin_idx_iterator @@ -289,9 +260,7 @@ pub(crate) fn min_max_generic_with_x_parallel( }) .flatten() .collect::>() - }; - - pool.unwrap().install(iter_func) // allow panic if pool could not be created + }) } #[cfg(test)] @@ -309,14 +278,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(198)] + #[case(200)] + #[case(202)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_min_max_scalar_without_x_correct() { @@ -338,11 +306,11 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_min_max_scalar_without_x_parallel_correct(n_threads: usize) { + #[test] + fn test_min_max_scalar_without_x_parallel_correct() { let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_without_x_parallel(&arr, 10, n_threads); + let sampled_indices = min_max_without_x_parallel(&arr, 10); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -379,12 +347,12 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_min_max_scalar_with_x_parallel_correct(n_threads: usize) { + #[test] + fn test_min_max_scalar_with_x_parallel_correct() { let x: [i32; 100] = core::array::from_fn(|i| i.as_()); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -421,14 +389,14 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_min_max_scalar_with_x_parallel_gap(n_threads: usize) { + #[test] + fn test_min_max_scalar_with_x_parallel_gap() { // Create a gap in the middle of the array // Increment the second half of the array by 50 let x: [i32; 100] = core::array::from_fn(|i| if i > 50 { (i + 50).as_() } else { i.as_() }); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); assert_eq!(sampled_indices.len(), 8); // One full gap let expected_indices = vec![0, 29, 30, 50, 51, 69, 70, 99]; assert_eq!(sampled_indices, expected_indices); @@ -436,24 +404,23 @@ mod tests { // Increment the second half of the array by 50 again let x = x.map(|i| if i > 101 { i + 50 } else { i }); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); assert_eq!(sampled_indices.len(), 9); // Gap with 1 value let expected_indices = vec![0, 39, 40, 50, 51, 52, 59, 60, 99]; assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_many_random_runs_same_output(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_same_output(n_out: usize) { const N: usize = 20_003; - const N_OUT: usize = 202; let x: [i32; N] = core::array::from_fn(|i| i.as_()); for _ in 0..100 { let mut arr = get_array_f32(N); arr[N - 1] = f32::INFINITY; // Make sure the last value is always the max - let idxs1 = min_max_without_x(arr.as_slice(), N_OUT); - let idxs2 = min_max_without_x_parallel(arr.as_slice(), N_OUT, n_threads); - let idxs3 = min_max_with_x(&x, arr.as_slice(), N_OUT); - let idxs4 = min_max_with_x_parallel(&x, arr.as_slice(), N_OUT, n_threads); + let idxs1 = min_max_without_x(arr.as_slice(), n_out); + let idxs2 = min_max_without_x_parallel(arr.as_slice(), n_out); + let idxs3 = min_max_with_x(&x, arr.as_slice(), n_out); + let idxs4 = min_max_with_x_parallel(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs2); assert_eq!(idxs1, idxs3); assert_eq!(idxs1, idxs4); diff --git a/downsample_rs/src/minmaxlttb.rs b/downsample_rs/src/minmaxlttb.rs index 9d8017a..1654fa4 100644 --- a/downsample_rs/src/minmaxlttb.rs +++ b/downsample_rs/src/minmaxlttb.rs @@ -10,27 +10,16 @@ use num_traits::{AsPrimitive, FromPrimitive}; // ----------- WITH X -macro_rules! minmaxlttb_with_x { - ($func_name:ident, $trait:path, $func:expr) => { - pub fn $func_name + FromPrimitive, Ty: Num + AsPrimitive>( - x: &[Tx], - y: &[Ty], - n_out: usize, - minmax_ratio: usize, - ) -> Vec - where - for<'a> &'a [Ty]: $trait, - { - minmaxlttb_generic( - x, - y, - n_out, - minmax_ratio, - None, - MinMaxFunctionWithX::Serial($func), - ) - } - }; +pub fn minmaxlttb_with_x + FromPrimitive, Ty: Num + AsPrimitive>( + x: &[Tx], + y: &[Ty], + n_out: usize, + minmax_ratio: usize, +) -> Vec +where + for<'a> &'a [Ty]: ArgMinMax, +{ + minmaxlttb_generic(x, y, n_out, minmax_ratio, minmax::min_max_with_x) } minmaxlttb_with_x!(minmaxlttb_with_x, ArgMinMax, minmax::min_max_with_x); @@ -42,25 +31,15 @@ minmaxlttb_with_x!( // ----------- WITHOUT X -macro_rules! minmaxlttb_without_x { - ($func_name:ident, $trait:path, $func:expr) => { - pub fn $func_name>( - y: &[Ty], - n_out: usize, - minmax_ratio: usize, - ) -> Vec - where - for<'a> &'a [Ty]: $trait, - { - minmaxlttb_generic_without_x( - y, - n_out, - minmax_ratio, - None, - MinMaxFunctionWithoutX::Serial($func), - ) - } - }; +pub fn minmaxlttb_without_x>( + y: &[Ty], + n_out: usize, + minmax_ratio: usize, +) -> Vec +where + for<'a> &'a [Ty]: ArgMinMax, +{ + minmaxlttb_generic_without_x(y, n_out, minmax_ratio, minmax::min_max_without_x) } minmaxlttb_without_x!(minmaxlttb_without_x, ArgMinMax, minmax::min_max_without_x); @@ -74,31 +53,19 @@ minmaxlttb_without_x!( // ----------- WITH X -macro_rules! minmaxlttb_with_x_parallel { - ($func_name:ident, $trait:path, $func:expr) => { - pub fn $func_name< - Tx: Num + AsPrimitive + FromPrimitive + Send + Sync, - Ty: Num + AsPrimitive + Send + Sync, - >( - x: &[Tx], - y: &[Ty], - n_out: usize, - minmax_ratio: usize, - n_threads: usize, - ) -> Vec - where - for<'a> &'a [Ty]: $trait, - { - minmaxlttb_generic( - x, - y, - n_out, - minmax_ratio, - Some(n_threads), - MinMaxFunctionWithX::Parallel($func), - ) - } - }; +pub fn minmaxlttb_with_x_parallel< + Tx: Num + AsPrimitive + FromPrimitive + Send + Sync, + Ty: Num + AsPrimitive + Send + Sync, +>( + x: &[Tx], + y: &[Ty], + n_out: usize, + minmax_ratio: usize, +) -> Vec +where + for<'a> &'a [Ty]: ArgMinMax, +{ + minmaxlttb_generic(x, y, n_out, minmax_ratio, minmax::min_max_with_x_parallel) } minmaxlttb_with_x_parallel!( @@ -114,26 +81,15 @@ minmaxlttb_with_x_parallel!( // ----------- WITHOUT X -macro_rules! minmaxlttb_without_x_parallel { - ($func_name:ident, $trait:path, $func:expr) => { - pub fn $func_name + Send + Sync>( - y: &[Ty], - n_out: usize, - minmax_ratio: usize, - n_threads: usize, - ) -> Vec - where - for<'a> &'a [Ty]: $trait, - { - minmaxlttb_generic_without_x( - y, - n_out, - minmax_ratio, - Some(n_threads), - MinMaxFunctionWithoutX::Parallel($func), - ) - } - }; +pub fn minmaxlttb_without_x_parallel + Send + Sync>( + y: &[Ty], + n_out: usize, + minmax_ratio: usize, +) -> Vec +where + for<'a> &'a [Ty]: ArgMinMax, +{ + minmaxlttb_generic_without_x(y, n_out, minmax_ratio, minmax::min_max_without_x_parallel) } minmaxlttb_without_x_parallel!( @@ -149,28 +105,13 @@ minmaxlttb_without_x_parallel!( // ----------------------------------- GENERICS ------------------------------------ -// types to make function signatures easier to read -type ThreadCount = usize; -type OutputCount = usize; - -pub enum MinMaxFunctionWithX, Ty: Num + AsPrimitive> { - Serial(fn(&[Tx], &[Ty], OutputCount) -> Vec), - Parallel(fn(&[Tx], &[Ty], OutputCount, ThreadCount) -> Vec), -} - -pub enum MinMaxFunctionWithoutX> { - Serial(fn(&[Ty], OutputCount) -> Vec), - Parallel(fn(&[Ty], OutputCount, ThreadCount) -> Vec), -} - #[inline(always)] pub(crate) fn minmaxlttb_generic, Ty: Num + AsPrimitive>( x: &[Tx], y: &[Ty], n_out: usize, minmax_ratio: usize, - n_threads: Option, - f_minmax: MinMaxFunctionWithX, + f_minmax: fn(&[Tx], &[Ty], usize) -> Vec, ) -> Vec where // for<'a> &'a [Ty]: ArgMinMax, @@ -180,19 +121,11 @@ where // Apply first min max aggregation (if above ratio) if x.len() / n_out > minmax_ratio { // Get index of min max points - let mut index = match f_minmax { - MinMaxFunctionWithX::Serial(func) => func( - &x[1..(x.len() - 1)], - &y[1..(x.len() - 1)], - n_out * minmax_ratio, - ), - MinMaxFunctionWithX::Parallel(func) => func( - &x[1..(x.len() - 1)], - &y[1..(x.len() - 1)], - n_out * minmax_ratio, - n_threads.unwrap(), // n_threads cannot be None - ), - }; + let mut index = f_minmax( + &x[1..(x.len() - 1)], + &y[1..(x.len() - 1)], + n_out * minmax_ratio, + ); // inplace + 1 index.iter_mut().for_each(|elem| *elem += 1); // Prepend first and last point @@ -228,8 +161,7 @@ pub(crate) fn minmaxlttb_generic_without_x>( y: &[Ty], n_out: usize, minmax_ratio: usize, - n_threads: Option, - f_minmax: MinMaxFunctionWithoutX, + f_minmax: fn(&[Ty], usize) -> Vec, ) -> Vec where // for<'a> &'a [Ty]: ArgMinMax, @@ -238,16 +170,7 @@ where // Apply first min max aggregation (if above ratio) if y.len() / n_out > minmax_ratio { // Get index of min max points - let mut index = match f_minmax { - MinMaxFunctionWithoutX::Serial(func) => { - func(&y[1..(y.len() - 1)], n_out * minmax_ratio) - } - MinMaxFunctionWithoutX::Parallel(func) => func( - &y[1..(y.len() - 1)], - n_out * minmax_ratio, - n_threads.unwrap(), // n_threads cannot be None - ), - }; + let mut index = f_minmax(&y[1..(y.len() - 1)], n_out * minmax_ratio); // inplace + 1 index.iter_mut().for_each(|elem| *elem += 1); // Prepend first and last point @@ -286,14 +209,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(98)] + #[case(100)] + #[case(102)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_minmaxlttb_with_x() { @@ -310,32 +232,30 @@ mod tests { assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_minmaxlttb_with_x_parallel(n_threads: usize) { + #[test] + fn test_minmaxlttb_with_x_parallel() { let x = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; let y = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; - let sampled_indices = minmaxlttb_with_x_parallel(&x, &y, 4, 2, n_threads); + let sampled_indices = minmaxlttb_with_x_parallel(&x, &y, 4, 2); assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_minmaxlttb_without_x_parallel(n_threads: usize) { + #[test] + fn test_minmaxlttb_without_x_parallel() { let y = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; - let sampled_indices = minmaxlttb_without_x_parallel(&y, 4, 2, n_threads); + let sampled_indices = minmaxlttb_without_x_parallel(&y, 4, 2); assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_many_random_runs_same_output(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_same_output(n_out: usize) { const N: usize = 20_000; - const N_OUT: usize = 100; const MINMAX_RATIO: usize = 5; for _ in 0..100 { // TODO: test with x let arr = get_array_f32(N); - let idxs1 = minmaxlttb_without_x(arr.as_slice(), N_OUT, MINMAX_RATIO); - let idxs2 = - minmaxlttb_without_x_parallel(arr.as_slice(), N_OUT, MINMAX_RATIO, n_threads); + let idxs1 = minmaxlttb_without_x(arr.as_slice(), n_out, MINMAX_RATIO); + let idxs2 = minmaxlttb_without_x_parallel(arr.as_slice(), n_out, MINMAX_RATIO); assert_eq!(idxs1, idxs2); } } diff --git a/downsample_rs/src/searchsorted.rs b/downsample_rs/src/searchsorted.rs index af53a9c..c02a3e1 100644 --- a/downsample_rs/src/searchsorted.rs +++ b/downsample_rs/src/searchsorted.rs @@ -2,6 +2,7 @@ use rayon::iter::IndexedParallelIterator; use rayon::prelude::*; use super::types::Num; +use super::POOL; use num_traits::{AsPrimitive, FromPrimitive}; // ---------------------- Binary search ---------------------- @@ -128,7 +129,6 @@ fn sequential_add_mul(start_val: f64, add_val: f64, mul: usize) -> f64 { pub(crate) fn get_equidistant_bin_idx_iterator_parallel( arr: &[T], nb_bins: usize, - n_threads: usize, ) -> impl IndexedParallelIterator> + '_> + '_ where T: Num + FromPrimitive + AsPrimitive + Sync + Send, @@ -140,7 +140,7 @@ where (arr[arr.len() - 1].as_() / nb_bins as f64) - (arr[0].as_() / nb_bins as f64); let arr0: f64 = arr[0].as_(); // The first value of the array // 2. Compute the number of threads & bins per thread - let n_threads = std::cmp::min(n_threads, nb_bins); + let n_threads = std::cmp::min(POOL.current_num_threads(), nb_bins); let nb_bins_per_thread = nb_bins / n_threads; let nb_bins_last_thread = nb_bins - nb_bins_per_thread * (n_threads - 1); // 3. Iterate over the number of threads @@ -188,16 +188,15 @@ mod tests { use super::*; - use dev_utils::utils::{get_all_threads, get_random_array}; + use dev_utils::utils::get_random_array; - // Template for the n_threads matrix + // Template for nb_bins #[template] #[rstest] - #[case(1)] - #[case(get_all_threads() / 2)] - #[case(get_all_threads())] - #[case(get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(99)] + #[case(100)] + #[case(101)] + fn nb_bins(#[case] nb_bins: usize) {} #[test] fn test_search_sorted_identicial_to_np_linspace_searchsorted() { @@ -250,8 +249,8 @@ mod tests { // assert_eq!(binary_search_with_mid(&arr, 11, 0, arr.len() - 1, 9), 10); } - #[apply(threads)] - fn test_get_equidistant_bin_idxs(n_threads: usize) { + #[test] + fn test_get_equidistant_bin_idxs() { let expected_indices = vec![0, 4, 7]; let arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; @@ -259,7 +258,7 @@ mod tests { let bin_idxs = bin_idxs_iter.map(|x| x.unwrap().0).collect::>(); assert_eq!(bin_idxs, expected_indices); - let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr, 3, n_threads); + let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr, 3); let bin_idxs = bin_idxs_iter .map(|x| x.map(|x| x.unwrap().0).collect::>()) .flatten() @@ -267,10 +266,9 @@ mod tests { assert_eq!(bin_idxs, expected_indices); } - #[apply(threads)] - fn test_many_random_same_result(n_threads: usize) { + #[apply(nb_bins)] + fn test_many_random_same_result(nb_bins: usize) { let n = 5_000; - let nb_bins = 100; for _ in 0..100 { let mut arr = get_random_array::(n, i32::MIN, i32::MAX); @@ -282,8 +280,7 @@ mod tests { let bin_idxs = bin_idxs_iter.map(|x| x.unwrap().0).collect::>(); // Calculate the bin indexes in parallel - let bin_idxs_iter = - get_equidistant_bin_idx_iterator_parallel(&arr[..], nb_bins, n_threads); + let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr[..], nb_bins); let bin_idxs_parallel = bin_idxs_iter .map(|x| x.map(|x| x.unwrap().0).collect::>()) .flatten() diff --git a/pyproject.toml b/pyproject.toml index 8583728..e186284 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "tsdownsample" description = "Time series downsampling in rust" -version = "0.1.2" +version = "0.1.3rc2" requires-python = ">=3.7" dependencies = ["numpy"] authors = [{name = "Jeroen Van Der Donckt"}] @@ -21,6 +21,7 @@ classifiers = [ 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', 'Operating System :: POSIX', 'Operating System :: MacOS :: MacOS X', 'Operating System :: Microsoft :: Windows' diff --git a/src/lib.rs b/src/lib.rs index b7513c5..19967c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,25 +32,6 @@ macro_rules! _create_pyfunc_without_x { }; } -macro_rules! _create_pyfunc_without_x_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - y: PyReadonlyArray1<$type>, - n_out: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(y, n_out, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfunc_without_x_with_ratio { ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { // Create the Python function @@ -70,26 +51,6 @@ macro_rules! _create_pyfunc_without_x_with_ratio { }; } -macro_rules! _create_pyfunc_without_x_with_ratio_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - y: PyReadonlyArray1<$type>, - n_out: usize, - ratio: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(y, n_out, ratio, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfuncs_without_x_generic { ($create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($t:ty)*) => { $( @@ -130,27 +91,6 @@ macro_rules! _create_pyfunc_with_x { }; } -macro_rules! _create_pyfunc_with_x_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - x: PyReadonlyArray1<$type_x>, - y: PyReadonlyArray1<$type_y>, - n_out: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let x = x.as_slice().unwrap(); - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(x, y, n_out, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfunc_with_x_with_ratio { ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { // Create the Python function @@ -172,28 +112,6 @@ macro_rules! _create_pyfunc_with_x_with_ratio { }; } -macro_rules! _create_pyfunc_with_x_with_ratio_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - x: PyReadonlyArray1<$type_x>, - y: PyReadonlyArray1<$type_y>, - n_out: usize, - ratio: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let x = x.as_slice().unwrap(); - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(x, y, n_out, ratio, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfuncs_with_x_generic { // ($create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($t:ty)+) => { // // The macro will implement the function for all combinations of $t (for type x and y). @@ -284,22 +202,6 @@ macro_rules! create_pyfuncs_without_x { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_without_x_helper!( - _create_pyfunc_without_x_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; - (@nan @threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_without_x_helper!(@nan - _create_pyfunc_without_x_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; } macro_rules! create_pyfuncs_without_x_with_ratio { @@ -320,22 +222,6 @@ macro_rules! create_pyfuncs_without_x_with_ratio { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_without_x_helper!( - _create_pyfunc_without_x_with_ratio_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; - (@nan @threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_without_x_helper!(@nan - _create_pyfunc_without_x_with_ratio_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; } macro_rules! _create_pyfuncs_with_x_helper { @@ -355,22 +241,6 @@ macro_rules! create_pyfuncs_with_x { (@nan $resample_mod:ident, $resample_fn:ident, $mod:ident) => { _create_pyfuncs_with_x_helper!(@nan _create_pyfunc_with_x, $resample_mod, $resample_fn, $mod); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_with_x_helper!( - _create_pyfunc_with_x_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; - (@nan @threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_with_x_helper!(@nan - _create_pyfunc_with_x_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; } macro_rules! create_pyfuncs_with_x_with_ratio { @@ -391,22 +261,6 @@ macro_rules! create_pyfuncs_with_x_with_ratio { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_with_x_helper!( - _create_pyfunc_with_x_with_ratio_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; - (@nan @threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_with_x_helper!(@nan - _create_pyfunc_with_x_with_ratio_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; } // -------------------------------------- MINMAX --------------------------------------- @@ -438,14 +292,14 @@ fn minmax(_py: Python<'_>, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x!(@threaded minmax_mod, min_max_without_x_parallel, parallel_mod); - create_pyfuncs_without_x!(@nan @threaded minmax_mod, min_max_without_x_parallel_nan, parallel_mod); + create_pyfuncs_without_x!(minmax_mod, min_max_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(@nan minmax_mod, min_max_without_x_parallel, parallel_mod); } // ----- WITH X { - create_pyfuncs_with_x!(@threaded minmax_mod, min_max_with_x_parallel, parallel_mod); - create_pyfuncs_with_x!(@nan @threaded minmax_mod, min_max_with_x_parallel_nan, parallel_mod); + create_pyfuncs_with_x!(minmax_mod, min_max_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(@nan minmax_mod, min_max_with_x_parallel, parallel_mod); } // Add the sub modules to the module @@ -484,14 +338,14 @@ fn m4(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x!(@threaded m4_mod, m4_without_x_parallel, parallel_mod); - create_pyfuncs_without_x!(@nan @threaded m4_mod, m4_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(m4_mod, m4_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(@nan m4_mod, m4_without_x_parallel, parallel_mod); } // ----- WITH X { - create_pyfuncs_with_x!(@threaded m4_mod, m4_with_x_parallel, parallel_mod); - create_pyfuncs_with_x!(@nan @threaded m4_mod, m4_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(m4_mod, m4_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(@nan m4_mod, m4_with_x_parallel, parallel_mod); } // Add the sub modules to the module @@ -558,7 +412,7 @@ fn minmaxlttb(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x_with_ratio!(@threaded + create_pyfuncs_without_x_with_ratio!( minmaxlttb_mod, minmaxlttb_without_x_parallel, parallel_mod @@ -572,16 +426,8 @@ fn minmaxlttb(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITH X { - create_pyfuncs_with_x_with_ratio!(@threaded - minmaxlttb_mod, - minmaxlttb_with_x_parallel, - parallel_mod - ); - create_pyfuncs_with_x_with_ratio!(@nan @threaded - minmaxlttb_mod, - minmaxlttb_with_x_parallel, - parallel_mod - ); + create_pyfuncs_with_x_with_ratio!(minmaxlttb_mod, minmaxlttb_with_x_parallel, parallel_mod); + create_pyfuncs_with_x_with_ratio!(@nan minmaxlttb_mod, minmaxlttb_with_x_parallel, parallel_mod); } // Add the submodules to the module diff --git a/tests/benchmarks/test_downsamplers.py b/tests/benchmarks/test_downsamplers.py index 515ac1e..e257487 100644 --- a/tests/benchmarks/test_downsamplers.py +++ b/tests/benchmarks/test_downsamplers.py @@ -1,5 +1,3 @@ -import os - import numpy as np import pytest @@ -16,10 +14,6 @@ Y_DTYPES = [np.float32, np.float64] + [np.int32, np.int64] -def _parallel_to_n_threads(parallel): - return 0 if not parallel else os.cpu_count() - - # --------------------------------------------------------------------------- # # MinMaxDownsampler # --------------------------------------------------------------------------- # @@ -35,11 +29,10 @@ def test_minmax_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="minmax") @@ -52,12 +45,11 @@ def test_minmax_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -75,11 +67,10 @@ def test_m4_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = M4Downsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="m4") @@ -92,12 +83,11 @@ def test_m4_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = M4Downsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -115,11 +105,10 @@ def test_lttb_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = LTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="lttb") @@ -132,12 +121,11 @@ def test_lttb_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = LTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -155,11 +143,10 @@ def test_minmaxlttb_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxLTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="minmaxlttb") @@ -172,12 +159,11 @@ def test_minmaxlttb_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxLTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # diff --git a/tests/test_tsdownsample.py b/tests/test_tsdownsample.py index d21ba64..d228482 100644 --- a/tests/test_tsdownsample.py +++ b/tests/test_tsdownsample.py @@ -140,7 +140,7 @@ def test_parallel_downsampling(downsampler: AbstractDownsampler): """Test parallel downsampling.""" arr = np.random.randn(10_000).astype(np.float32) s_downsampled = downsampler.downsample(arr, n_out=100) - s_downsampled_p = downsampler.downsample(arr, n_out=100, n_threads=2) + s_downsampled_p = downsampler.downsample(arr, n_out=100, parallel=True) assert np.all(s_downsampled == s_downsampled_p) @@ -150,7 +150,7 @@ def test_parallel_downsampling_with_x(downsampler: AbstractDownsampler): arr = np.random.randn(10_001).astype(np.float32) # 10_001 to test edge case idx = np.arange(len(arr)) s_downsampled = downsampler.downsample(idx, arr, n_out=100) - s_downsampled_p = downsampler.downsample(idx, arr, n_out=100, n_threads=2) + s_downsampled_p = downsampler.downsample(idx, arr, n_out=100, parallel=True) assert np.all(s_downsampled == s_downsampled_p) @@ -236,7 +236,7 @@ def test_downsampling_no_out_of_bounds_different_dtypes( for dtype in y_dtypes: arr = arr_orig.astype(dtype) s_downsampled = downsampler.downsample(arr, n_out=76) - s_downsampled_p = downsampler.downsample(arr, n_out=76, n_threads=2) + s_downsampled_p = downsampler.downsample(arr, n_out=76, parallel=True) assert np.all(s_downsampled == s_downsampled_p) if dtype is not np.bool_: res += [s_downsampled] @@ -262,7 +262,7 @@ def test_downsampling_no_out_of_bounds_different_dtypes_with_x( for dtype_y in y_dtypes: arr = arr_orig.astype(dtype_y) s_downsampled = downsampler.downsample(idx, arr, n_out=76) - s_downsampled_p = downsampler.downsample(idx, arr, n_out=76, n_threads=2) + s_downsampled_p = downsampler.downsample(idx, arr, n_out=76, parallel=True) assert np.all(s_downsampled == s_downsampled_p) if dtype_y is not np.bool_: res += [s_downsampled] @@ -319,23 +319,45 @@ def test_error_invalid_args(): arr = np.random.randint(0, 100, size=10_000) # No args with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(n_out=100, n_threads=2) + MinMaxDownsampler().downsample(n_out=100, parallel=True) assert "takes 1 or 2 positional arguments" in str(e_msg.value) # Too many args with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr, arr, arr, n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr, arr, arr, n_out=100, parallel=True) assert "takes 1 or 2 positional arguments" in str(e_msg.value) # Invalid y with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr.reshape(5, 2_000), n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr.reshape(5, 2_000), n_out=100, parallel=True) assert "y must be 1D" in str(e_msg.value) # Invalid x with pytest.raises(ValueError) as e_msg: MinMaxDownsampler().downsample( - arr.reshape(5, 2_000), arr, n_out=100, n_threads=2 + arr.reshape(5, 2_000), arr, n_out=100, parallel=True ) assert "x must be 1D" in str(e_msg.value) # Invalid x and y (different length) with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr, arr[:-1], n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr, arr[:-1], n_out=100, parallel=True) assert "x and y must have the same length" in str(e_msg.value) + + +@pytest.mark.parametrize("downsampler", generate_rust_downsamplers()) +def test_non_contiguous_array(downsampler: AbstractDownsampler): + """Test non contiguous array.""" + arr = np.random.randint(0, 100, size=10_000) + arr = arr[::2] + assert not arr.flags["C_CONTIGUOUS"] + with pytest.raises(ValueError) as e_msg: + downsampler.downsample(arr, n_out=100) + assert "must be contiguous" in str(e_msg.value) + + +def test_everynth_non_contiguous_array(): + """Test non contiguous array.""" + arr = np.random.randint(0, 100, size=10_000) + arr = arr[::2] + assert not arr.flags["C_CONTIGUOUS"] + downsampler = EveryNthDownsampler() + s_downsampled = downsampler.downsample(arr, n_out=100) + assert s_downsampled[0] == 0 + assert s_downsampled[-1] == 4950 diff --git a/tsdownsample/__init__.py b/tsdownsample/__init__.py index 14630ef..9de8d61 100644 --- a/tsdownsample/__init__.py +++ b/tsdownsample/__init__.py @@ -11,7 +11,7 @@ NaNMinMaxLTTBDownsampler, ) -__version__ = "0.1.2" +__version__ = "0.1.3rc2" __author__ = "Jeroen Van Der Donckt" __all__ = [ diff --git a/tsdownsample/downsamplers.py b/tsdownsample/downsamplers.py index 16c0a9b..1132337 100644 --- a/tsdownsample/downsamplers.py +++ b/tsdownsample/downsamplers.py @@ -73,11 +73,11 @@ def rust_mod(self): return _tsdownsample_rs.minmaxlttb def downsample( - self, *args, n_out: int, minmax_ratio: int = 4, n_threads: int = 1, **_ + self, *args, n_out: int, minmax_ratio: int = 4, parallel: bool = False, **_ ): assert minmax_ratio > 0, "minmax_ratio must be greater than 0" return super().downsample( - *args, n_out=n_out, n_threads=n_threads, ratio=minmax_ratio + *args, n_out=n_out, parallel=parallel, ratio=minmax_ratio ) @@ -99,6 +99,9 @@ def downsample( class EveryNthDownsampler(AbstractDownsampler): + def __init__(self, **kwargs): + super().__init__(check_contiguous=False, **kwargs) + def _downsample( self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **_ ) -> np.ndarray: diff --git a/tsdownsample/downsampling_interface.py b/tsdownsample/downsampling_interface.py index e77f9a3..0a82d6c 100644 --- a/tsdownsample/downsampling_interface.py +++ b/tsdownsample/downsampling_interface.py @@ -17,12 +17,26 @@ class AbstractDownsampler(ABC): def __init__( self, + check_contiguous: bool = True, x_dtype_regex_list: Optional[List[str]] = None, y_dtype_regex_list: Optional[List[str]] = None, ): + self.check_contiguous = check_contiguous self.x_dtype_regex_list = x_dtype_regex_list self.y_dtype_regex_list = y_dtype_regex_list + def _check_contiguous(self, arr: np.ndarray, y: bool = True): + # necessary for rust downsamplers as they don't support non-contiguous arrays + # (we call .as_slice().unwrap() on the array) in the lib.rs file + # which will panic if the array is not contiguous + if not self.check_contiguous: + return + + if arr.flags["C_CONTIGUOUS"]: + return + + raise ValueError(f"{'y' if y else 'x'} array must be contiguous.") + def _supports_dtype(self, arr: np.ndarray, y: bool = True): dtype_regex_list = self.y_dtype_regex_list if y else self.x_dtype_regex_list # base case @@ -66,6 +80,7 @@ def _check_valid_downsample_args( raise ValueError("x must be 1D array") if len(x) != len(y): raise ValueError("x and y must have the same length") + return x, y @staticmethod @@ -113,8 +128,10 @@ def downsample(self, *args, n_out: int, **kwargs): # x and y are optional self._check_valid_n_out(n_out) x, y = self._check_valid_downsample_args(*args) self._supports_dtype(y, y=True) + self._check_contiguous(y, y=True) if x is not None: self._supports_dtype(x, y=False) + self._check_contiguous(x, y=False) return self._downsample(x, y, n_out, **kwargs) @@ -144,7 +161,7 @@ class AbstractRustDownsampler(AbstractDownsampler, ABC): """RustDownsampler interface-class, subclassed by concrete downsamplers.""" def __init__(self): - super().__init__(_rust_dtypes, _y_rust_dtypes) # same for x and y + super().__init__(True, _rust_dtypes, _y_rust_dtypes) # same for x and y @property def rust_mod(self) -> ModuleType: @@ -297,13 +314,11 @@ def _downsample( x: Union[np.ndarray, None], y: np.ndarray, n_out: int, - n_threads: int = 1, + parallel: bool = False, **kwargs, ) -> np.ndarray: """Downsample the data in x and y.""" mod = self.mod_single_core - is_multi_core = False - parallel = n_threads > 1 if parallel: if self.mod_multi_core is None: name = self.__class__.__name__ @@ -313,7 +328,6 @@ def _downsample( ) else: mod = self.mod_multi_core - is_multi_core = True ## Viewing the y-data as different dtype (if necessary) if y.dtype == "bool": # bool is viewed as int8 @@ -327,10 +341,7 @@ def _downsample( ## Viewing the x-data as different dtype (if necessary) if x is None: downsample_f = self._switch_mod_with_y(y.dtype, mod) - if is_multi_core: - return downsample_f(y, n_out, n_threads=n_threads, **kwargs) - else: - return downsample_f(y, n_out, **kwargs) + return downsample_f(y, n_out, **kwargs) elif np.issubdtype(x.dtype, np.datetime64): # datetime64 is viewed as int64 x = x.view(dtype=np.int64) @@ -339,16 +350,17 @@ def _downsample( x = x.view(dtype=np.int64) ## Getting the appropriate downsample function downsample_f = self._switch_mod_with_x_and_y(x.dtype, y.dtype, mod) - if is_multi_core: - return downsample_f(x, y, n_out, n_threads=n_threads, **kwargs) - else: - return downsample_f(x, y, n_out, **kwargs) + return downsample_f(x, y, n_out, **kwargs) - def downsample( - self, *args, n_out: int, n_threads: int = 1, **kwargs # x and y are optional - ): - """Downsample the data in x and y.""" - return super().downsample(*args, n_out=n_out, n_threads=n_threads, **kwargs) + def downsample(self, *args, n_out: int, parallel: bool = False, **kwargs): + """Downsample the data in x and y. + + The x and y arguments are positional-only arguments. If only one argument is + passed, it is considered to be the y-data. If two arguments are passed, the + first argument is considered to be the x-data and the second argument is + considered to be the y-data. + """ + return super().downsample(*args, n_out=n_out, parallel=parallel, **kwargs) def __deepcopy__(self, memo): """Deepcopy the object."""