diff --git a/.github/workflows/ci-tsdownsample.yml b/.github/workflows/ci-tsdownsample.yml index 8261300..9a13c06 100644 --- a/.github/workflows/ci-tsdownsample.yml +++ b/.github/workflows/ci-tsdownsample.yml @@ -46,7 +46,7 @@ jobs: matrix: os: ['windows-latest', 'macOS-latest', 'ubuntu-latest'] rust: ['nightly'] # ['stable', 'beta'] - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12'] env: PYTHON: ${{ matrix.python-version }} @@ -155,7 +155,7 @@ jobs: target: ${{ matrix.target }} manylinux: ${{ matrix.manylinux || 'auto' }} container: ${{ matrix.container }} - args: --release --out dist --interpreter ${{ matrix.interpreter || '3.7 3.8 3.9 3.10 3.11' }} + args: --release --out dist --interpreter ${{ matrix.interpreter || '3.7 3.8 3.9 3.10 3.11 3.12' }} - run: ${{ matrix.ls || 'ls -lh' }} dist/ diff --git a/.github/workflows/codspeed.yml b/.github/workflows/codspeed.yml index c9e31cb..7475832 100644 --- a/.github/workflows/codspeed.yml +++ b/.github/workflows/codspeed.yml @@ -43,6 +43,6 @@ jobs: # - run: rm tests/__init__.py - name: Run CodSpeed benchmarks - uses: CodSpeedHQ/action@v1 + uses: CodSpeedHQ/action@v2 with: run: pytest tests/benchmarks/ --codspeed diff --git a/Cargo.toml b/Cargo.toml index bc18a23..184e8c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,10 @@ license = "MIT" [dependencies] downsample_rs = { path = "downsample_rs", features = ["half"]} -pyo3 = { version = "0.19", features = ["extension-module"] } -numpy = { version = "0.19", features = ["half"] } -half = { version = "2.1", default-features = false } -paste = { version = "1.0.9", default-features = false } +pyo3 = { version = "0.20", features = ["extension-module"] } +numpy = { version = "0.20", features = ["half"] } +half = { version = "2.3.1", default-features = false } +paste = { version = "1.0.14", default-features = false } [lib] name = "tsdownsample" diff --git a/README.md b/README.md index 9d4022c..1766eac 100644 --- a/README.md +++ b/README.md @@ -6,13 +6,14 @@ [![CodeQL](https://github.com/predict-idlab/tsdownsample/actions/workflows/codeql.yml/badge.svg)](https://github.com/predict-idlab/tsdownsample/actions/workflows/codeql.yml) [![Testing](https://github.com/predict-idlab/tsdownsample/actions/workflows/ci-downsample_rs.yml/badge.svg)](https://github.com/predict-idlab/tsdownsample/actions/workflows/ci-downsample_rs.yml) [![Testing](https://github.com/predict-idlab/tsdownsample/actions/workflows/ci-tsdownsample.yml/badge.svg)](https://github.com/predict-idlab/tsdownsample/actions/workflows/ci-tsdownsample.yml) + Extremely fast **time series downsampling 📈** for visualization, written in Rust. ## Features ✨ -* **Fast**: written in rust with PyO3 bindings +- **Fast**: written in rust with PyO3 bindings - leverages optimized [argminmax](https://github.com/jvdd/argminmax) - which is SIMD accelerated with runtime feature detection - scales linearly with the number of data points @@ -25,13 +26,13 @@ Extremely fast **time series downsampling 📈** for visualization, written in R In Rust - which is a compiled language - there is no GIL, so CPU-bound tasks can be parallelized (with Rayon) with little to no overhead. -* **Efficient**: memory efficient +- **Efficient**: memory efficient - works on views of the data (no copies) - no intermediate data structures are created -* **Flexible**: works on any type of data - - supported datatypes are - - for `x`: `f32`, `f64`, `i16`, `i32`, `i64`, `u16`, `u32`, `u64`, `datetime64`, `timedelta64` - - for `y`: `f16`, `f32`, `f64`, `i8`, `i16`, `i32`, `i64`, `u8`, `u16`, `u32`, `u64`, `datetime64`, `timedelta64`, `bool` +- **Flexible**: works on any type of data + - supported datatypes are + - for `x`: `f32`, `f64`, `i16`, `i32`, `i64`, `u16`, `u32`, `u64`, `datetime64`, `timedelta64` + - for `y`: `f16`, `f32`, `f64`, `i8`, `i16`, `i32`, `i64`, `u8`, `u16`, `u32`, `u64`, `datetime64`, `timedelta64`, `bool`
!! 🚀 f16 argminmax is 200-300x faster than numpy In contrast with all other data types above, f16 is *not* hardware supported (i.e., no instructions for f16) by most modern CPUs!!
@@ -39,7 +40,7 @@ Extremely fast **time series downsampling 📈** for visualization, written in R 💡 As for argminmax, only comparisons are needed - and thus no arithmetic operations - creating a symmetrical ordinal mapping from f16 to i16 is sufficient. This mapping allows to use the hardware supported scalar and SIMD i16 instructions - while not producing any memory overhead 🎉
More details are described in argminmax PR #1.
-* **Easy to use**: simple & flexible API +- **Easy to use**: simple & flexible API ## Install @@ -83,35 +84,54 @@ downsample([x], y, n_out, **kwargs) -> ndarray[uint64] ``` **Arguments**: + - `x` is optional - `x` and `y` are both positional arguments - `n_out` is a mandatory keyword argument that defines the number of output values* - `**kwargs` are optional keyword arguments *(see [table below](#downsampling-algorithms-📈))*: - - `n_threads`: how many threads to use for multi-threading (default `1`, so no multi-threading) + - `parallel`: whether to use multi-threading (default: `False`) + ❗ The max number of threads can be configured with the `TSDOWNSAMPLE_MAX_THREADS` ENV var (e.g. `os.environ["TSDOWNSAMPLE_MAX_THREADS"] = "4"`) - ... **Returns**: a `ndarray[uint64]` of indices that can be used to index the original data. -*When there are gaps in the time series, fewer than `n_out` indices may be returned. +\*When there are gaps in the time series, fewer than `n_out` indices may be returned. + ### Downsampling algorithms 📈 The following downsampling algorithms (classes) are implemented: | Downsampler | Description | `**kwargs` | | ---:| --- |--- | -| `MinMaxDownsampler` | selects the **min and max** value in each bin | `n_threads` | -| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `n_threads` | -| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `n_threads` | -| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `n_threads`, `minmax_ratio`* | +| `MinMaxDownsampler` | selects the **min and max** value in each bin | `parallel` | +| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `parallel` | +| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `parallel` | +| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `parallel`, `minmax_ratio`* | + +*Default value for `minmax_ratio` is 4, which is empirically proven to be a good default. More details here: https://arxiv.org/abs/2305.00332 -*Default value for `minmax_ratio` is 30, which is empirically proven to be a good default. (More details in our upcomming paper) +### Handling NaNs +This library supports two `NaN`-policies: + +1. Omit `NaN`s (`NaN`s are ignored during downsampling). +2. Return index of first `NaN` once there is at least one present in the bin of the considered data. + +| Omit `NaN`s | Return `NaN`s | +| ----------------------: | :------------------------- | +| `MinMaxDownsampler` | `NaNMinMaxDownsampler` | +| `M4Downsampler` | `NaNM4Downsampler` | +| `MinMaxLTTBDownsampler` | `NaNMinMaxLTTBDownsampler` | +| `LTTBDownsampler` | | + +> Note that NaNs are not supported for `x`-data. ## Limitations & assumptions 🚨 Assumes; + 1. `x`-data is (non-strictly) monotonic increasing (i.e., sorted) -2. no `NaNs` in the data +2. no `NaN`s in `x`-data --- diff --git a/downsample_rs/Cargo.toml b/downsample_rs/Cargo.toml index 169e8e9..50b2185 100644 --- a/downsample_rs/Cargo.toml +++ b/downsample_rs/Cargo.toml @@ -11,16 +11,16 @@ license = "MIT" argminmax = { version = "0.6.1", features = ["half"] } # For some reason we need to explicitely add chrono, otherwise polars refuses to compile? chrono = "0.4.31" -# argminmax = { path = "../../argminmax" , features = ["half", "ndarray"] } -half = { version = "2.1", default-features = false , features=["num-traits"], optional = true} -num-traits = { version = "0.2.15", default-features = false } polars = { version = "0.33.2", features = ["lazy", "streaming", "zip_with"] } -rayon = { version = "1.6.0", default-features = false } +half = { version = "2.3.1", default-features = false , features=["num-traits"], optional = true} +num-traits = { version = "0.2.17", default-features = false } +once_cell = "1" +rayon = { version = "1.8.0", default-features = false } [dev-dependencies] -rstest = { version = "0.18.1", default-features = false } +rstest = { version = "0.18.2", default-features = false } rstest_reuse = { version = "0.6", default-features = false } -criterion = "0.4.0" +criterion = "0.5.1" dev_utils = { path = "dev_utils" } [[bench]] diff --git a/downsample_rs/benches/bench_m4.rs b/downsample_rs/benches/bench_m4.rs index c40df6a..2aa1732 100644 --- a/downsample_rs/benches/bench_m4.rs +++ b/downsample_rs/benches/bench_m4.rs @@ -14,15 +14,8 @@ fn m4_f32_random_array_long_single_core(c: &mut Criterion) { fn m4_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("m4_p_f32", |b| { - b.iter(|| { - m4_mod::m4_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) - }) + b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000))) }); } @@ -48,15 +41,8 @@ fn m4_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); let x = (0..n).map(|i| i as i32).collect::>(); - let all_threads: usize = utils::get_all_threads(); c.bench_function("m4_p_50M_f32", |b| { - b.iter(|| { - m4_mod::m4_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) - }) + b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000))) }); c.bench_function("m4_x_p_50M_f32", |b| { b.iter(|| { @@ -64,7 +50,6 @@ fn m4_f32_random_array_50M_multi_core(c: &mut Criterion) { black_box(x.as_slice()), black_box(data.as_slice()), black_box(2_000), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/benches/bench_minmax.rs b/downsample_rs/benches/bench_minmax.rs index 599cdbd..97bf677 100644 --- a/downsample_rs/benches/bench_minmax.rs +++ b/downsample_rs/benches/bench_minmax.rs @@ -14,14 +14,9 @@ fn minmax_f32_random_array_long_single_core(c: &mut Criterion) { fn minmax_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("minmax_p_f32", |b| { b.iter(|| { - minmax_mod::min_max_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) + minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000)) }) }); } @@ -55,14 +50,9 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) { let n = 50_000_000; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); let x = (0..n).map(|i| i as i32).collect::>(); - let all_threads: usize = utils::get_all_threads(); c.bench_function("minmax_p_50M_f32", |b| { b.iter(|| { - minmax_mod::min_max_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) + minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000)) }) }); c.bench_function("minmax_x_p_50M_f32", |b| { @@ -71,7 +61,6 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) { black_box(x.as_slice()), black_box(data.as_slice()), black_box(2_000), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/benches/bench_minmaxlttb.rs b/downsample_rs/benches/bench_minmaxlttb.rs index a0241de..bb78e09 100644 --- a/downsample_rs/benches/bench_minmaxlttb.rs +++ b/downsample_rs/benches/bench_minmaxlttb.rs @@ -25,7 +25,6 @@ fn minmaxlttb_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let x = (0..n).map(|i| i as i32).collect::>(); let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_x_p_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_with_x_parallel( @@ -33,7 +32,6 @@ fn minmaxlttb_f32_random_array_long_multi_core(c: &mut Criterion) { black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); @@ -59,7 +57,6 @@ fn minmaxlttb_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let x = (0..n).map(|i| i as i32).collect::>(); let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_x_p_50M_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_with_x_parallel( @@ -67,7 +64,6 @@ fn minmaxlttb_f32_random_array_50M_multi_core(c: &mut Criterion) { black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); @@ -90,14 +86,12 @@ fn minmaxlttb_without_x_f32_random_array_50M_single_core(c: &mut Criterion) { fn minmaxlttb_without_x_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_p_50M_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_without_x_parallel( black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/dev_utils/src/utils.rs b/downsample_rs/dev_utils/src/utils.rs index f3413ae..975fa65 100644 --- a/downsample_rs/dev_utils/src/utils.rs +++ b/downsample_rs/dev_utils/src/utils.rs @@ -33,11 +33,3 @@ where } arr } - -// ------------- Multi-threading ------------- - -use std::thread::available_parallelism; - -pub fn get_all_threads() -> usize { - available_parallelism().map(|x| x.get()).unwrap_or(1) -} diff --git a/downsample_rs/src/lib.rs b/downsample_rs/src/lib.rs index f678a45..409294d 100644 --- a/downsample_rs/src/lib.rs +++ b/downsample_rs/src/lib.rs @@ -14,3 +14,22 @@ pub use m4::*; pub(crate) mod helpers; pub(crate) mod searchsorted; pub(crate) mod types; + +use once_cell::sync::Lazy; +use rayon::{ThreadPool, ThreadPoolBuilder}; + +// Inspired by: https://github.com/pola-rs/polars/blob/9a69062aa0beb2a1bc5d57294cac49961fc91058/crates/polars-core/src/lib.rs#L49 +pub static POOL: Lazy = Lazy::new(|| { + ThreadPoolBuilder::new() + .num_threads( + std::env::var("TSDOWNSAMPLE_MAX_THREADS") + .map(|s| s.parse::().expect("integer")) + .unwrap_or_else(|_| { + std::thread::available_parallelism() + .unwrap_or(std::num::NonZeroUsize::new(1).unwrap()) + .get() + }), + ) + .build() + .expect("could not spawn threads") +}); diff --git a/downsample_rs/src/m4.rs b/downsample_rs/src/m4.rs index 7f4e582..bbd93be 100644 --- a/downsample_rs/src/m4.rs +++ b/downsample_rs/src/m4.rs @@ -1,12 +1,13 @@ -use argminmax::ArgMinMax; +use argminmax::{ArgMinMax, NaNArgMinMax}; use num_traits::{AsPrimitive, FromPrimitive}; use rayon::iter::IndexedParallelIterator; use rayon::prelude::*; -use crate::searchsorted::{ +use super::searchsorted::{ get_equidistant_bin_idx_iterator, get_equidistant_bin_idx_iterator_parallel, }; -use crate::types::Num; +use super::types::Num; +use super::POOL; // ----------------------------------- NON-PARALLEL ------------------------------------ @@ -60,70 +61,89 @@ where // ----------- WITH X -pub fn m4_with_x(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, - Tx: Num + FromPrimitive + AsPrimitive, - Ty: Copy + PartialOrd, -{ - assert_eq!(n_out % 4, 0); - let bin_idx_iterator = get_equidistant_bin_idx_iterator(x, n_out / 4); - m4_generic_with_x::(arr, bin_idx_iterator, n_out) +macro_rules! m4_with_x { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + FromPrimitive + AsPrimitive, + Ty: Copy + PartialOrd, + { + assert_eq!(n_out % 4, 0); + let bin_idx_iterator = get_equidistant_bin_idx_iterator(x, n_out / 4); + m4_generic_with_x(arr, bin_idx_iterator, n_out, $f_argminmax) + } + }; } +m4_with_x!(m4_with_x, ArgMinMax, |arr| arr.argminmax()); +m4_with_x!(m4_with_x_nan, NaNArgMinMax, |arr| arr.nanargminmax()); + // ----------- WITHOUT X -pub fn m4_without_x(arr: &[T], n_out: usize) -> Vec -where - for<'a> &'a [T]: ArgMinMax, -{ - assert_eq!(n_out % 4, 0); - m4_generic(arr, n_out) +macro_rules! m4_without_x { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(arr: &[T], n_out: usize) -> Vec + where + for<'a> &'a [T]: $trait, + { + assert_eq!(n_out % 4, 0); + m4_generic(arr, n_out, $f_argminmax) + } + }; } +m4_without_x!(m4_without_x, ArgMinMax, |arr| arr.argminmax()); +m4_without_x!(m4_without_x_nan, NaNArgMinMax, |arr| arr.nanargminmax()); + // ------------------------------------- PARALLEL -------------------------------------- // ----------- WITH X -pub fn m4_with_x_parallel( - x: &[Tx], - arr: &[Ty], - n_out: usize, - n_threads: usize, -) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, - Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, - Ty: Copy + PartialOrd + Send + Sync, -{ - assert_eq!(n_out % 4, 0); - let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 4, n_threads); - m4_generic_with_x_parallel(arr, bin_idx_iterator, n_out, n_threads, |arr| { - arr.argminmax() - }) +macro_rules! m4_with_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, + Ty: Copy + PartialOrd + Send + Sync, + { + assert_eq!(n_out % 4, 0); + let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 4); + m4_generic_with_x_parallel(arr, bin_idx_iterator, n_out, $f_argminmax) + } + }; } +m4_with_x_parallel!(m4_with_x_parallel, ArgMinMax, |arr| arr.argminmax()); +m4_with_x_parallel!(m4_with_x_parallel_nan, NaNArgMinMax, |arr| arr + .nanargminmax()); + // ----------- WITHOUT X -pub fn m4_without_x_parallel( - arr: &[T], - n_out: usize, - n_threads: usize, -) -> Vec -where - for<'a> &'a [T]: ArgMinMax, -{ - assert_eq!(n_out % 4, 0); - m4_generic_parallel(arr, n_out, n_threads, |arr| arr.argminmax()) +macro_rules! m4_without_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(arr: &[T], n_out: usize) -> Vec + where + for<'a> &'a [T]: $trait, + { + assert_eq!(n_out % 4, 0); + m4_generic_parallel(arr, n_out, $f_argminmax) + } + }; } +m4_without_x_parallel!(m4_without_x_parallel, ArgMinMax, |arr| arr.argminmax()); +m4_without_x_parallel!(m4_without_x_parallel_nan, NaNArgMinMax, |arr| arr + .nanargminmax()); + // TODO: check for duplicate data in the output array // -> In the current implementation we always add 4 datapoints per bin (if of // course the bin has >= 4 datapoints). However, the argmin and argmax might // be the start and end of the bin, which would result in duplicate data in // the output array. (this is for example the case for monotonic data). -// ----------------- GENERICS +// ----------------------------------- GENERICS ------------------------------------ // --------------------- WITHOUT X @@ -170,7 +190,6 @@ pub(crate) fn m4_generic(arr: &T, n_out: usize) -> Vec( arr: &[T], n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 4 @@ -182,39 +201,34 @@ pub(crate) fn m4_generic_parallel( let block_size: f64 = (arr.len() - 1) as f64 / (n_out / 4) as f64; // Store the enumerated indexes in the output array + // These indexes are used to calculate the start and end indexes of each bin in + // the multi-threaded execution let mut sampled_indices: Vec = (0..n_out).collect::>(); - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let func = || { - for chunk in sampled_indices.chunks_exact_mut(4) { - let i: f64 = unsafe { *chunk.get_unchecked(0) >> 2 } as f64; - let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; - let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - - let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - - chunk[0] = start_idx; - // Add the indexes in sorted order - if min_index < max_index { - chunk[1] = min_index + start_idx; - chunk[2] = max_index + start_idx; - } else { - chunk[1] = max_index + start_idx; - chunk[2] = min_index + start_idx; - } - chunk[3] = end_idx - 1; - } - }; + POOL.install(|| { + sampled_indices + .par_chunks_exact_mut(4) + .for_each(|sampled_index_chunk| { + let i: f64 = unsafe { *sampled_index_chunk.get_unchecked(0) >> 2 } as f64; + let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; + let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; + + let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - pool.unwrap().install(func); // allow panic if pool could not be created + sampled_index_chunk[0] = start_idx; + // Add the indexes in sorted order + if min_index < max_index { + sampled_index_chunk[1] = min_index + start_idx; + sampled_index_chunk[2] = max_index + start_idx; + } else { + sampled_index_chunk[1] = max_index + start_idx; + sampled_index_chunk[2] = min_index + start_idx; + } + sampled_index_chunk[3] = end_idx - 1; + }) + }); - sampled_indices.to_vec() + sampled_indices } // --------------------- WITH X @@ -268,7 +282,6 @@ pub(crate) fn m4_generic_with_x_parallel( arr: &[T], bin_idx_iterator: impl IndexedParallelIterator>>, n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 4 @@ -276,14 +289,7 @@ pub(crate) fn m4_generic_with_x_parallel( return (0..arr.len()).collect::>(); } - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let iter_func = || { + POOL.install(|| { bin_idx_iterator .flat_map(|bin_idx_iterator| { bin_idx_iterator @@ -319,10 +325,7 @@ pub(crate) fn m4_generic_with_x_parallel( }) .flatten() .collect::>() - }; - - let result = pool.unwrap().install(iter_func); // allow panic if pool could not be created - result + }) } #[cfg(test)] @@ -340,14 +343,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(196)] + #[case(200)] + #[case(204)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_m4_scalar_without_x_correct() { @@ -369,11 +371,11 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_m4_scalar_without_x_parallel_correct(n_threads: usize) { + #[test] + fn test_m4_scalar_without_x_parallel_correct() { let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_without_x_parallel(&arr, 12, n_threads); + let sampled_indices = m4_without_x_parallel(&arr, 12); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -410,12 +412,12 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_m4_scalar_with_x_parallel_correct(n_threads: usize) { + #[test] + fn test_m4_scalar_with_x_parallel_correct() { let x: [i32; 100] = core::array::from_fn(|i| i.as_()); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_with_x_parallel(&x, &arr, 12, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 12); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -454,14 +456,14 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_m4_scalar_with_x_gap_parallel(n_threads: usize) { + #[test] + fn test_m4_scalar_with_x_gap_parallel() { // We will create a gap in the middle of the array // Increment the second half of the array by 50 let x: [i32; 100] = core::array::from_fn(|i| if i > 50 { (i + 50).as_() } else { i.as_() }); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_with_x_parallel(&x, &arr, 20, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 20); assert_eq!(sampled_indices.len(), 16); // One full gap let expected_indices = vec![0, 0, 29, 29, 30, 30, 50, 50, 51, 51, 69, 69, 70, 70, 99, 99]; assert_eq!(sampled_indices, expected_indices); @@ -469,7 +471,7 @@ mod tests { // Increment the second half of the array by 50 again let x = x.map(|x| if x > 101 { x + 50 } else { x }); - let sampled_indices = m4_with_x_parallel(&x, &arr, 20, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 20); assert_eq!(sampled_indices.len(), 17); // Gap with 1 value let expected_indices = vec![ 0, 0, 39, 39, 40, 40, 50, 50, 51, 52, 52, 59, 59, 60, 60, 99, 99, @@ -477,20 +479,19 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_many_random_runs_correct(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_correct(n_out: usize) { const N: usize = 20_003; - const N_OUT: usize = 204; let x: [i32; N] = core::array::from_fn(|i| i.as_()); for _ in 0..100 { let arr = get_array_f32(N); - let idxs1 = m4_without_x(arr.as_slice(), N_OUT); - let idxs2 = m4_with_x(&x, arr.as_slice(), N_OUT); + let idxs1 = m4_without_x(arr.as_slice(), n_out); + let idxs2 = m4_with_x(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs2); - let idxs3 = m4_without_x_parallel(arr.as_slice(), N_OUT, n_threads); - let idxs4 = m4_with_x_parallel(&x, arr.as_slice(), N_OUT, n_threads); + let idxs3 = m4_without_x_parallel(arr.as_slice(), n_out); + let idxs4 = m4_with_x_parallel(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs3); - assert_eq!(idxs1, idxs4); // TODO: this should not fail when n_threads = 16 + assert_eq!(idxs1, idxs4); // TODO: this fails when nb. of threads = 16 } } } diff --git a/downsample_rs/src/minmax.rs b/downsample_rs/src/minmax.rs index 6858164..12553d2 100644 --- a/downsample_rs/src/minmax.rs +++ b/downsample_rs/src/minmax.rs @@ -1,76 +1,98 @@ use rayon::iter::IndexedParallelIterator; use rayon::prelude::*; -use argminmax::ArgMinMax; +use argminmax::{ArgMinMax, NaNArgMinMax}; use num_traits::{AsPrimitive, FromPrimitive}; use super::searchsorted::{ get_equidistant_bin_idx_iterator, get_equidistant_bin_idx_iterator_parallel, }; use super::types::Num; +use super::POOL; // ----------------------------------- NON-PARALLEL ------------------------------------ // ----------- WITH X -pub fn min_max_with_x(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, - Tx: Num + FromPrimitive + AsPrimitive, - Ty: Copy + PartialOrd, -{ - assert_eq!(n_out % 2, 0); - let bin_idx_iterator = get_equidistant_bin_idx_iterator(x, n_out / 2); - min_max_generic_with_x(arr, bin_idx_iterator, n_out, |arr| arr.argminmax()) +macro_rules! min_max_with_x { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + FromPrimitive + AsPrimitive, + Ty: Copy + PartialOrd, + { + assert_eq!(n_out % 2, 0); + let bin_idx_iterator = get_equidistant_bin_idx_iterator(x, n_out / 2); + min_max_generic_with_x(arr, bin_idx_iterator, n_out, $f_argminmax) + } + }; } +min_max_with_x!(min_max_with_x, ArgMinMax, |arr| arr.argminmax()); +min_max_with_x!(min_max_with_x_nan, NaNArgMinMax, |arr| arr.nanargminmax()); + // ----------- WITHOUT X -pub fn min_max_without_x(arr: &[T], n_out: usize) -> Vec -where - for<'a> &'a [T]: ArgMinMax, -{ - assert_eq!(n_out % 2, 0); - min_max_generic(arr, n_out, |arr| arr.argminmax()) +macro_rules! min_max_without_x { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(arr: &[T], n_out: usize) -> Vec + where + for<'a> &'a [T]: $trait, + { + assert_eq!(n_out % 2, 0); + min_max_generic(arr, n_out, $f_argminmax) + } + }; } +min_max_without_x!(min_max_without_x, ArgMinMax, |arr| arr.argminmax()); +min_max_without_x!(min_max_without_x_nan, NaNArgMinMax, |arr| arr + .nanargminmax()); + // ------------------------------------- PARALLEL -------------------------------------- // ----------- WITH X -pub fn min_max_with_x_parallel( - x: &[Tx], - arr: &[Ty], - n_out: usize, - n_threads: usize, -) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, - Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, - Ty: Copy + PartialOrd + Send + Sync, -{ - assert_eq!(n_out % 2, 0); - let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 2, n_threads); - min_max_generic_with_x_parallel(arr, bin_idx_iterator, n_out, n_threads, |arr| { - arr.argminmax() - }) +macro_rules! min_max_with_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, + Ty: Copy + PartialOrd + Send + Sync, + { + assert_eq!(n_out % 2, 0); + let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 2); + min_max_generic_with_x_parallel(arr, bin_idx_iterator, n_out, $f_argminmax) + } + }; } +min_max_with_x_parallel!(min_max_with_x_parallel, ArgMinMax, |arr| arr.argminmax()); +min_max_with_x_parallel!(min_max_with_x_parallel_nan, NaNArgMinMax, |arr| arr + .nanargminmax()); + // ----------- WITHOUT X -pub fn min_max_without_x_parallel( - arr: &[T], - n_out: usize, - n_threads: usize, -) -> Vec -where - for<'a> &'a [T]: ArgMinMax, -{ - assert_eq!(n_out % 2, 0); - min_max_generic_parallel(arr, n_out, n_threads, |arr| arr.argminmax()) +macro_rules! min_max_without_x_parallel { + ($func_name:ident, $trait:path, $f_argminmax:expr) => { + pub fn $func_name(arr: &[T], n_out: usize) -> Vec + where + for<'a> &'a [T]: $trait, + { + assert_eq!(n_out % 2, 0); + min_max_generic_parallel(arr, n_out, $f_argminmax) + } + }; } -// ----------------- GENERICS -// + +min_max_without_x_parallel!(min_max_without_x_parallel, ArgMinMax, |arr| arr.argminmax()); +min_max_without_x_parallel!(min_max_without_x_parallel_nan, NaNArgMinMax, |arr| arr + .nanargminmax()); + +// ----------------------------------- GENERICS ------------------------------------ + // --------------------- WITHOUT X #[inline(always)] @@ -117,7 +139,6 @@ pub(crate) fn min_max_generic( pub(crate) fn min_max_generic_parallel( arr: &[T], n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 2 @@ -129,38 +150,32 @@ pub(crate) fn min_max_generic_parallel( let block_size: f64 = (arr.len() - 1) as f64 / (n_out / 2) as f64; // Store the enumerated indexes in the output array - // let mut sampled_indices: Array1 = Array1::from_vec((0..n_out).collect::>()); + // These indexes are used to calculate the start and end indexes of each bin in + // the multi-threaded execution let mut sampled_indices: Vec = (0..n_out).collect::>(); - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let func = || { - for chunk in sampled_indices.chunks_exact_mut(2) { - let i: f64 = unsafe { *chunk.get_unchecked(0) >> 1 } as f64; - let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; - let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - - let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - - // Add the indexes in sorted order - if min_index < max_index { - chunk[0] = min_index + start_idx; - chunk[1] = max_index + start_idx; - } else { - chunk[0] = max_index + start_idx; - chunk[1] = min_index + start_idx; - } - } - }; + POOL.install(|| { + sampled_indices + .par_chunks_exact_mut(2) + .for_each(|sampled_index_chunk| { + let i: f64 = unsafe { *sampled_index_chunk.get_unchecked(0) >> 1 } as f64; + let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; + let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - pool.unwrap().install(func); // allow panic if pool could not be created + let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); + + // Add the indexes in sorted order + if min_index < max_index { + sampled_index_chunk[0] = min_index + start_idx; + sampled_index_chunk[1] = max_index + start_idx; + } else { + sampled_index_chunk[0] = max_index + start_idx; + sampled_index_chunk[1] = min_index + start_idx; + } + }) + }); - sampled_indices.to_vec() + sampled_indices } // --------------------- WITH X @@ -211,7 +226,6 @@ pub(crate) fn min_max_generic_with_x_parallel( arr: &[T], bin_idx_iterator: impl IndexedParallelIterator>>, n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 2 @@ -219,14 +233,7 @@ pub(crate) fn min_max_generic_with_x_parallel( return (0..arr.len()).collect::>(); } - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let iter_func = || { + POOL.install(|| { bin_idx_iterator .flat_map(|bin_idx_iterator| { bin_idx_iterator @@ -258,9 +265,7 @@ pub(crate) fn min_max_generic_with_x_parallel( }) .flatten() .collect::>() - }; - - pool.unwrap().install(iter_func) // allow panic if pool could not be created + }) } #[cfg(test)] @@ -278,14 +283,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(198)] + #[case(200)] + #[case(202)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_min_max_scalar_without_x_correct() { @@ -307,11 +311,11 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_min_max_scalar_without_x_parallel_correct(n_threads: usize) { + #[test] + fn test_min_max_scalar_without_x_parallel_correct() { let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_without_x_parallel(&arr, 10, n_threads); + let sampled_indices = min_max_without_x_parallel(&arr, 10); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -348,12 +352,12 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_min_max_scalar_with_x_parallel_correct(n_threads: usize) { + #[test] + fn test_min_max_scalar_with_x_parallel_correct() { let x: [i32; 100] = core::array::from_fn(|i| i.as_()); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -390,14 +394,14 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_min_max_scalar_with_x_parallel_gap(n_threads: usize) { + #[test] + fn test_min_max_scalar_with_x_parallel_gap() { // Create a gap in the middle of the array // Increment the second half of the array by 50 let x: [i32; 100] = core::array::from_fn(|i| if i > 50 { (i + 50).as_() } else { i.as_() }); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); assert_eq!(sampled_indices.len(), 8); // One full gap let expected_indices = vec![0, 29, 30, 50, 51, 69, 70, 99]; assert_eq!(sampled_indices, expected_indices); @@ -405,24 +409,23 @@ mod tests { // Increment the second half of the array by 50 again let x = x.map(|i| if i > 101 { i + 50 } else { i }); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); assert_eq!(sampled_indices.len(), 9); // Gap with 1 value let expected_indices = vec![0, 39, 40, 50, 51, 52, 59, 60, 99]; assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_many_random_runs_same_output(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_same_output(n_out: usize) { const N: usize = 20_003; - const N_OUT: usize = 202; let x: [i32; N] = core::array::from_fn(|i| i.as_()); for _ in 0..100 { let mut arr = get_array_f32(N); arr[N - 1] = f32::INFINITY; // Make sure the last value is always the max - let idxs1 = min_max_without_x(arr.as_slice(), N_OUT); - let idxs2 = min_max_without_x_parallel(arr.as_slice(), N_OUT, n_threads); - let idxs3 = min_max_with_x(&x, arr.as_slice(), N_OUT); - let idxs4 = min_max_with_x_parallel(&x, arr.as_slice(), N_OUT, n_threads); + let idxs1 = min_max_without_x(arr.as_slice(), n_out); + let idxs2 = min_max_without_x_parallel(arr.as_slice(), n_out); + let idxs3 = min_max_with_x(&x, arr.as_slice(), n_out); + let idxs4 = min_max_with_x_parallel(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs2); assert_eq!(idxs1, idxs3); assert_eq!(idxs1, idxs4); diff --git a/downsample_rs/src/minmaxlttb.rs b/downsample_rs/src/minmaxlttb.rs index 9a0b708..60cc796 100644 --- a/downsample_rs/src/minmaxlttb.rs +++ b/downsample_rs/src/minmaxlttb.rs @@ -1,4 +1,4 @@ -use argminmax::ArgMinMax; +use argminmax::{ArgMinMax, NaNArgMinMax}; use super::lttb::{lttb_with_x, lttb_without_x}; use super::types::Num; @@ -10,106 +10,117 @@ use num_traits::{AsPrimitive, FromPrimitive}; // ----------- WITH X -pub fn minmaxlttb_with_x + FromPrimitive, Ty: Num + AsPrimitive>( - x: &[Tx], - y: &[Ty], - n_out: usize, - minmax_ratio: usize, -) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, -{ - minmaxlttb_generic( - x, - y, - n_out, - minmax_ratio, - None, - MinMaxFunctionWithX::Serial(minmax::min_max_with_x), - ) +macro_rules! minmaxlttb_with_x { + ($func_name:ident, $trait:ident, $f_minmax:expr) => { + pub fn $func_name( + x: &[Tx], + y: &[Ty], + n_out: usize, + minmax_ratio: usize, + ) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + AsPrimitive + FromPrimitive, + Ty: Num + AsPrimitive, + { + minmaxlttb_generic(x, y, n_out, minmax_ratio, $f_minmax) + } + }; } +minmaxlttb_with_x!(minmaxlttb_with_x, ArgMinMax, minmax::min_max_with_x); +minmaxlttb_with_x!( + minmaxlttb_with_x_nan, + NaNArgMinMax, + minmax::min_max_with_x_nan +); + // ----------- WITHOUT X -pub fn minmaxlttb_without_x>( - y: &[Ty], - n_out: usize, - minmax_ratio: usize, -) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, -{ - minmaxlttb_generic_without_x( - y, - n_out, - minmax_ratio, - None, - MinMaxFunctionWithoutX::Serial(minmax::min_max_without_x), - ) +macro_rules! minmaxlttb_without_x { + ($func_name:ident, $trait:ident, $f_minmax:expr) => { + pub fn $func_name>( + y: &[Ty], + n_out: usize, + minmax_ratio: usize, + ) -> Vec + where + for<'a> &'a [Ty]: $trait, + { + minmaxlttb_generic_without_x(y, n_out, minmax_ratio, $f_minmax) + } + }; } +minmaxlttb_without_x!(minmaxlttb_without_x, ArgMinMax, minmax::min_max_without_x); +minmaxlttb_without_x!( + minmaxlttb_without_x_nan, + NaNArgMinMax, + minmax::min_max_without_x_nan +); + // ------------------------------------- PARALLEL -------------------------------------- // ----------- WITH X -pub fn minmaxlttb_with_x_parallel< - Tx: Num + AsPrimitive + FromPrimitive + Send + Sync, - Ty: Num + AsPrimitive + Send + Sync, ->( - x: &[Tx], - y: &[Ty], - n_out: usize, - minmax_ratio: usize, - n_threads: usize, -) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, -{ - minmaxlttb_generic( - x, - y, - n_out, - minmax_ratio, - Some(n_threads), - MinMaxFunctionWithX::Parallel(minmax::min_max_with_x_parallel), - ) +macro_rules! minmaxlttb_with_x_parallel { + ($func_name:ident, $trait:ident, $f_minmax:expr) => { + pub fn $func_name( + x: &[Tx], + y: &[Ty], + n_out: usize, + minmax_ratio: usize, + ) -> Vec + where + for<'a> &'a [Ty]: $trait, + Tx: Num + AsPrimitive + FromPrimitive + Send + Sync, + Ty: Num + AsPrimitive + Send + Sync, + { + minmaxlttb_generic(x, y, n_out, minmax_ratio, $f_minmax) + } + }; } +minmaxlttb_with_x_parallel!( + minmaxlttb_with_x_parallel, + ArgMinMax, + minmax::min_max_with_x_parallel +); +minmaxlttb_with_x_parallel!( + minmaxlttb_with_x_parallel_nan, + NaNArgMinMax, + minmax::min_max_with_x_parallel_nan +); + // ----------- WITHOUT X -pub fn minmaxlttb_without_x_parallel + Send + Sync>( - y: &[Ty], - n_out: usize, - minmax_ratio: usize, - n_threads: usize, -) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, -{ - minmaxlttb_generic_without_x( - y, - n_out, - minmax_ratio, - Some(n_threads), - MinMaxFunctionWithoutX::Parallel(minmax::min_max_without_x_parallel), - ) +macro_rules! minmaxlttb_without_x_parallel { + ($func_name:ident, $trait:ident, $f_minmax:expr) => { + pub fn $func_name + Send + Sync>( + y: &[Ty], + n_out: usize, + minmax_ratio: usize, + ) -> Vec + where + for<'a> &'a [Ty]: $trait, + { + minmaxlttb_generic_without_x(y, n_out, minmax_ratio, $f_minmax) + } + }; } -// ----------------------------------- GENERICS ------------------------------------ - -// types to make function signatures easier to read -type ThreadCount = usize; -type OutputCount = usize; - -pub enum MinMaxFunctionWithX, Ty: Num + AsPrimitive> { - Serial(fn(&[Tx], &[Ty], OutputCount) -> Vec), - Parallel(fn(&[Tx], &[Ty], OutputCount, ThreadCount) -> Vec), -} +minmaxlttb_without_x_parallel!( + minmaxlttb_without_x_parallel, + ArgMinMax, + minmax::min_max_without_x_parallel +); +minmaxlttb_without_x_parallel!( + minmaxlttb_without_x_parallel_nan, + NaNArgMinMax, + minmax::min_max_without_x_parallel_nan +); -pub enum MinMaxFunctionWithoutX> { - Serial(fn(&[Ty], OutputCount) -> Vec), - Parallel(fn(&[Ty], OutputCount, ThreadCount) -> Vec), -} +// ----------------------------------- GENERICS ------------------------------------ #[inline(always)] pub(crate) fn minmaxlttb_generic, Ty: Num + AsPrimitive>( @@ -117,30 +128,18 @@ pub(crate) fn minmaxlttb_generic, Ty: Num + AsPrimiti y: &[Ty], n_out: usize, minmax_ratio: usize, - n_threads: Option, - f_minmax: MinMaxFunctionWithX, -) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, -{ + f_minmax: fn(&[Tx], &[Ty], usize) -> Vec, +) -> Vec { assert_eq!(x.len(), y.len()); assert!(minmax_ratio > 1); // Apply first min max aggregation (if above ratio) if x.len() / n_out > minmax_ratio { // Get index of min max points - let mut index = match f_minmax { - MinMaxFunctionWithX::Serial(func) => func( - &x[1..(x.len() - 1)], - &y[1..(x.len() - 1)], - n_out * minmax_ratio, - ), - MinMaxFunctionWithX::Parallel(func) => func( - &x[1..(x.len() - 1)], - &y[1..(x.len() - 1)], - n_out * minmax_ratio, - n_threads.unwrap(), // n_threads cannot be None - ), - }; + let mut index = f_minmax( + &x[1..(x.len() - 1)], + &y[1..(x.len() - 1)], + n_out * minmax_ratio, + ); // inplace + 1 index.iter_mut().for_each(|elem| *elem += 1); // Prepend first and last point @@ -176,26 +175,13 @@ pub(crate) fn minmaxlttb_generic_without_x>( y: &[Ty], n_out: usize, minmax_ratio: usize, - n_threads: Option, - f_minmax: MinMaxFunctionWithoutX, -) -> Vec -where - for<'a> &'a [Ty]: ArgMinMax, -{ + f_minmax: fn(&[Ty], usize) -> Vec, +) -> Vec { assert!(minmax_ratio > 1); // Apply first min max aggregation (if above ratio) if y.len() / n_out > minmax_ratio { // Get index of min max points - let mut index = match f_minmax { - MinMaxFunctionWithoutX::Serial(func) => { - func(&y[1..(y.len() - 1)], n_out * minmax_ratio) - } - MinMaxFunctionWithoutX::Parallel(func) => func( - &y[1..(y.len() - 1)], - n_out * minmax_ratio, - n_threads.unwrap(), // n_threads cannot be None - ), - }; + let mut index = f_minmax(&y[1..(y.len() - 1)], n_out * minmax_ratio); // inplace + 1 index.iter_mut().for_each(|elem| *elem += 1); // Prepend first and last point @@ -208,8 +194,8 @@ where .map(|i| *y.get_unchecked(*i)) .collect::>() }; - // Apply lttb on the reduced data - let index_points_selected = lttb_without_x(y.as_slice(), n_out); + // Apply lttb on the reduced data (using the preselect data its index) + let index_points_selected = lttb_with_x(index.as_slice(), y.as_slice(), n_out); // Return the original index return index_points_selected .iter() @@ -234,14 +220,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(98)] + #[case(100)] + #[case(102)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_minmaxlttb_with_x() { @@ -258,32 +243,30 @@ mod tests { assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_minmaxlttb_with_x_parallel(n_threads: usize) { + #[test] + fn test_minmaxlttb_with_x_parallel() { let x = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; let y = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; - let sampled_indices = minmaxlttb_with_x_parallel(&x, &y, 4, 2, n_threads); + let sampled_indices = minmaxlttb_with_x_parallel(&x, &y, 4, 2); assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_minmaxlttb_without_x_parallel(n_threads: usize) { + #[test] + fn test_minmaxlttb_without_x_parallel() { let y = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; - let sampled_indices = minmaxlttb_without_x_parallel(&y, 4, 2, n_threads); + let sampled_indices = minmaxlttb_without_x_parallel(&y, 4, 2); assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_many_random_runs_same_output(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_same_output(n_out: usize) { const N: usize = 20_000; - const N_OUT: usize = 100; const MINMAX_RATIO: usize = 5; for _ in 0..100 { // TODO: test with x let arr = get_array_f32(N); - let idxs1 = minmaxlttb_without_x(arr.as_slice(), N_OUT, MINMAX_RATIO); - let idxs2 = - minmaxlttb_without_x_parallel(arr.as_slice(), N_OUT, MINMAX_RATIO, n_threads); + let idxs1 = minmaxlttb_without_x(arr.as_slice(), n_out, MINMAX_RATIO); + let idxs2 = minmaxlttb_without_x_parallel(arr.as_slice(), n_out, MINMAX_RATIO); assert_eq!(idxs1, idxs2); } } diff --git a/downsample_rs/src/searchsorted.rs b/downsample_rs/src/searchsorted.rs index 5a0d552..4c544e2 100644 --- a/downsample_rs/src/searchsorted.rs +++ b/downsample_rs/src/searchsorted.rs @@ -4,6 +4,7 @@ use rayon::prelude::*; use crate::types::{BinSearchParam, Indexable}; use super::types::Num; +use super::POOL; use num_traits::{AsPrimitive, FromPrimitive}; // ---------------------- Binary search ---------------------- @@ -176,7 +177,6 @@ fn sequential_add_mul(start_val: f64, add_val: f64, mul: usize) -> f64 { pub(crate) fn get_equidistant_bin_idx_iterator_parallel( arr: &[T], nb_bins: usize, - n_threads: usize, ) -> impl IndexedParallelIterator> + '_> + '_ where T: Num + FromPrimitive + AsPrimitive + Sync + Send, @@ -189,7 +189,7 @@ where (arr[arr.len() - 1].as_() / nb_bins as f64) - (arr[0].as_() / nb_bins as f64); let arr0: f64 = arr[0].as_(); // The first value of the array // 2. Compute the number of threads & bins per thread - let n_threads = std::cmp::min(n_threads, nb_bins); + let n_threads = std::cmp::min(POOL.current_num_threads(), nb_bins); let nb_bins_per_thread = nb_bins / n_threads; let nb_bins_last_thread = nb_bins - nb_bins_per_thread * (n_threads - 1); // 3. Iterate over the number of threads @@ -237,16 +237,15 @@ mod tests { use super::*; - use dev_utils::utils::{get_all_threads, get_random_array}; + use dev_utils::utils::get_random_array; - // Template for the n_threads matrix + // Template for nb_bins #[template] #[rstest] - #[case(1)] - #[case(get_all_threads() / 2)] - #[case(get_all_threads())] - #[case(get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(99)] + #[case(100)] + #[case(101)] + fn nb_bins(#[case] nb_bins: usize) {} #[test] fn test_search_sorted_identicial_to_np_linspace_searchsorted() { @@ -302,8 +301,8 @@ mod tests { // assert_eq!(binary_search_with_mid(&arr, 11, 0, arr.len() - 1, 9), 10); } - #[apply(threads)] - fn test_get_equidistant_bin_idxs(n_threads: usize) { + #[test] + fn test_get_equidistant_bin_idxs() { let expected_indices = vec![0, 4, 7]; let arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; @@ -312,7 +311,7 @@ mod tests { let bin_idxs = bin_idxs_iter.map(|x| x.unwrap().0).collect::>(); assert_eq!(bin_idxs, expected_indices); - let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr, 3, n_threads); + let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr, 3); let bin_idxs = bin_idxs_iter .map(|x| x.map(|x| x.unwrap().0).collect::>()) .flatten() @@ -320,10 +319,9 @@ mod tests { assert_eq!(bin_idxs, expected_indices); } - #[apply(threads)] - fn test_many_random_same_result(n_threads: usize) { + #[apply(nb_bins)] + fn test_many_random_same_result(nb_bins: usize) { let n = 5_000; - let nb_bins = 100; for _ in 0..100 { let mut arr = get_random_array::(n, i32::MIN, i32::MAX); @@ -335,8 +333,7 @@ mod tests { let bin_idxs = bin_idxs_iter.map(|x| x.unwrap().0).collect::>(); // Calculate the bin indexes in parallel - let bin_idxs_iter = - get_equidistant_bin_idx_iterator_parallel(&arr[..], nb_bins, n_threads); + let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr[..], nb_bins); let bin_idxs_parallel = bin_idxs_iter .map(|x| x.map(|x| x.unwrap().0).collect::>()) .flatten() diff --git a/pyproject.toml b/pyproject.toml index 8583728..074b23c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "tsdownsample" description = "Time series downsampling in rust" -version = "0.1.2" +version = "0.1.3" requires-python = ">=3.7" dependencies = ["numpy"] authors = [{name = "Jeroen Van Der Donckt"}] @@ -21,6 +21,7 @@ classifiers = [ 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', 'Operating System :: POSIX', 'Operating System :: MacOS :: MacOS X', 'Operating System :: Microsoft :: Windows' diff --git a/src/lib.rs b/src/lib.rs index 5a93afc..9348e01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,25 +32,6 @@ macro_rules! _create_pyfunc_without_x { }; } -macro_rules! _create_pyfunc_without_x_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - y: PyReadonlyArray1<$type>, - n_out: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(y, n_out, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfunc_without_x_with_ratio { ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { // Create the Python function @@ -70,26 +51,6 @@ macro_rules! _create_pyfunc_without_x_with_ratio { }; } -macro_rules! _create_pyfunc_without_x_with_ratio_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - y: PyReadonlyArray1<$type>, - n_out: usize, - ratio: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(y, n_out, ratio, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfuncs_without_x_generic { ($create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($t:ty)*) => { $( @@ -98,6 +59,14 @@ macro_rules! _create_pyfuncs_without_x_generic { } )* }; + + (@nan $create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($t:ty)*) => { + $( + paste! { + $create_macro!([], $resample_mod, $resample_fn, $t, $mod); + } + )* + }; } // With x-range @@ -122,27 +91,6 @@ macro_rules! _create_pyfunc_with_x { }; } -macro_rules! _create_pyfunc_with_x_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - x: PyReadonlyArray1<$type_x>, - y: PyReadonlyArray1<$type_y>, - n_out: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let x = x.as_slice().unwrap(); - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(x, y, n_out, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfunc_with_x_with_ratio { ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { // Create the Python function @@ -164,34 +112,7 @@ macro_rules! _create_pyfunc_with_x_with_ratio { }; } -macro_rules! _create_pyfunc_with_x_with_ratio_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - x: PyReadonlyArray1<$type_x>, - y: PyReadonlyArray1<$type_y>, - n_out: usize, - ratio: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let x = x.as_slice().unwrap(); - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(x, y, n_out, ratio, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfuncs_with_x_generic { - // ($create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($t:ty)+) => { - // // The macro will implement the function for all combinations of $t (for type x and y). - // // (duplicate the list of types to iterate over all combinations) - // _create_pyfuncs_with_x_generic!(@inner $create_macro, $resample_mod, $resample_fn, $mod, $($t)+; $($t),+); - // }; ($create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($tx:ty)+, $($ty:ty)+) => { // The macro will implement the function for all combinations of $tx and $ty (for respectively type x and y). @@ -216,16 +137,44 @@ macro_rules! _create_pyfuncs_with_x_generic { // and https://users.rust-lang.org/t/tail-recursive-macros/905/3 } +// TODO: there must be a better way to combine normal and nan macros +macro_rules! _create_nan_pyfuncs_with_x_generic { + + ($create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($tx:ty)+, $($ty:ty)+) => { + // The macro will implement the function for all combinations of $tx and $ty (for respectively type x and y). + _create_nan_pyfuncs_with_x_generic!(@inner $create_macro, $resample_mod, $resample_fn, $mod, $($tx)+; $($ty),+); + }; + + // Base case: there is only one type (for y) left + (@inner $create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($tx:ty)+; $ty:ty) => { + $( + paste! { + $create_macro!([], $resample_mod, $resample_fn, $tx, $ty, $mod); + } + )* + }; + // The head/tail recursion: pick the first element -> apply the base case, and recurse over the rest. + (@inner $create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($tx:ty)+; $ty_head:ty, $($ty_rest:ty),+) => { + _create_nan_pyfuncs_with_x_generic!(@inner $create_macro, $resample_mod, $resample_fn, $mod, $($tx)+; $ty_head); + _create_nan_pyfuncs_with_x_generic!(@inner $create_macro, $resample_mod, $resample_fn, $mod, $($tx)+; $($ty_rest),+); + }; + + // Huge thx to https://stackoverflow.com/a/54552848 + // and https://users.rust-lang.org/t/tail-recursive-macros/905/3 +} // ------ Main macros ------ macro_rules! _create_pyfuncs_without_x_helper { ($pyfunc_fn:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident) => { _create_pyfuncs_without_x_generic!($pyfunc_fn, $resample_mod, $resample_fn, $mod, f16 f32 f64 i8 i16 i32 i64 u8 u16 u32 u64); }; + + (@nan $pyfunc_fn:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident) => { + _create_pyfuncs_without_x_generic!(@nan $pyfunc_fn, $resample_mod, $resample_fn, $mod, f16 f32 f64); + }; } macro_rules! create_pyfuncs_without_x { - // Use @threaded to differentiate between the single and multithreaded versions ($resample_mod:ident, $resample_fn:ident, $mod:ident) => { _create_pyfuncs_without_x_helper!( _create_pyfunc_without_x, @@ -234,9 +183,9 @@ macro_rules! create_pyfuncs_without_x { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_without_x_helper!( - _create_pyfunc_without_x_multithreaded, + (@nan $resample_mod:ident, $resample_fn:ident, $mod:ident) => { + _create_pyfuncs_without_x_helper!(@nan + _create_pyfunc_without_x, $resample_mod, $resample_fn, $mod @@ -245,7 +194,6 @@ macro_rules! create_pyfuncs_without_x { } macro_rules! create_pyfuncs_without_x_with_ratio { - // Use @threaded to differentiate between the single and multithreaded versions ($resample_mod:ident, $resample_fn:ident, $mod:ident) => { _create_pyfuncs_without_x_helper!( _create_pyfunc_without_x_with_ratio, @@ -254,9 +202,9 @@ macro_rules! create_pyfuncs_without_x_with_ratio { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_without_x_helper!( - _create_pyfunc_without_x_with_ratio_multithreaded, + (@nan $resample_mod:ident, $resample_fn:ident, $mod:ident) => { + _create_pyfuncs_without_x_helper!(@nan + _create_pyfunc_without_x_with_ratio, $resample_mod, $resample_fn, $mod @@ -268,25 +216,21 @@ macro_rules! _create_pyfuncs_with_x_helper { ($pyfunc_fn:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident) => { _create_pyfuncs_with_x_generic!($pyfunc_fn, $resample_mod, $resample_fn, $mod, f32 f64 i16 i32 i64 u16 u32 u64, f16 f32 f64 i8 i16 i32 i64 u8 u16 u32 u64); }; + (@nan $pyfunc_fn:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident) => { + _create_nan_pyfuncs_with_x_generic!($pyfunc_fn, $resample_mod, $resample_fn, $mod, f32 f64 i16 i32 i64 u16 u32 u64, f16 f32 f64); + }; } macro_rules! create_pyfuncs_with_x { - // Use @threaded to differentiate between the single and multithreaded versions ($resample_mod:ident, $resample_fn:ident, $mod:ident) => { _create_pyfuncs_with_x_helper!(_create_pyfunc_with_x, $resample_mod, $resample_fn, $mod); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_with_x_helper!( - _create_pyfunc_with_x_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); + (@nan $resample_mod:ident, $resample_fn:ident, $mod:ident) => { + _create_pyfuncs_with_x_helper!(@nan _create_pyfunc_with_x, $resample_mod, $resample_fn, $mod); }; } macro_rules! create_pyfuncs_with_x_with_ratio { - // Use @threaded to differentiate between the single and multithreaded versions ($resample_mod:ident, $resample_fn:ident, $mod:ident) => { _create_pyfuncs_with_x_helper!( _create_pyfunc_with_x_with_ratio, @@ -295,9 +239,9 @@ macro_rules! create_pyfuncs_with_x_with_ratio { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_with_x_helper!( - _create_pyfunc_with_x_with_ratio_multithreaded, + (@nan $resample_mod:ident, $resample_fn:ident, $mod:ident) => { + _create_pyfuncs_with_x_helper!(@nan + _create_pyfunc_with_x_with_ratio, $resample_mod, $resample_fn, $mod @@ -319,11 +263,13 @@ fn minmax(_py: Python<'_>, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { create_pyfuncs_without_x!(minmax_mod, min_max_without_x, sequential_mod); + create_pyfuncs_without_x!(@nan minmax_mod, min_max_without_x_nan, sequential_mod); } // ----- WITH X { create_pyfuncs_with_x!(minmax_mod, min_max_with_x, sequential_mod); + create_pyfuncs_with_x!(@nan minmax_mod, min_max_with_x_nan, sequential_mod); } // ----------------- PARALLEL @@ -332,12 +278,14 @@ fn minmax(_py: Python<'_>, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x!(@threaded minmax_mod, min_max_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(minmax_mod, min_max_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(@nan minmax_mod, min_max_without_x_parallel, parallel_mod); } // ----- WITH X { - create_pyfuncs_with_x!(@threaded minmax_mod, min_max_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(minmax_mod, min_max_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(@nan minmax_mod, min_max_with_x_parallel, parallel_mod); } // Add the sub modules to the module @@ -361,11 +309,13 @@ fn m4(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { create_pyfuncs_without_x!(m4_mod, m4_without_x, sequential_mod); + create_pyfuncs_without_x!(@nan m4_mod, m4_without_x_nan, sequential_mod); } // ----- WITH X { create_pyfuncs_with_x!(m4_mod, m4_with_x, sequential_mod); + create_pyfuncs_with_x!(@nan m4_mod, m4_with_x_nan, sequential_mod); } // ----------------- PARALLEL @@ -374,12 +324,14 @@ fn m4(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x!(@threaded m4_mod, m4_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(m4_mod, m4_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(@nan m4_mod, m4_without_x_parallel, parallel_mod); } // ----- WITH X { - create_pyfuncs_with_x!(@threaded m4_mod, m4_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(m4_mod, m4_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(@nan m4_mod, m4_with_x_parallel, parallel_mod); } // Add the sub modules to the module @@ -431,11 +383,13 @@ fn minmaxlttb(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { create_pyfuncs_without_x_with_ratio!(minmaxlttb_mod, minmaxlttb_without_x, sequential_mod); + create_pyfuncs_without_x_with_ratio!(@nan minmaxlttb_mod, minmaxlttb_without_x_nan, sequential_mod); } // ----- WITH X { create_pyfuncs_with_x_with_ratio!(minmaxlttb_mod, minmaxlttb_with_x, sequential_mod); + create_pyfuncs_with_x_with_ratio!(@nan minmaxlttb_mod, minmaxlttb_with_x_nan, sequential_mod); } // ----------------- PARALLEL @@ -444,7 +398,12 @@ fn minmaxlttb(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x_with_ratio!(@threaded + create_pyfuncs_without_x_with_ratio!( + minmaxlttb_mod, + minmaxlttb_without_x_parallel, + parallel_mod + ); + create_pyfuncs_without_x_with_ratio!(@nan minmaxlttb_mod, minmaxlttb_without_x_parallel, parallel_mod @@ -453,11 +412,8 @@ fn minmaxlttb(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITH X { - create_pyfuncs_with_x_with_ratio!(@threaded - minmaxlttb_mod, - minmaxlttb_with_x_parallel, - parallel_mod - ); + create_pyfuncs_with_x_with_ratio!(minmaxlttb_mod, minmaxlttb_with_x_parallel, parallel_mod); + create_pyfuncs_with_x_with_ratio!(@nan minmaxlttb_mod, minmaxlttb_with_x_parallel, parallel_mod); } // Add the submodules to the module diff --git a/tests/benchmarks/test_downsamplers.py b/tests/benchmarks/test_downsamplers.py index 515ac1e..c7ca255 100644 --- a/tests/benchmarks/test_downsamplers.py +++ b/tests/benchmarks/test_downsamplers.py @@ -1,5 +1,3 @@ -import os - import numpy as np import pytest @@ -9,6 +7,9 @@ M4Downsampler, MinMaxDownsampler, MinMaxLTTBDownsampler, + NaNM4Downsampler, + NaNMinMaxDownsampler, + NaNMinMaxLTTBDownsampler, ) NB_SAMPLES = ["100,000", "1,000,000"] @@ -16,10 +17,6 @@ Y_DTYPES = [np.float32, np.float64] + [np.int32, np.int64] -def _parallel_to_n_threads(parallel): - return 0 if not parallel else os.cpu_count() - - # --------------------------------------------------------------------------- # # MinMaxDownsampler # --------------------------------------------------------------------------- # @@ -35,11 +32,10 @@ def test_minmax_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="minmax") @@ -52,12 +48,44 @@ def test_minmax_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) + + +@pytest.mark.benchmark(group="nanminmax") +@pytest.mark.parametrize("n_samples", NB_SAMPLES) +@pytest.mark.parametrize("n_out", N_OUT) +@pytest.mark.parametrize("dtype", Y_DTYPES) +@pytest.mark.parametrize("parallel", [False, True]) +def test_nanminmax_no_x(benchmark, n_samples, n_out, dtype, parallel): + """Test the MinMaxDownsampler.""" + downsampler = NaNMinMaxDownsampler() + n_samples = int(n_samples.replace(",", "")) + n_out = int(n_out.replace(",", "")) + + y = np.random.randn(n_samples).astype(dtype) + + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) + + +@pytest.mark.benchmark(group="nanminmax") +@pytest.mark.parametrize("n_samples", NB_SAMPLES) +@pytest.mark.parametrize("n_out", N_OUT) +@pytest.mark.parametrize("dtype", Y_DTYPES) +@pytest.mark.parametrize("parallel", [False, True]) +def test_nanminmax_with_x(benchmark, n_samples, n_out, dtype, parallel): + """Test the MinMaxDownsampler.""" + downsampler = NaNMinMaxDownsampler() + n_samples = int(n_samples.replace(",", "")) + n_out = int(n_out.replace(",", "")) + + x = np.arange(n_samples) + y = np.random.randn(n_samples).astype(dtype) + + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -75,11 +103,10 @@ def test_m4_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = M4Downsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="m4") @@ -92,12 +119,44 @@ def test_m4_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = M4Downsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) + + +@pytest.mark.benchmark(group="nanm4") +@pytest.mark.parametrize("n_samples", NB_SAMPLES) +@pytest.mark.parametrize("n_out", N_OUT) +@pytest.mark.parametrize("dtype", Y_DTYPES) +@pytest.mark.parametrize("parallel", [False, True]) +def test_nanm4_no_x(benchmark, n_samples, n_out, dtype, parallel): + """Test the M4Downsampler.""" + downsampler = NaNM4Downsampler() + n_samples = int(n_samples.replace(",", "")) + n_out = int(n_out.replace(",", "")) + + y = np.random.randn(n_samples).astype(dtype) + + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) + + +@pytest.mark.benchmark(group="nanm4") +@pytest.mark.parametrize("n_samples", NB_SAMPLES) +@pytest.mark.parametrize("n_out", N_OUT) +@pytest.mark.parametrize("dtype", Y_DTYPES) +@pytest.mark.parametrize("parallel", [False, True]) +def test_nanm4_with_x(benchmark, n_samples, n_out, dtype, parallel): + """Test the M4Downsampler.""" + downsampler = NaNM4Downsampler() + n_samples = int(n_samples.replace(",", "")) + n_out = int(n_out.replace(",", "")) + + x = np.arange(n_samples) + y = np.random.randn(n_samples).astype(dtype) + + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -115,11 +174,10 @@ def test_lttb_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = LTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="lttb") @@ -132,12 +190,11 @@ def test_lttb_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = LTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -155,11 +212,10 @@ def test_minmaxlttb_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxLTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="minmaxlttb") @@ -172,12 +228,44 @@ def test_minmaxlttb_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxLTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) + + +@pytest.mark.benchmark(group="nanminmaxlttb") +@pytest.mark.parametrize("n_samples", NB_SAMPLES) +@pytest.mark.parametrize("n_out", N_OUT) +@pytest.mark.parametrize("dtype", Y_DTYPES) +@pytest.mark.parametrize("parallel", [False, True]) +def test_nanminmaxlttb_no_x(benchmark, n_samples, n_out, dtype, parallel): + """Test the MinMaxLTTBDownsampler.""" + downsampler = NaNMinMaxLTTBDownsampler() + n_samples = int(n_samples.replace(",", "")) + n_out = int(n_out.replace(",", "")) + + y = np.random.randn(n_samples).astype(dtype) + + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) + + +@pytest.mark.benchmark(group="nanminmaxlttb") +@pytest.mark.parametrize("n_samples", NB_SAMPLES) +@pytest.mark.parametrize("n_out", N_OUT) +@pytest.mark.parametrize("dtype", Y_DTYPES) +@pytest.mark.parametrize("parallel", [False, True]) +def test_nanminmaxlttb_with_x(benchmark, n_samples, n_out, dtype, parallel): + """Test the MinMaxLTTBDownsampler.""" + downsampler = NaNMinMaxLTTBDownsampler() + n_samples = int(n_samples.replace(",", "")) + n_out = int(n_out.replace(",", "")) + + x = np.arange(n_samples) + y = np.random.randn(n_samples).astype(dtype) + + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # diff --git a/tests/test_algos_python_compliance.py b/tests/test_algos_python_compliance.py index d7995a1..8a64163 100644 --- a/tests/test_algos_python_compliance.py +++ b/tests/test_algos_python_compliance.py @@ -1,8 +1,20 @@ import numpy as np import pytest -from tsdownsample import LTTBDownsampler, M4Downsampler, MinMaxDownsampler -from tsdownsample._python.downsamplers import LTTB_py, M4_py, MinMax_py +from tsdownsample import ( + LTTBDownsampler, + M4Downsampler, + MinMaxDownsampler, + NaNM4Downsampler, + NaNMinMaxDownsampler, +) +from tsdownsample._python.downsamplers import ( + LTTB_py, + M4_py, + MinMax_py, + NaNM4_py, + NaNMinMax_py, +) @pytest.mark.parametrize( @@ -11,6 +23,9 @@ (MinMaxDownsampler(), MinMax_py()), (M4Downsampler(), M4_py()), (LTTBDownsampler(), LTTB_py()), + # Include NaN downsamplers + (NaNMinMaxDownsampler(), NaNMinMax_py()), + (NaNM4Downsampler(), NaNM4_py()), ], ) @pytest.mark.parametrize("n", [10_000, 10_032, 20_321, 23_489]) @@ -29,3 +44,26 @@ def test_resampler_accordance(rust_python_pair, n, n_out): rust_downsampler.downsample(x, y, n_out=n_out), python_downsampler.downsample(x, y, n_out=n_out), ) + + +@pytest.mark.parametrize( + "rust_python_pair", + [(NaNMinMaxDownsampler(), NaNMinMax_py()), (NaNM4Downsampler(), NaNM4_py())], +) +@pytest.mark.parametrize("n", [10_000, 10_032, 20_321, 23_489]) +@pytest.mark.parametrize("n_random_nans", [100, 200, 500, 2000, 5000]) +@pytest.mark.parametrize("n_out", [100, 200, 252]) +def test_nan_resampler_accordance(rust_python_pair, n, n_random_nans, n_out): + rust_downsampler, python_downsampler = rust_python_pair + x = np.arange(n) + y = np.random.randn(n) + y[np.random.choice(y.size, n_random_nans, replace=False)] = np.nan + # Without x passed to the rust downsampler + rust_result = rust_downsampler.downsample(y, n_out=n_out) + python_result = python_downsampler.downsample(x, y, n_out=n_out) + assert np.allclose(rust_result, python_result) + # With x passed to the rust downsampler + assert np.allclose( + rust_downsampler.downsample(x, y, n_out=n_out), + python_downsampler.downsample(x, y, n_out=n_out), + ) diff --git a/tests/test_config.py b/tests/test_config.py index 3ab5f41..f7632a7 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -22,3 +22,4 @@ rust_primitive_types_x = _core_rust_primitive_types rust_primitive_types_y = _core_rust_primitive_types + ["f16", "i8", "u8"] +rust_primitive_types_y_nan = ["f16", "f32", "f64"] diff --git a/tests/test_rust_mods.py b/tests/test_rust_mods.py index a6532d3..8a50ab8 100644 --- a/tests/test_rust_mods.py +++ b/tests/test_rust_mods.py @@ -1,4 +1,8 @@ -from test_config import rust_primitive_types_x, rust_primitive_types_y +from test_config import ( + rust_primitive_types_x, + rust_primitive_types_y, + rust_primitive_types_y_nan, +) import tsdownsample._rust._tsdownsample_rs as tsds_rs @@ -21,16 +25,37 @@ def _test_rust_mod_correctly_build(mod, sub_mods, has_x_impl: bool): assert hasattr(m, f"downsample_{tx}_{ty}") +def _test_rust_nan_mod_correctly_build(mod, sub_mods, has_x_impl: bool): + # without x + for sub_mod in sub_mods: + assert hasattr(mod, sub_mod) + m = getattr(mod, sub_mod) + for ty in rust_primitive_types_y_nan: + assert hasattr(m, f"downsample_nan_{ty}") + + # with x + if not has_x_impl: + return + for sub_mod in sub_mods: + assert hasattr(mod, sub_mod) + m = getattr(mod, sub_mod) + for tx in rust_primitive_types_x: + for ty in rust_primitive_types_y_nan: + assert hasattr(m, f"downsample_{tx}_{ty}") + + def test_minmax_rust_mod_correctly_build(): mod = tsds_rs.minmax sub_mods = ["sequential", "parallel"] - _test_rust_mod_correctly_build(mod, sub_mods, has_x_impl=False) + _test_rust_mod_correctly_build(mod, sub_mods, has_x_impl=True) + _test_rust_nan_mod_correctly_build(mod, sub_mods, has_x_impl=True) def test_m4_rust_mod_correctly_build(): mod = tsds_rs.m4 sub_mods = ["sequential", "parallel"] - _test_rust_mod_correctly_build(mod, sub_mods, has_x_impl=False) + _test_rust_mod_correctly_build(mod, sub_mods, has_x_impl=True) + _test_rust_nan_mod_correctly_build(mod, sub_mods, has_x_impl=True) def test_lttb_rust_mod_correctly_build(): @@ -43,3 +68,4 @@ def test_minmaxlttb_rust_mod_correctly_build(): mod = tsds_rs.minmaxlttb sub_mods = ["sequential", "parallel"] _test_rust_mod_correctly_build(mod, sub_mods, has_x_impl=True) + _test_rust_nan_mod_correctly_build(mod, sub_mods, has_x_impl=True) diff --git a/tests/test_tsdownsample.py b/tests/test_tsdownsample.py index ab75835..993faa6 100644 --- a/tests/test_tsdownsample.py +++ b/tests/test_tsdownsample.py @@ -10,8 +10,14 @@ M4Downsampler, MinMaxDownsampler, MinMaxLTTBDownsampler, + NaNM4Downsampler, + NaNMinMaxDownsampler, + NaNMinMaxLTTBDownsampler, +) +from tsdownsample.downsampling_interface import ( + AbstractDownsampler, + AbstractRustNaNDownsampler, ) -from tsdownsample.downsampling_interface import AbstractDownsampler # TODO: Improve tests # - compare implementations with existing plotly_resampler implementations @@ -24,19 +30,43 @@ MinMaxLTTBDownsampler(), ] +RUST_NAN_DOWNSAMPLERS = [ + NaNMinMaxDownsampler(), + NaNM4Downsampler(), + NaNMinMaxLTTBDownsampler(), +] + OTHER_DOWNSAMPLERS = [EveryNthDownsampler()] def generate_rust_downsamplers() -> Iterable[AbstractDownsampler]: - for downsampler in RUST_DOWNSAMPLERS: + for downsampler in RUST_DOWNSAMPLERS + RUST_NAN_DOWNSAMPLERS: + yield downsampler + + +def generate_rust_nan_downsamplers() -> Iterable[AbstractDownsampler]: + for downsampler in RUST_NAN_DOWNSAMPLERS: yield downsampler def generate_all_downsamplers() -> Iterable[AbstractDownsampler]: - for downsampler in RUST_DOWNSAMPLERS + OTHER_DOWNSAMPLERS: + for downsampler in RUST_DOWNSAMPLERS + RUST_NAN_DOWNSAMPLERS + OTHER_DOWNSAMPLERS: yield downsampler +def generate_datapoints(): + N_DATAPOINTS = 10_000 + return np.arange(N_DATAPOINTS) + + +def generate_nan_datapoints(): + N_DATAPOINTS = 10_000 + datapoints = np.arange(N_DATAPOINTS, dtype=np.float64) + datapoints[0] = np.nan + datapoints[9960] = np.nan + return datapoints + + @pytest.mark.parametrize("downsampler", generate_all_downsamplers()) def test_serialization_copy(downsampler: AbstractDownsampler): """Test serialization.""" @@ -45,7 +75,8 @@ def test_serialization_copy(downsampler: AbstractDownsampler): dc = copy(downsampler) ddc = deepcopy(downsampler) - arr = np.arange(10_000) + arr = generate_datapoints() + orig_downsampled = downsampler.downsample(arr, n_out=100) dc_downsampled = dc.downsample(arr, n_out=100) ddc_downsampled = ddc.downsample(arr, n_out=100) @@ -60,7 +91,7 @@ def test_serialization_pickle(downsampler: AbstractDownsampler): dc = pickle.loads(pickle.dumps(downsampler)) - arr = np.arange(10_000) + arr = generate_datapoints() orig_downsampled = downsampler.downsample(arr, n_out=100) dc_downsampled = dc.downsample(arr, n_out=100) assert np.all(orig_downsampled == dc_downsampled) @@ -69,12 +100,23 @@ def test_serialization_pickle(downsampler: AbstractDownsampler): @pytest.mark.parametrize("downsampler", generate_rust_downsamplers()) def test_rust_downsampler(downsampler: AbstractDownsampler): """Test the Rust downsamplers.""" - arr = np.arange(10_000) + arr = generate_datapoints() s_downsampled = downsampler.downsample(arr, n_out=100) assert s_downsampled[0] == 0 assert s_downsampled[-1] == len(arr) - 1 +@pytest.mark.parametrize("downsampler", generate_rust_nan_downsamplers()) +def test_rust_nan_downsampler(downsampler: AbstractRustNaNDownsampler): + """Test the Rust NaN downsamplers.""" + datapoints = generate_nan_datapoints() + s_downsampled = downsampler.downsample(datapoints, n_out=100) + print(s_downsampled) + assert s_downsampled[0] == 0 + assert s_downsampled[-2] == 9960 + assert s_downsampled[50] != np.nan + + def test_everynth_downsampler(): """Test EveryNth downsampler.""" arr = np.arange(10_000) @@ -89,7 +131,7 @@ def test_parallel_downsampling(downsampler: AbstractDownsampler): """Test parallel downsampling.""" arr = np.random.randn(10_000).astype(np.float32) s_downsampled = downsampler.downsample(arr, n_out=100) - s_downsampled_p = downsampler.downsample(arr, n_out=100, n_threads=2) + s_downsampled_p = downsampler.downsample(arr, n_out=100, parallel=True) assert np.all(s_downsampled == s_downsampled_p) @@ -99,7 +141,7 @@ def test_parallel_downsampling_with_x(downsampler: AbstractDownsampler): arr = np.random.randn(10_001).astype(np.float32) # 10_001 to test edge case idx = np.arange(len(arr)) s_downsampled = downsampler.downsample(idx, arr, n_out=100) - s_downsampled_p = downsampler.downsample(idx, arr, n_out=100, n_threads=2) + s_downsampled_p = downsampler.downsample(idx, arr, n_out=100, parallel=True) assert np.all(s_downsampled == s_downsampled_p) @@ -170,7 +212,7 @@ def test_downsampling_no_out_of_bounds_different_dtypes( for dtype in supported_dtypes_y: arr = arr_orig.astype(dtype) s_downsampled = downsampler.downsample(arr, n_out=76) - s_downsampled_p = downsampler.downsample(arr, n_out=76, n_threads=2) + s_downsampled_p = downsampler.downsample(arr, n_out=76, parallel=True) assert np.all(s_downsampled == s_downsampled_p) if dtype is not np.bool_: res += [s_downsampled] @@ -191,7 +233,7 @@ def test_downsampling_no_out_of_bounds_different_dtypes_with_x( for dtype_y in supported_dtypes_y: arr = arr_orig.astype(dtype_y) s_downsampled = downsampler.downsample(idx, arr, n_out=76) - s_downsampled_p = downsampler.downsample(idx, arr, n_out=76, n_threads=2) + s_downsampled_p = downsampler.downsample(idx, arr, n_out=76, parallel=True) assert np.all(s_downsampled == s_downsampled_p) if dtype_y is not np.bool_: res += [s_downsampled] @@ -248,23 +290,73 @@ def test_error_invalid_args(): arr = np.random.randint(0, 100, size=10_000) # No args with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(n_out=100, n_threads=2) + MinMaxDownsampler().downsample(n_out=100, parallel=True) assert "takes 1 or 2 positional arguments" in str(e_msg.value) # Too many args with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr, arr, arr, n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr, arr, arr, n_out=100, parallel=True) assert "takes 1 or 2 positional arguments" in str(e_msg.value) # Invalid y with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr.reshape(5, 2_000), n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr.reshape(5, 2_000), n_out=100, parallel=True) assert "y must be 1D" in str(e_msg.value) # Invalid x with pytest.raises(ValueError) as e_msg: MinMaxDownsampler().downsample( - arr.reshape(5, 2_000), arr, n_out=100, n_threads=2 + arr.reshape(5, 2_000), arr, n_out=100, parallel=True ) assert "x must be 1D" in str(e_msg.value) # Invalid x and y (different length) with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr, arr[:-1], n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr, arr[:-1], n_out=100, parallel=True) assert "x and y must have the same length" in str(e_msg.value) + + +@pytest.mark.parametrize("downsampler", generate_rust_downsamplers()) +def test_non_contiguous_array(downsampler: AbstractDownsampler): + """Test non contiguous array.""" + arr = np.random.randint(0, 100, size=10_000).astype(np.float32) + arr = arr[::2] + assert not arr.flags["C_CONTIGUOUS"] + with pytest.raises(ValueError) as e_msg: + downsampler.downsample(arr, n_out=100) + assert "must be contiguous" in str(e_msg.value) + + +def test_everynth_non_contiguous_array(): + """Test non contiguous array.""" + arr = np.random.randint(0, 100, size=10_000) + arr = arr[::2] + assert not arr.flags["C_CONTIGUOUS"] + downsampler = EveryNthDownsampler() + s_downsampled = downsampler.downsample(arr, n_out=100) + assert s_downsampled[0] == 0 + assert s_downsampled[-1] == 4950 + + +def test_nan_minmax_downsampler(): + """Test NaN downsamplers.""" + arr = np.random.randn(50_000) + arr[::5] = np.nan + s_downsampled = NaNMinMaxDownsampler().downsample(arr, n_out=100) + arr_downsampled = arr[s_downsampled] + assert np.all(np.isnan(arr_downsampled)) + + +def test_nan_m4_downsampler(): + """Test NaN downsamplers.""" + arr = np.random.randn(50_000) + arr[::5] = np.nan + s_downsampled = NaNM4Downsampler().downsample(arr, n_out=100) + arr_downsampled = arr[s_downsampled] + assert np.all(np.isnan(arr_downsampled[1::4])) # min is NaN + assert np.all(np.isnan(arr_downsampled[2::4])) # max is NaN + + +def test_nan_minmaxlttb_downsampler(): + """Test NaN downsamplers.""" + arr = np.random.randn(50_000) + arr[::5] = np.nan + s_downsampled = NaNMinMaxLTTBDownsampler().downsample(arr, n_out=100) + arr_downsampled = arr[s_downsampled] + assert np.all(np.isnan(arr_downsampled[1:-1])) # first and last are not NaN diff --git a/tsdownsample/__init__.py b/tsdownsample/__init__.py index 32188b0..20b3e6a 100644 --- a/tsdownsample/__init__.py +++ b/tsdownsample/__init__.py @@ -6,9 +6,12 @@ M4Downsampler, MinMaxDownsampler, MinMaxLTTBDownsampler, + NaNM4Downsampler, + NaNMinMaxDownsampler, + NaNMinMaxLTTBDownsampler, ) -__version__ = "0.1.2" +__version__ = "0.1.3" __author__ = "Jeroen Van Der Donckt" __all__ = [ @@ -17,4 +20,7 @@ "M4Downsampler", "LTTBDownsampler", "MinMaxLTTBDownsampler", + "NaNMinMaxDownsampler", + "NaNM4Downsampler", + "NaNMinMaxLTTBDownsampler", ] diff --git a/tsdownsample/_python/downsamplers.py b/tsdownsample/_python/downsamplers.py index 864a828..5343f9f 100644 --- a/tsdownsample/_python/downsamplers.py +++ b/tsdownsample/_python/downsamplers.py @@ -25,7 +25,7 @@ def _get_bin_idxs(x: np.ndarray, nb_bins: int) -> np.ndarray: bins = np.searchsorted(x, np.linspace(x[0], x[-1], nb_bins + 1), side="right") bins[0] = 0 bins[-1] = len(x) - return np.unique(bins) + return np.array(bins) class LTTB_py(AbstractDownsampler): @@ -144,11 +144,40 @@ def _downsample( if not len(y_slice): continue # calculate the argmin(slice) & argmax(slice) - rel_idxs.append(lower + y_slice.argmin()) - rel_idxs.append(lower + y_slice.argmax()) + rel_idxs.append(lower + np.nanargmin(y_slice)) + rel_idxs.append(lower + np.nanargmax(y_slice)) return np.unique(rel_idxs) +class NaNMinMax_py(AbstractDownsampler): + @staticmethod + def _check_valid_n_out(n_out: int): + assert n_out % 2 == 0, "n_out must be a multiple of 2" + + def _downsample( + self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **kwargs + ) -> np.ndarray: + if x is None: + # Is fine for this implementation as this is only used for testing + x = np.arange(y.shape[0]) + + xdt = x.dtype + if np.issubdtype(xdt, np.datetime64) or np.issubdtype(xdt, np.timedelta64): + x = x.view(np.int64) + + bins = _get_bin_idxs(x, n_out // 2) + + rel_idxs = [] + for lower, upper in zip(bins, bins[1:]): + y_slice = y[lower:upper] + if not len(y_slice): + continue + # calculate the argmin(slice) & argmax(slice) + rel_idxs.append(lower + np.argmin(y_slice)) + rel_idxs.append(lower + np.argmax(y_slice)) + return np.array(sorted(rel_idxs)) + + class M4_py(AbstractDownsampler): """Aggregation method which selects the 4 M-s, i.e y-argmin, y-argmax, x-argmin, and x-argmax per bin. @@ -159,6 +188,41 @@ class M4_py(AbstractDownsampler): """ + @staticmethod + def _check_valid_n_out(n_out: int): + assert n_out % 4 == 0, "n_out must be a multiple of 4" + + def _downsample( + self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **kwargs + ) -> np.ndarray: + """TODO complete docs""" + if x is None: + # Is fine for this implementation as this is only used for testing + x = np.arange(y.shape[0]) + + xdt = x.dtype + if np.issubdtype(xdt, np.datetime64) or np.issubdtype(xdt, np.timedelta64): + x = x.view(np.int64) + + bins = _get_bin_idxs(x, n_out // 4) + + rel_idxs = [] + for lower, upper in zip(bins, bins[1:]): + y_slice = y[lower:upper] + if not len(y_slice): + continue + + # calculate the min(idx), argmin(slice), argmax(slice), max(idx) + rel_idxs.append(lower) + rel_idxs.append(lower + np.nanargmin(y_slice)) + rel_idxs.append(lower + np.nanargmax(y_slice)) + rel_idxs.append(upper - 1) + + # NOTE: we do not use the np.unique so that all indices are retained + return np.array(sorted(rel_idxs)) + + +class NaNM4_py(AbstractDownsampler): @staticmethod def _check_valid_n_out(n_out: int): assert n_out % 4 == 0, "n_out must be a multiple of 4" diff --git a/tsdownsample/downsamplers.py b/tsdownsample/downsamplers.py index 5a8f0e8..93519b7 100644 --- a/tsdownsample/downsamplers.py +++ b/tsdownsample/downsamplers.py @@ -6,10 +6,38 @@ # ------------------ Rust Downsamplers ------------------ from tsdownsample._rust import _tsdownsample_rs # type: ignore[attr-defined] -from .downsampling_interface import AbstractDownsampler, AbstractRustDownsampler +from .downsampling_interface import ( + AbstractDownsampler, + AbstractRustDownsampler, + AbstractRustNaNDownsampler, +) class MinMaxDownsampler(AbstractRustDownsampler): + """Downsampler that uses the MinMax algorithm. If the y data contains NaNs, these + ignored (i.e. the NaNs are not taken into account when selecting data points). + + For each bin, the indices of the minimum and maximum values are selected. + """ + + @property + def rust_mod(self): + return _tsdownsample_rs.minmax + + @staticmethod + def _check_valid_n_out(n_out: int): + AbstractRustDownsampler._check_valid_n_out(n_out) + if n_out % 2 != 0: + raise ValueError("n_out must be even") + + +class NaNMinMaxDownsampler(AbstractRustNaNDownsampler): + """Downsampler that uses the MinMax algorithm. If the y data contains NaNs, the + indices of these NaNs are returned. + + For each bin, the indices of the minimum and maximum values are selected. + """ + @property def rust_mod(self): return _tsdownsample_rs.minmax @@ -22,6 +50,32 @@ def _check_valid_n_out(n_out: int): class M4Downsampler(AbstractRustDownsampler): + """Downsampler that uses the M4 algorithm. If the y data contains NaNs, these are + ignored (i.e. the NaNs are not taken into account when selecting data points). + + For each bin, the indices of the first, last, minimum and maximum values are + selected. + """ + + @property + def rust_mod(self): + return _tsdownsample_rs.m4 + + @staticmethod + def _check_valid_n_out(n_out: int): + AbstractRustDownsampler._check_valid_n_out(n_out) + if n_out % 4 != 0: + raise ValueError("n_out must be a multiple of 4") + + +class NaNM4Downsampler(AbstractRustNaNDownsampler): + """Downsampler that uses the M4 algorithm. If the y data contains NaNs, the indices + of these NaNs are returned. + + For each bin, the indices of the first, last, minimum and maximum values are + selected. + """ + @property def rust_mod(self): return _tsdownsample_rs.m4 @@ -34,22 +88,51 @@ def _check_valid_n_out(n_out: int): class LTTBDownsampler(AbstractRustDownsampler): + """Downsampler that uses the LTTB algorithm.""" + @property def rust_mod(self): return _tsdownsample_rs.lttb class MinMaxLTTBDownsampler(AbstractRustDownsampler): + """Downsampler that uses the MinMaxLTTB algorithm. If the y data contains NaNs, + these are ignored (i.e. the NaNs are not taken into account when selecting data + points). + + MinMaxLTTB paper: https://arxiv.org/abs/2305.00332 + """ + + @property + def rust_mod(self): + return _tsdownsample_rs.minmaxlttb + + def downsample( + self, *args, n_out: int, minmax_ratio: int = 4, parallel: bool = False, **_ + ): + assert minmax_ratio > 0, "minmax_ratio must be greater than 0" + return super().downsample( + *args, n_out=n_out, parallel=parallel, ratio=minmax_ratio + ) + + +class NaNMinMaxLTTBDownsampler(AbstractRustNaNDownsampler): + """Downsampler that uses the MinMaxLTTB algorithm. If the y data contains NaNs, the + indices of these NaNs are returned. + + MinMaxLTTB paper: https://arxiv.org/abs/2305.00332 + """ + @property def rust_mod(self): return _tsdownsample_rs.minmaxlttb def downsample( - self, *args, n_out: int, minmax_ratio: int = 30, n_threads: int = 1, **_ + self, *args, n_out: int, minmax_ratio: int = 4, parallel: bool = False, **_ ): assert minmax_ratio > 0, "minmax_ratio must be greater than 0" return super().downsample( - *args, n_out=n_out, n_threads=n_threads, ratio=minmax_ratio + *args, n_out=n_out, parallel=parallel, ratio=minmax_ratio ) @@ -57,6 +140,11 @@ def downsample( class EveryNthDownsampler(AbstractDownsampler): + """Downsampler that selects every nth data point""" + + def __init__(self, **kwargs): + super().__init__(check_contiguous=False, **kwargs) + def _downsample( self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **_ ) -> np.ndarray: diff --git a/tsdownsample/downsampling_interface.py b/tsdownsample/downsampling_interface.py index de20d46..9c05c54 100644 --- a/tsdownsample/downsampling_interface.py +++ b/tsdownsample/downsampling_interface.py @@ -17,12 +17,26 @@ class AbstractDownsampler(ABC): def __init__( self, + check_contiguous: bool = True, x_dtype_regex_list: Optional[List[str]] = None, y_dtype_regex_list: Optional[List[str]] = None, ): + self.check_contiguous = check_contiguous self.x_dtype_regex_list = x_dtype_regex_list self.y_dtype_regex_list = y_dtype_regex_list + def _check_contiguous(self, arr: np.ndarray, y: bool = True): + # necessary for rust downsamplers as they don't support non-contiguous arrays + # (we call .as_slice().unwrap() on the array) in the lib.rs file + # which will panic if the array is not contiguous + if not self.check_contiguous: + return + + if arr.flags["C_CONTIGUOUS"]: + return + + raise ValueError(f"{'y' if y else 'x'} array must be contiguous.") + def _supports_dtype(self, arr: np.ndarray, y: bool = True): dtype_regex_list = self.y_dtype_regex_list if y else self.x_dtype_regex_list # base case @@ -66,6 +80,7 @@ def _check_valid_downsample_args( raise ValueError("x must be 1D array") if len(x) != len(y): raise ValueError("x and y must have the same length") + return x, y @staticmethod @@ -113,8 +128,10 @@ def downsample(self, *args, n_out: int, **kwargs): # x and y are optional self._check_valid_n_out(n_out) x, y = self._check_valid_downsample_args(*args) self._supports_dtype(y, y=True) + self._check_contiguous(y, y=True) if x is not None: self._supports_dtype(x, y=False) + self._check_contiguous(x, y=False) return self._downsample(x, y, n_out, **kwargs) @@ -144,7 +161,12 @@ class AbstractRustDownsampler(AbstractDownsampler, ABC): """RustDownsampler interface-class, subclassed by concrete downsamplers.""" def __init__(self): - super().__init__(_rust_dtypes, _y_rust_dtypes) # same for x and y + super().__init__(True, _rust_dtypes, _y_rust_dtypes) # same for x and y + + @property + def _downsample_func_prefix(self) -> str: + """The prefix of the downsample functions in the rust module.""" + return DOWNSAMPLE_F @property def rust_mod(self) -> ModuleType: @@ -180,12 +202,40 @@ def mod_multi_core(self) -> Union[ModuleType, None]: return None # no parallel compiled module available @staticmethod + def _view_x(x: np.ndarray) -> np.ndarray: + """View the x-data as different dtype (if necessary).""" + if np.issubdtype(x.dtype, np.datetime64): + # datetime64 is viewed as int64 + return x.view(dtype=np.int64) + elif np.issubdtype(x.dtype, np.timedelta64): + # timedelta64 is viewed as int64 + return x.view(dtype=np.int64) + return x + + @staticmethod + def _view_y(y: np.ndarray) -> np.ndarray: + """View the y-data as different dtype (if necessary).""" + if y.dtype == "bool": + # bool is viewed as int8 + return y.view(dtype=np.int8) + elif np.issubdtype(y.dtype, np.datetime64): + # datetime64 is viewed as int64 + return y.view(dtype=np.int64) + elif np.issubdtype(y.dtype, np.timedelta64): + # timedelta64 is viewed as int64 + return y.view(dtype=np.int64) + return y + def _switch_mod_with_y( - y_dtype: np.dtype, mod: ModuleType, downsample_func: str = DOWNSAMPLE_F + self, y_dtype: np.dtype, mod: ModuleType, downsample_func: Optional[str] = None ) -> Callable: - """The x-data is not considered in the downsampling + """Select the appropriate function from the rust module for the y-data. - Assumes equal binning. + Assumes equal binning (when no data for x is passed -> only this function is + executed). + Equidistant binning is utilized when a `downsample_func` is passed from the + `_switch_mod_with_x_and_y` method (since the x-data is considered in the + downsampling). Parameters ---------- @@ -195,7 +245,11 @@ def _switch_mod_with_y( The module to select the appropriate function from downsample_func : str, optional The name of the function to use, by default DOWNSAMPLE_FUNC. + This argument is passed from the `_switch_mod_with_x_and_y` method when + the x-data is considered in the downsampling. """ + if downsample_func is None: + downsample_func = self._downsample_func_prefix # FLOATS if np.issubdtype(y_dtype, np.floating): if y_dtype == np.float16: @@ -229,9 +283,12 @@ def _switch_mod_with_y( # BOOLS -> int8 (bool is viewed as int8) raise ValueError(f"Unsupported data type (for y): {y_dtype}") - @staticmethod def _switch_mod_with_x_and_y( - x_dtype: np.dtype, y_dtype: np.dtype, mod: ModuleType + self, # necessary to access the class its _switch_mod_with_y method + x_dtype: np.dtype, + y_dtype: np.dtype, + mod: ModuleType, + downsample_func: Optional[str] = None, ) -> Callable: """The x-data is considered in the downsampling @@ -245,49 +302,35 @@ def _switch_mod_with_x_and_y( The dtype of the y-data mod : ModuleType The module to select the appropriate function from + downsample_func : str, optional + The name of the function to use, by default DOWNSAMPLE_FUNC. """ + if downsample_func is None: + downsample_func = self._downsample_func_prefix # FLOATS if np.issubdtype(x_dtype, np.floating): if x_dtype == np.float16: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_f16" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_f16") elif x_dtype == np.float32: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_f32" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_f32") elif x_dtype == np.float64: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_f64" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_f64") # UINTS elif np.issubdtype(x_dtype, np.unsignedinteger): if x_dtype == np.uint16: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_u16" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_u16") elif x_dtype == np.uint32: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_u32" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_u32") elif x_dtype == np.uint64: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_u64" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_u64") # INTS (need to be last because uint is subdtype of int) elif np.issubdtype(x_dtype, np.integer): if x_dtype == np.int16: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_i16" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_i16") elif x_dtype == np.int32: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_i32" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_i32") elif x_dtype == np.int64: - return AbstractRustDownsampler._switch_mod_with_y( - y_dtype, mod, f"{DOWNSAMPLE_F}_i64" - ) + return self._switch_mod_with_y(y_dtype, mod, f"{downsample_func}_i64") # DATETIME -> i64 (datetime64 is viewed as int64) # TIMEDELTA -> i64 (timedelta64 is viewed as int64) raise ValueError(f"Unsupported data type (for x): {x_dtype}") @@ -297,13 +340,11 @@ def _downsample( x: Union[np.ndarray, None], y: np.ndarray, n_out: int, - n_threads: int = 1, + parallel: bool = False, **kwargs, ) -> np.ndarray: """Downsample the data in x and y.""" mod = self.mod_single_core - is_multi_core = False - parallel = n_threads > 1 if parallel: if self.mod_multi_core is None: name = self.__class__.__name__ @@ -313,42 +354,26 @@ def _downsample( ) else: mod = self.mod_multi_core - is_multi_core = True ## Viewing the y-data as different dtype (if necessary) - if y.dtype == "bool": - # bool is viewed as int8 - y = y.view(dtype=np.int8) - elif np.issubdtype(y.dtype, np.datetime64): - # datetime64 is viewed as int64 - y = y.view(dtype=np.int64) - elif np.issubdtype(y.dtype, np.timedelta64): - # timedelta64 is viewed as int64 - y = y.view(dtype=np.int64) + y = self._view_y(y) ## Viewing the x-data as different dtype (if necessary) if x is None: downsample_f = self._switch_mod_with_y(y.dtype, mod) - if is_multi_core: - return downsample_f(y, n_out, n_threads=n_threads, **kwargs) - else: - return downsample_f(y, n_out, **kwargs) - elif np.issubdtype(x.dtype, np.datetime64): - # datetime64 is viewed as int64 - x = x.view(dtype=np.int64) - elif np.issubdtype(x.dtype, np.timedelta64): - # timedelta64 is viewed as int64 - x = x.view(dtype=np.int64) + return downsample_f(y, n_out, **kwargs) + x = self._view_x(x) ## Getting the appropriate downsample function downsample_f = self._switch_mod_with_x_and_y(x.dtype, y.dtype, mod) - if is_multi_core: - return downsample_f(x, y, n_out, n_threads=n_threads, **kwargs) - else: - return downsample_f(x, y, n_out, **kwargs) + return downsample_f(x, y, n_out, **kwargs) - def downsample( - self, *args, n_out: int, n_threads: int = 1, **kwargs # x and y are optional - ): - """Downsample the data in x and y.""" - return super().downsample(*args, n_out=n_out, n_threads=n_threads, **kwargs) + def downsample(self, *args, n_out: int, parallel: bool = False, **kwargs): + """Downsample the data in x and y. + + The x and y arguments are positional-only arguments. If only one argument is + passed, it is considered to be the y-data. If two arguments are passed, the + first argument is considered to be the x-data and the second argument is + considered to be the y-data. + """ + return super().downsample(*args, n_out=n_out, parallel=parallel, **kwargs) def __deepcopy__(self, memo): """Deepcopy the object.""" @@ -362,3 +387,46 @@ def __deepcopy__(self, memo): else: setattr(result, k, deepcopy(v, memo)) return result + + +NAN_DOWNSAMPLE_F = "downsample_nan" + + +class AbstractRustNaNDownsampler(AbstractRustDownsampler, ABC): + """RustNaNDownsampler interface-class, subclassed by concrete downsamplers.""" + + @property + def _downsample_func_prefix(self) -> str: + """The prefix of the downsample functions in the rust module.""" + return NAN_DOWNSAMPLE_F + + def _switch_mod_with_y( + self, y_dtype: np.dtype, mod: ModuleType, downsample_func: Optional[str] = None + ) -> Callable: + """Select the appropriate function from the rust module for the y-data. + + Assumes equal binning (when no data for x is passed -> only this function is + executed). + Equidistant binning is utilized when a `downsample_func` is passed from the + `_switch_mod_with_x_and_y` method (since the x-data is considered in the + downsampling). + + Parameters + ---------- + y_dtype : np.dtype + The dtype of the y-data + mod : ModuleType + The module to select the appropriate function from + downsample_func : str, optional + The name of the function to use, by default NAN_DOWNSAMPLE_F. + This argument is passed from the `_switch_mod_with_x_and_y` method when + the x-data is considered in the downsampling. + """ + if downsample_func is None: + downsample_func = self._downsample_func_prefix + if not np.issubdtype(y_dtype, np.floating): + # When y is not a float, we need to remove the _nan suffix to use the + # regular downsample function as the _nan suffix is only used for floats. + # (Note that NaNs only exist for floats) + downsample_func = downsample_func.replace("_nan", "") + return super()._switch_mod_with_y(y_dtype, mod, downsample_func)