From e5e48663b2da64a3871700dfdbe2e4ef5c5c775d Mon Sep 17 00:00:00 2001 From: Jeroen Van Der Donckt <18898740+jvdd@users.noreply.github.com> Date: Mon, 25 Dec 2023 00:16:53 +0100 Subject: [PATCH 1/5] :see_no_evil: check for contiguous array (#64) * :see_no_evil: check for contiguous array * :christmas_tree: fix mypy warning --- tests/test_tsdownsample.py | 22 ++++++++++++++++++++++ tsdownsample/downsamplers.py | 3 +++ tsdownsample/downsampling_interface.py | 19 ++++++++++++++++++- 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/tests/test_tsdownsample.py b/tests/test_tsdownsample.py index ab75835..961a031 100644 --- a/tests/test_tsdownsample.py +++ b/tests/test_tsdownsample.py @@ -268,3 +268,25 @@ def test_error_invalid_args(): with pytest.raises(ValueError) as e_msg: MinMaxDownsampler().downsample(arr, arr[:-1], n_out=100, n_threads=2) assert "x and y must have the same length" in str(e_msg.value) + + +@pytest.mark.parametrize("downsampler", generate_rust_downsamplers()) +def test_non_contiguous_array(downsampler: AbstractDownsampler): + """Test non contiguous array.""" + arr = np.random.randint(0, 100, size=10_000) + arr = arr[::2] + assert not arr.flags["C_CONTIGUOUS"] + with pytest.raises(ValueError) as e_msg: + downsampler.downsample(arr, n_out=100) + assert "must be contiguous" in str(e_msg.value) + + +def test_everynth_non_contiguous_array(): + """Test non contiguous array.""" + arr = np.random.randint(0, 100, size=10_000) + arr = arr[::2] + assert not arr.flags["C_CONTIGUOUS"] + downsampler = EveryNthDownsampler() + s_downsampled = downsampler.downsample(arr, n_out=100) + assert s_downsampled[0] == 0 + assert s_downsampled[-1] == 4950 diff --git a/tsdownsample/downsamplers.py b/tsdownsample/downsamplers.py index 85c933b..53fb4b1 100644 --- a/tsdownsample/downsamplers.py +++ b/tsdownsample/downsamplers.py @@ -57,6 +57,9 @@ def downsample( class EveryNthDownsampler(AbstractDownsampler): + def __init__(self, **kwargs): + super().__init__(check_contiguous=False, **kwargs) + def _downsample( self, x: Union[np.ndarray, None], y: np.ndarray, n_out: int, **_ ) -> np.ndarray: diff --git a/tsdownsample/downsampling_interface.py b/tsdownsample/downsampling_interface.py index de20d46..3be2578 100644 --- a/tsdownsample/downsampling_interface.py +++ b/tsdownsample/downsampling_interface.py @@ -17,12 +17,26 @@ class AbstractDownsampler(ABC): def __init__( self, + check_contiguous: bool = True, x_dtype_regex_list: Optional[List[str]] = None, y_dtype_regex_list: Optional[List[str]] = None, ): + self.check_contiguous = check_contiguous self.x_dtype_regex_list = x_dtype_regex_list self.y_dtype_regex_list = y_dtype_regex_list + def _check_contiguous(self, arr: np.ndarray, y: bool = True): + # necessary for rust downsamplers as they don't support non-contiguous arrays + # (we call .as_slice().unwrap() on the array) in the lib.rs file + # which will panic if the array is not contiguous + if not self.check_contiguous: + return + + if arr.flags["C_CONTIGUOUS"]: + return + + raise ValueError(f"{'y' if y else 'x'} array must be contiguous.") + def _supports_dtype(self, arr: np.ndarray, y: bool = True): dtype_regex_list = self.y_dtype_regex_list if y else self.x_dtype_regex_list # base case @@ -66,6 +80,7 @@ def _check_valid_downsample_args( raise ValueError("x must be 1D array") if len(x) != len(y): raise ValueError("x and y must have the same length") + return x, y @staticmethod @@ -113,8 +128,10 @@ def downsample(self, *args, n_out: int, **kwargs): # x and y are optional self._check_valid_n_out(n_out) x, y = self._check_valid_downsample_args(*args) self._supports_dtype(y, y=True) + self._check_contiguous(y, y=True) if x is not None: self._supports_dtype(x, y=False) + self._check_contiguous(x, y=False) return self._downsample(x, y, n_out, **kwargs) @@ -144,7 +161,7 @@ class AbstractRustDownsampler(AbstractDownsampler, ABC): """RustDownsampler interface-class, subclassed by concrete downsamplers.""" def __init__(self): - super().__init__(_rust_dtypes, _y_rust_dtypes) # same for x and y + super().__init__(True, _rust_dtypes, _y_rust_dtypes) # same for x and y @property def rust_mod(self) -> ModuleType: From 4086818090831fae83b7ac684eabd47541334864 Mon Sep 17 00:00:00 2001 From: Jeroen Van Der Donckt <18898740+jvdd@users.noreply.github.com> Date: Tue, 26 Dec 2023 16:46:13 +0100 Subject: [PATCH 2/5] :pray: (#65) --- .github/workflows/ci-tsdownsample.yml | 2 +- pyproject.toml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci-tsdownsample.yml b/.github/workflows/ci-tsdownsample.yml index 8261300..d243561 100644 --- a/.github/workflows/ci-tsdownsample.yml +++ b/.github/workflows/ci-tsdownsample.yml @@ -46,7 +46,7 @@ jobs: matrix: os: ['windows-latest', 'macOS-latest', 'ubuntu-latest'] rust: ['nightly'] # ['stable', 'beta'] - python-version: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12'] env: PYTHON: ${{ matrix.python-version }} diff --git a/pyproject.toml b/pyproject.toml index 8583728..0dddb37 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ classifiers = [ 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', 'Operating System :: POSIX', 'Operating System :: MacOS :: MacOS X', 'Operating System :: Microsoft :: Windows' From 7f205739b18d256adfd8f0a2cce4e9033021f0c2 Mon Sep 17 00:00:00 2001 From: Jeroen Van Der Donckt <18898740+jvdd@users.noreply.github.com> Date: Wed, 3 Jan 2024 13:07:28 +0100 Subject: [PATCH 3/5] :rocket: pre-release (#66) --- .github/workflows/ci-tsdownsample.yml | 2 +- pyproject.toml | 2 +- tsdownsample/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-tsdownsample.yml b/.github/workflows/ci-tsdownsample.yml index d243561..9a13c06 100644 --- a/.github/workflows/ci-tsdownsample.yml +++ b/.github/workflows/ci-tsdownsample.yml @@ -155,7 +155,7 @@ jobs: target: ${{ matrix.target }} manylinux: ${{ matrix.manylinux || 'auto' }} container: ${{ matrix.container }} - args: --release --out dist --interpreter ${{ matrix.interpreter || '3.7 3.8 3.9 3.10 3.11' }} + args: --release --out dist --interpreter ${{ matrix.interpreter || '3.7 3.8 3.9 3.10 3.11 3.12' }} - run: ${{ matrix.ls || 'ls -lh' }} dist/ diff --git a/pyproject.toml b/pyproject.toml index 0dddb37..5941fcb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "tsdownsample" description = "Time series downsampling in rust" -version = "0.1.2" +version = "0.1.3rc1" requires-python = ">=3.7" dependencies = ["numpy"] authors = [{name = "Jeroen Van Der Donckt"}] diff --git a/tsdownsample/__init__.py b/tsdownsample/__init__.py index 32188b0..4a5a6d6 100644 --- a/tsdownsample/__init__.py +++ b/tsdownsample/__init__.py @@ -8,7 +8,7 @@ MinMaxLTTBDownsampler, ) -__version__ = "0.1.2" +__version__ = "0.1.3rc1" __author__ = "Jeroen Van Der Donckt" __all__ = [ From ec24796a291947a9a85881c44ae4b3d56a37d4d7 Mon Sep 17 00:00:00 2001 From: Jeroen Van Der Donckt <18898740+jvdd@users.noreply.github.com> Date: Mon, 15 Jan 2024 12:47:39 +0100 Subject: [PATCH 4/5] Fix multithreading (#68) * :see_no_evil: remove num_threads argument * :broom: remove n_threads in rust functions * :tada: finalize parallel implementation * :broom: remove n_threads from rust benchmarks --- README.md | 11 +- downsample_rs/Cargo.toml | 6 +- downsample_rs/benches/bench_m4.rs | 19 +-- downsample_rs/benches/bench_minmax.rs | 15 +-- downsample_rs/benches/bench_minmaxlttb.rs | 6 - downsample_rs/dev_utils/src/utils.rs | 8 -- downsample_rs/src/lib.rs | 18 +++ downsample_rs/src/m4.rs | 136 +++++++++------------- downsample_rs/src/minmax.rs | 126 ++++++++------------ downsample_rs/src/minmaxlttb.rs | 118 +++++-------------- downsample_rs/src/searchsorted.rs | 31 +++-- pyproject.toml | 2 +- src/lib.rs | 130 +-------------------- tests/benchmarks/test_downsamplers.py | 30 ++--- tests/test_tsdownsample.py | 18 +-- tsdownsample/__init__.py | 2 +- tsdownsample/downsamplers.py | 4 +- tsdownsample/downsampling_interface.py | 29 ++--- 18 files changed, 216 insertions(+), 493 deletions(-) diff --git a/README.md b/README.md index 2bc8a2e..3a975b7 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,8 @@ downsample([x], y, n_out, **kwargs) -> ndarray[uint64] - `x` and `y` are both positional arguments - `n_out` is a mandatory keyword argument that defines the number of output values* - `**kwargs` are optional keyword arguments *(see [table below](#downsampling-algorithms-📈))*: - - `n_threads`: how many threads to use for multi-threading (default `1`, so no multi-threading) + - `parallel`: whether to use multi-threading (default: `False`) + ❗ The max number of threads can be configured with the `TSDOWNSAMPLE_MAX_THREADS` ENV var (e.g. `os.environ["TSDOWNSAMPLE_MAX_THREADS"] = "4"`) - ... **Returns**: a `ndarray[uint64]` of indices that can be used to index the original data. @@ -99,10 +100,10 @@ The following downsampling algorithms (classes) are implemented: | Downsampler | Description | `**kwargs` | | ---:| --- |--- | -| `MinMaxDownsampler` | selects the **min and max** value in each bin | `n_threads` | -| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `n_threads` | -| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `n_threads` | -| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `n_threads`, `minmax_ratio`* | +| `MinMaxDownsampler` | selects the **min and max** value in each bin | `parallel` | +| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `parallel` | +| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `parallel` | +| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `parallel`, `minmax_ratio`* | *Default value for `minmax_ratio` is 4, which is empirically proven to be a good default. More details here: https://arxiv.org/abs/2305.00332 diff --git a/downsample_rs/Cargo.toml b/downsample_rs/Cargo.toml index 9c52eb8..ca0ee16 100644 --- a/downsample_rs/Cargo.toml +++ b/downsample_rs/Cargo.toml @@ -9,15 +9,15 @@ license = "MIT" [dependencies] # TODO: perhaps use polars? argminmax = { version = "0.6.1", features = ["half"] } -# argminmax = { path = "../../argminmax" , features = ["half", "ndarray"] } half = { version = "2.1", default-features = false , features=["num-traits"], optional = true} num-traits = { version = "0.2.15", default-features = false } -rayon = { version = "1.6.0", default-features = false } +once_cell = "1" +rayon = { version = "1.8.0", default-features = false } [dev-dependencies] rstest = { version = "0.18.1", default-features = false } rstest_reuse = { version = "0.6", default-features = false } -criterion = "0.4.0" +criterion = "0.5.1" dev_utils = { path = "dev_utils" } [[bench]] diff --git a/downsample_rs/benches/bench_m4.rs b/downsample_rs/benches/bench_m4.rs index c40df6a..2aa1732 100644 --- a/downsample_rs/benches/bench_m4.rs +++ b/downsample_rs/benches/bench_m4.rs @@ -14,15 +14,8 @@ fn m4_f32_random_array_long_single_core(c: &mut Criterion) { fn m4_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("m4_p_f32", |b| { - b.iter(|| { - m4_mod::m4_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) - }) + b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000))) }); } @@ -48,15 +41,8 @@ fn m4_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); let x = (0..n).map(|i| i as i32).collect::>(); - let all_threads: usize = utils::get_all_threads(); c.bench_function("m4_p_50M_f32", |b| { - b.iter(|| { - m4_mod::m4_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) - }) + b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000))) }); c.bench_function("m4_x_p_50M_f32", |b| { b.iter(|| { @@ -64,7 +50,6 @@ fn m4_f32_random_array_50M_multi_core(c: &mut Criterion) { black_box(x.as_slice()), black_box(data.as_slice()), black_box(2_000), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/benches/bench_minmax.rs b/downsample_rs/benches/bench_minmax.rs index 599cdbd..97bf677 100644 --- a/downsample_rs/benches/bench_minmax.rs +++ b/downsample_rs/benches/bench_minmax.rs @@ -14,14 +14,9 @@ fn minmax_f32_random_array_long_single_core(c: &mut Criterion) { fn minmax_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("minmax_p_f32", |b| { b.iter(|| { - minmax_mod::min_max_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) + minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000)) }) }); } @@ -55,14 +50,9 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) { let n = 50_000_000; let data = utils::get_random_array::(n, f32::MIN, f32::MAX); let x = (0..n).map(|i| i as i32).collect::>(); - let all_threads: usize = utils::get_all_threads(); c.bench_function("minmax_p_50M_f32", |b| { b.iter(|| { - minmax_mod::min_max_without_x_parallel( - black_box(data.as_slice()), - black_box(2_000), - black_box(all_threads), - ) + minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000)) }) }); c.bench_function("minmax_x_p_50M_f32", |b| { @@ -71,7 +61,6 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) { black_box(x.as_slice()), black_box(data.as_slice()), black_box(2_000), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/benches/bench_minmaxlttb.rs b/downsample_rs/benches/bench_minmaxlttb.rs index a0241de..bb78e09 100644 --- a/downsample_rs/benches/bench_minmaxlttb.rs +++ b/downsample_rs/benches/bench_minmaxlttb.rs @@ -25,7 +25,6 @@ fn minmaxlttb_f32_random_array_long_multi_core(c: &mut Criterion) { let n = config::ARRAY_LENGTH_LONG; let x = (0..n).map(|i| i as i32).collect::>(); let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_x_p_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_with_x_parallel( @@ -33,7 +32,6 @@ fn minmaxlttb_f32_random_array_long_multi_core(c: &mut Criterion) { black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); @@ -59,7 +57,6 @@ fn minmaxlttb_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let x = (0..n).map(|i| i as i32).collect::>(); let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_x_p_50M_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_with_x_parallel( @@ -67,7 +64,6 @@ fn minmaxlttb_f32_random_array_50M_multi_core(c: &mut Criterion) { black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); @@ -90,14 +86,12 @@ fn minmaxlttb_without_x_f32_random_array_50M_single_core(c: &mut Criterion) { fn minmaxlttb_without_x_f32_random_array_50M_multi_core(c: &mut Criterion) { let n = 50_000_000; let y = utils::get_random_array::(n, f32::MIN, f32::MAX); - let all_threads: usize = utils::get_all_threads(); c.bench_function("mlttb_p_50M_f32", |b| { b.iter(|| { minmaxlttb_mod::minmaxlttb_without_x_parallel( black_box(y.as_slice()), black_box(2_000), black_box(MINMAX_RATIO), - black_box(all_threads), ) }) }); diff --git a/downsample_rs/dev_utils/src/utils.rs b/downsample_rs/dev_utils/src/utils.rs index f3413ae..975fa65 100644 --- a/downsample_rs/dev_utils/src/utils.rs +++ b/downsample_rs/dev_utils/src/utils.rs @@ -33,11 +33,3 @@ where } arr } - -// ------------- Multi-threading ------------- - -use std::thread::available_parallelism; - -pub fn get_all_threads() -> usize { - available_parallelism().map(|x| x.get()).unwrap_or(1) -} diff --git a/downsample_rs/src/lib.rs b/downsample_rs/src/lib.rs index f678a45..1e222c0 100644 --- a/downsample_rs/src/lib.rs +++ b/downsample_rs/src/lib.rs @@ -14,3 +14,21 @@ pub use m4::*; pub(crate) mod helpers; pub(crate) mod searchsorted; pub(crate) mod types; + +use once_cell::sync::Lazy; +use rayon::{ThreadPool, ThreadPoolBuilder}; + +pub static POOL: Lazy = Lazy::new(|| { + ThreadPoolBuilder::new() + .num_threads( + std::env::var("TSDOWNSAMPLE_MAX_THREADS") + .map(|s| s.parse::().expect("integer")) + .unwrap_or_else(|_| { + std::thread::available_parallelism() + .unwrap_or(std::num::NonZeroUsize::new(1).unwrap()) + .get() + }), + ) + .build() + .expect("could not spawn threads") +}); diff --git a/downsample_rs/src/m4.rs b/downsample_rs/src/m4.rs index 985cab1..e44aa40 100644 --- a/downsample_rs/src/m4.rs +++ b/downsample_rs/src/m4.rs @@ -3,10 +3,11 @@ use num_traits::{AsPrimitive, FromPrimitive}; use rayon::iter::IndexedParallelIterator; use rayon::prelude::*; -use crate::searchsorted::{ +use super::searchsorted::{ get_equidistant_bin_idx_iterator, get_equidistant_bin_idx_iterator_parallel, }; -use crate::types::Num; +use super::types::Num; +use super::POOL; // ----------------------------------- NON-PARALLEL ------------------------------------ @@ -37,22 +38,15 @@ where // ----------- WITH X -pub fn m4_with_x_parallel( - x: &[Tx], - arr: &[Ty], - n_out: usize, - n_threads: usize, -) -> Vec +pub fn m4_with_x_parallel(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec where for<'a> &'a [Ty]: ArgMinMax, Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, Ty: Copy + PartialOrd + Send + Sync, { assert_eq!(n_out % 4, 0); - let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 4, n_threads); - m4_generic_with_x_parallel(arr, bin_idx_iterator, n_out, n_threads, |arr| { - arr.argminmax() - }) + let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 4); + m4_generic_with_x_parallel(arr, bin_idx_iterator, n_out, |arr| arr.argminmax()) } // ----------- WITHOUT X @@ -60,13 +54,12 @@ where pub fn m4_without_x_parallel( arr: &[T], n_out: usize, - n_threads: usize, ) -> Vec where for<'a> &'a [T]: ArgMinMax, { assert_eq!(n_out % 4, 0); - m4_generic_parallel(arr, n_out, n_threads, |arr| arr.argminmax()) + m4_generic_parallel(arr, n_out, |arr| arr.argminmax()) } // TODO: check for duplicate data in the output array @@ -125,7 +118,6 @@ pub(crate) fn m4_generic( pub(crate) fn m4_generic_parallel( arr: &[T], n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 4 @@ -137,39 +129,34 @@ pub(crate) fn m4_generic_parallel( let block_size: f64 = (arr.len() - 1) as f64 / (n_out / 4) as f64; // Store the enumerated indexes in the output array + // These indexes are used to calculate the start and end indexes of each bin in + // the multi-threaded execution let mut sampled_indices: Vec = (0..n_out).collect::>(); - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let func = || { - for chunk in sampled_indices.chunks_exact_mut(4) { - let i: f64 = unsafe { *chunk.get_unchecked(0) >> 2 } as f64; - let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; - let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - - let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - - chunk[0] = start_idx; - // Add the indexes in sorted order - if min_index < max_index { - chunk[1] = min_index + start_idx; - chunk[2] = max_index + start_idx; - } else { - chunk[1] = max_index + start_idx; - chunk[2] = min_index + start_idx; - } - chunk[3] = end_idx - 1; - } - }; + POOL.install(|| { + sampled_indices + .par_chunks_exact_mut(4) + .for_each(|sampled_index_chunk| { + let i: f64 = unsafe { *sampled_index_chunk.get_unchecked(0) >> 2 } as f64; + let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; + let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; + + let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - pool.unwrap().install(func); // allow panic if pool could not be created + sampled_index_chunk[0] = start_idx; + // Add the indexes in sorted order + if min_index < max_index { + sampled_index_chunk[1] = min_index + start_idx; + sampled_index_chunk[2] = max_index + start_idx; + } else { + sampled_index_chunk[1] = max_index + start_idx; + sampled_index_chunk[2] = min_index + start_idx; + } + sampled_index_chunk[3] = end_idx - 1; + }) + }); - sampled_indices.to_vec() + sampled_indices } // --------------------- WITH X @@ -224,7 +211,6 @@ pub(crate) fn m4_generic_with_x_parallel( arr: &[T], bin_idx_iterator: impl IndexedParallelIterator>>, n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 4 @@ -232,14 +218,7 @@ pub(crate) fn m4_generic_with_x_parallel( return (0..arr.len()).collect::>(); } - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let iter_func = || { + POOL.install(|| { bin_idx_iterator .flat_map(|bin_idx_iterator| { bin_idx_iterator @@ -275,10 +254,7 @@ pub(crate) fn m4_generic_with_x_parallel( }) .flatten() .collect::>() - }; - - let result = pool.unwrap().install(iter_func); // allow panic if pool could not be created - result + }) } #[cfg(test)] @@ -296,14 +272,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(196)] + #[case(200)] + #[case(204)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_m4_scalar_without_x_correct() { @@ -325,11 +300,11 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_m4_scalar_without_x_parallel_correct(n_threads: usize) { + #[test] + fn test_m4_scalar_without_x_parallel_correct() { let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_without_x_parallel(&arr, 12, n_threads); + let sampled_indices = m4_without_x_parallel(&arr, 12); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -366,12 +341,12 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_m4_scalar_with_x_parallel_correct(n_threads: usize) { + #[test] + fn test_m4_scalar_with_x_parallel_correct() { let x: [i32; 100] = core::array::from_fn(|i| i.as_()); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_with_x_parallel(&x, &arr, 12, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 12); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -410,14 +385,14 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_m4_scalar_with_x_gap_parallel(n_threads: usize) { + #[test] + fn test_m4_scalar_with_x_gap_parallel() { // We will create a gap in the middle of the array // Increment the second half of the array by 50 let x: [i32; 100] = core::array::from_fn(|i| if i > 50 { (i + 50).as_() } else { i.as_() }); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = m4_with_x_parallel(&x, &arr, 20, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 20); assert_eq!(sampled_indices.len(), 16); // One full gap let expected_indices = vec![0, 0, 29, 29, 30, 30, 50, 50, 51, 51, 69, 69, 70, 70, 99, 99]; assert_eq!(sampled_indices, expected_indices); @@ -425,7 +400,7 @@ mod tests { // Increment the second half of the array by 50 again let x = x.map(|x| if x > 101 { x + 50 } else { x }); - let sampled_indices = m4_with_x_parallel(&x, &arr, 20, n_threads); + let sampled_indices = m4_with_x_parallel(&x, &arr, 20); assert_eq!(sampled_indices.len(), 17); // Gap with 1 value let expected_indices = vec![ 0, 0, 39, 39, 40, 40, 50, 50, 51, 52, 52, 59, 59, 60, 60, 99, 99, @@ -433,20 +408,19 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_many_random_runs_correct(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_correct(n_out: usize) { const N: usize = 20_003; - const N_OUT: usize = 204; let x: [i32; N] = core::array::from_fn(|i| i.as_()); for _ in 0..100 { let arr = get_array_f32(N); - let idxs1 = m4_without_x(arr.as_slice(), N_OUT); - let idxs2 = m4_with_x(&x, arr.as_slice(), N_OUT); + let idxs1 = m4_without_x(arr.as_slice(), n_out); + let idxs2 = m4_with_x(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs2); - let idxs3 = m4_without_x_parallel(arr.as_slice(), N_OUT, n_threads); - let idxs4 = m4_with_x_parallel(&x, arr.as_slice(), N_OUT, n_threads); + let idxs3 = m4_without_x_parallel(arr.as_slice(), n_out); + let idxs4 = m4_with_x_parallel(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs3); - assert_eq!(idxs1, idxs4); // TODO: this should not fail when n_threads = 16 + assert_eq!(idxs1, idxs4); // TODO: this fails when nb. of threads = 16 } } } diff --git a/downsample_rs/src/minmax.rs b/downsample_rs/src/minmax.rs index e27b2d4..0f5ba79 100644 --- a/downsample_rs/src/minmax.rs +++ b/downsample_rs/src/minmax.rs @@ -8,6 +8,7 @@ use super::searchsorted::{ get_equidistant_bin_idx_iterator, get_equidistant_bin_idx_iterator_parallel, }; use super::types::Num; +use super::POOL; // ----------------------------------- NON-PARALLEL ------------------------------------ @@ -38,22 +39,15 @@ where // ----------- WITH X -pub fn min_max_with_x_parallel( - x: &[Tx], - arr: &[Ty], - n_out: usize, - n_threads: usize, -) -> Vec +pub fn min_max_with_x_parallel(x: &[Tx], arr: &[Ty], n_out: usize) -> Vec where for<'a> &'a [Ty]: ArgMinMax, Tx: Num + FromPrimitive + AsPrimitive + Send + Sync, Ty: Copy + PartialOrd + Send + Sync, { assert_eq!(n_out % 2, 0); - let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 2, n_threads); - min_max_generic_with_x_parallel(arr, bin_idx_iterator, n_out, n_threads, |arr| { - arr.argminmax() - }) + let bin_idx_iterator = get_equidistant_bin_idx_iterator_parallel(x, n_out / 2); + min_max_generic_with_x_parallel(arr, bin_idx_iterator, n_out, |arr| arr.argminmax()) } // ----------- WITHOUT X @@ -61,13 +55,12 @@ where pub fn min_max_without_x_parallel( arr: &[T], n_out: usize, - n_threads: usize, ) -> Vec where for<'a> &'a [T]: ArgMinMax, { assert_eq!(n_out % 2, 0); - min_max_generic_parallel(arr, n_out, n_threads, |arr| arr.argminmax()) + min_max_generic_parallel(arr, n_out, |arr| arr.argminmax()) } // ----------------------------------- GENERICS ------------------------------------ @@ -118,7 +111,6 @@ pub(crate) fn min_max_generic( pub(crate) fn min_max_generic_parallel( arr: &[T], n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 2 @@ -130,38 +122,32 @@ pub(crate) fn min_max_generic_parallel( let block_size: f64 = (arr.len() - 1) as f64 / (n_out / 2) as f64; // Store the enumerated indexes in the output array - // let mut sampled_indices: Array1 = Array1::from_vec((0..n_out).collect::>()); + // These indexes are used to calculate the start and end indexes of each bin in + // the multi-threaded execution let mut sampled_indices: Vec = (0..n_out).collect::>(); - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let func = || { - for chunk in sampled_indices.chunks_exact_mut(2) { - let i: f64 = unsafe { *chunk.get_unchecked(0) >> 1 } as f64; - let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; - let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; - - let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - - // Add the indexes in sorted order - if min_index < max_index { - chunk[0] = min_index + start_idx; - chunk[1] = max_index + start_idx; - } else { - chunk[0] = max_index + start_idx; - chunk[1] = min_index + start_idx; - } - } - }; + POOL.install(|| { + sampled_indices + .par_chunks_exact_mut(2) + .for_each(|sampled_index_chunk| { + let i: f64 = unsafe { *sampled_index_chunk.get_unchecked(0) >> 1 } as f64; + let start_idx: usize = (block_size * i) as usize + (i != 0.0) as usize; + let end_idx: usize = (block_size * (i + 1.0)) as usize + 1; + + let (min_index, max_index) = f_argminmax(&arr[start_idx..end_idx]); - pool.unwrap().install(func); // allow panic if pool could not be created + // Add the indexes in sorted order + if min_index < max_index { + sampled_index_chunk[0] = min_index + start_idx; + sampled_index_chunk[1] = max_index + start_idx; + } else { + sampled_index_chunk[0] = max_index + start_idx; + sampled_index_chunk[1] = min_index + start_idx; + } + }) + }); - sampled_indices.to_vec() + sampled_indices } // --------------------- WITH X @@ -212,7 +198,6 @@ pub(crate) fn min_max_generic_with_x_parallel( arr: &[T], bin_idx_iterator: impl IndexedParallelIterator>>, n_out: usize, - n_threads: usize, f_argminmax: fn(&[T]) -> (usize, usize), ) -> Vec { // Assumes n_out is a multiple of 2 @@ -220,14 +205,7 @@ pub(crate) fn min_max_generic_with_x_parallel( return (0..arr.len()).collect::>(); } - // to limit the amounts of threads Rayon uses, an explicit threadpool needs to be created - // in which the required code is "installed". This limits the amount of used threads. - // https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.install - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(n_threads) - .build(); - - let iter_func = || { + POOL.install(|| { bin_idx_iterator .flat_map(|bin_idx_iterator| { bin_idx_iterator @@ -259,9 +237,7 @@ pub(crate) fn min_max_generic_with_x_parallel( }) .flatten() .collect::>() - }; - - pool.unwrap().install(iter_func) // allow panic if pool could not be created + }) } #[cfg(test)] @@ -279,14 +255,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(198)] + #[case(200)] + #[case(202)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_min_max_scalar_without_x_correct() { @@ -308,11 +283,11 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_min_max_scalar_without_x_parallel_correct(n_threads: usize) { + #[test] + fn test_min_max_scalar_without_x_parallel_correct() { let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_without_x_parallel(&arr, 10, n_threads); + let sampled_indices = min_max_without_x_parallel(&arr, 10); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -349,12 +324,12 @@ mod tests { assert_eq!(sampled_values, expected_values); } - #[apply(threads)] - fn test_min_max_scalar_with_x_parallel_correct(n_threads: usize) { + #[test] + fn test_min_max_scalar_with_x_parallel_correct() { let x: [i32; 100] = core::array::from_fn(|i| i.as_()); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); let sampled_values = sampled_indices .iter() .map(|x| arr[*x]) @@ -391,14 +366,14 @@ mod tests { assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_min_max_scalar_with_x_parallel_gap(n_threads: usize) { + #[test] + fn test_min_max_scalar_with_x_parallel_gap() { // Create a gap in the middle of the array // Increment the second half of the array by 50 let x: [i32; 100] = core::array::from_fn(|i| if i > 50 { (i + 50).as_() } else { i.as_() }); let arr: [f32; 100] = core::array::from_fn(|i| i.as_()); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); assert_eq!(sampled_indices.len(), 8); // One full gap let expected_indices = vec![0, 29, 30, 50, 51, 69, 70, 99]; assert_eq!(sampled_indices, expected_indices); @@ -406,24 +381,23 @@ mod tests { // Increment the second half of the array by 50 again let x = x.map(|i| if i > 101 { i + 50 } else { i }); - let sampled_indices = min_max_with_x_parallel(&x, &arr, 10, n_threads); + let sampled_indices = min_max_with_x_parallel(&x, &arr, 10); assert_eq!(sampled_indices.len(), 9); // Gap with 1 value let expected_indices = vec![0, 39, 40, 50, 51, 52, 59, 60, 99]; assert_eq!(sampled_indices, expected_indices); } - #[apply(threads)] - fn test_many_random_runs_same_output(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_same_output(n_out: usize) { const N: usize = 20_003; - const N_OUT: usize = 202; let x: [i32; N] = core::array::from_fn(|i| i.as_()); for _ in 0..100 { let mut arr = get_array_f32(N); arr[N - 1] = f32::INFINITY; // Make sure the last value is always the max - let idxs1 = min_max_without_x(arr.as_slice(), N_OUT); - let idxs2 = min_max_without_x_parallel(arr.as_slice(), N_OUT, n_threads); - let idxs3 = min_max_with_x(&x, arr.as_slice(), N_OUT); - let idxs4 = min_max_with_x_parallel(&x, arr.as_slice(), N_OUT, n_threads); + let idxs1 = min_max_without_x(arr.as_slice(), n_out); + let idxs2 = min_max_without_x_parallel(arr.as_slice(), n_out); + let idxs3 = min_max_with_x(&x, arr.as_slice(), n_out); + let idxs4 = min_max_with_x_parallel(&x, arr.as_slice(), n_out); assert_eq!(idxs1, idxs2); assert_eq!(idxs1, idxs3); assert_eq!(idxs1, idxs4); diff --git a/downsample_rs/src/minmaxlttb.rs b/downsample_rs/src/minmaxlttb.rs index 9916b70..27fab1b 100644 --- a/downsample_rs/src/minmaxlttb.rs +++ b/downsample_rs/src/minmaxlttb.rs @@ -19,14 +19,7 @@ pub fn minmaxlttb_with_x + FromPrimitive, Ty: Num + A where for<'a> &'a [Ty]: ArgMinMax, { - minmaxlttb_generic( - x, - y, - n_out, - minmax_ratio, - None, - MinMaxFunctionWithX::Serial(minmax::min_max_with_x), - ) + minmaxlttb_generic(x, y, n_out, minmax_ratio, minmax::min_max_with_x) } // ----------- WITHOUT X @@ -39,13 +32,7 @@ pub fn minmaxlttb_without_x>( where for<'a> &'a [Ty]: ArgMinMax, { - minmaxlttb_generic_without_x( - y, - n_out, - minmax_ratio, - None, - MinMaxFunctionWithoutX::Serial(minmax::min_max_without_x), - ) + minmaxlttb_generic_without_x(y, n_out, minmax_ratio, minmax::min_max_without_x) } // ------------------------------------- PARALLEL -------------------------------------- @@ -60,19 +47,11 @@ pub fn minmaxlttb_with_x_parallel< y: &[Ty], n_out: usize, minmax_ratio: usize, - n_threads: usize, ) -> Vec where for<'a> &'a [Ty]: ArgMinMax, { - minmaxlttb_generic( - x, - y, - n_out, - minmax_ratio, - Some(n_threads), - MinMaxFunctionWithX::Parallel(minmax::min_max_with_x_parallel), - ) + minmaxlttb_generic(x, y, n_out, minmax_ratio, minmax::min_max_with_x_parallel) } // ----------- WITHOUT X @@ -81,44 +60,22 @@ pub fn minmaxlttb_without_x_parallel + Send + Sync>( y: &[Ty], n_out: usize, minmax_ratio: usize, - n_threads: usize, ) -> Vec where for<'a> &'a [Ty]: ArgMinMax, { - minmaxlttb_generic_without_x( - y, - n_out, - minmax_ratio, - Some(n_threads), - MinMaxFunctionWithoutX::Parallel(minmax::min_max_without_x_parallel), - ) + minmaxlttb_generic_without_x(y, n_out, minmax_ratio, minmax::min_max_without_x_parallel) } // ----------------------------------- GENERICS ------------------------------------ -// types to make function signatures easier to read -type ThreadCount = usize; -type OutputCount = usize; - -pub enum MinMaxFunctionWithX, Ty: Num + AsPrimitive> { - Serial(fn(&[Tx], &[Ty], OutputCount) -> Vec), - Parallel(fn(&[Tx], &[Ty], OutputCount, ThreadCount) -> Vec), -} - -pub enum MinMaxFunctionWithoutX> { - Serial(fn(&[Ty], OutputCount) -> Vec), - Parallel(fn(&[Ty], OutputCount, ThreadCount) -> Vec), -} - #[inline(always)] pub(crate) fn minmaxlttb_generic, Ty: Num + AsPrimitive>( x: &[Tx], y: &[Ty], n_out: usize, minmax_ratio: usize, - n_threads: Option, - f_minmax: MinMaxFunctionWithX, + f_minmax: fn(&[Tx], &[Ty], usize) -> Vec, ) -> Vec where for<'a> &'a [Ty]: ArgMinMax, @@ -128,19 +85,11 @@ where // Apply first min max aggregation (if above ratio) if x.len() / n_out > minmax_ratio { // Get index of min max points - let mut index = match f_minmax { - MinMaxFunctionWithX::Serial(func) => func( - &x[1..(x.len() - 1)], - &y[1..(x.len() - 1)], - n_out * minmax_ratio, - ), - MinMaxFunctionWithX::Parallel(func) => func( - &x[1..(x.len() - 1)], - &y[1..(x.len() - 1)], - n_out * minmax_ratio, - n_threads.unwrap(), // n_threads cannot be None - ), - }; + let mut index = f_minmax( + &x[1..(x.len() - 1)], + &y[1..(x.len() - 1)], + n_out * minmax_ratio, + ); // inplace + 1 index.iter_mut().for_each(|elem| *elem += 1); // Prepend first and last point @@ -176,8 +125,7 @@ pub(crate) fn minmaxlttb_generic_without_x>( y: &[Ty], n_out: usize, minmax_ratio: usize, - n_threads: Option, - f_minmax: MinMaxFunctionWithoutX, + f_minmax: fn(&[Ty], usize) -> Vec, ) -> Vec where for<'a> &'a [Ty]: ArgMinMax, @@ -186,16 +134,7 @@ where // Apply first min max aggregation (if above ratio) if y.len() / n_out > minmax_ratio { // Get index of min max points - let mut index = match f_minmax { - MinMaxFunctionWithoutX::Serial(func) => { - func(&y[1..(y.len() - 1)], n_out * minmax_ratio) - } - MinMaxFunctionWithoutX::Parallel(func) => func( - &y[1..(y.len() - 1)], - n_out * minmax_ratio, - n_threads.unwrap(), // n_threads cannot be None - ), - }; + let mut index = f_minmax(&y[1..(y.len() - 1)], n_out * minmax_ratio); // inplace + 1 index.iter_mut().for_each(|elem| *elem += 1); // Prepend first and last point @@ -234,14 +173,13 @@ mod tests { utils::get_random_array(n, f32::MIN, f32::MAX) } - // Template for the n_threads matrix + // Template for n_out #[template] #[rstest] - #[case(1)] - #[case(utils::get_all_threads() / 2)] - #[case(utils::get_all_threads())] - #[case(utils::get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(98)] + #[case(100)] + #[case(102)] + fn n_outs(#[case] n_out: usize) {} #[test] fn test_minmaxlttb_with_x() { @@ -258,32 +196,30 @@ mod tests { assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_minmaxlttb_with_x_parallel(n_threads: usize) { + #[test] + fn test_minmaxlttb_with_x_parallel() { let x = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; let y = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; - let sampled_indices = minmaxlttb_with_x_parallel(&x, &y, 4, 2, n_threads); + let sampled_indices = minmaxlttb_with_x_parallel(&x, &y, 4, 2); assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_minmaxlttb_without_x_parallel(n_threads: usize) { + #[test] + fn test_minmaxlttb_without_x_parallel() { let y = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; - let sampled_indices = minmaxlttb_without_x_parallel(&y, 4, 2, n_threads); + let sampled_indices = minmaxlttb_without_x_parallel(&y, 4, 2); assert_eq!(sampled_indices, vec![0, 1, 5, 9]); } - #[apply(threads)] - fn test_many_random_runs_same_output(n_threads: usize) { + #[apply(n_outs)] + fn test_many_random_runs_same_output(n_out: usize) { const N: usize = 20_000; - const N_OUT: usize = 100; const MINMAX_RATIO: usize = 5; for _ in 0..100 { // TODO: test with x let arr = get_array_f32(N); - let idxs1 = minmaxlttb_without_x(arr.as_slice(), N_OUT, MINMAX_RATIO); - let idxs2 = - minmaxlttb_without_x_parallel(arr.as_slice(), N_OUT, MINMAX_RATIO, n_threads); + let idxs1 = minmaxlttb_without_x(arr.as_slice(), n_out, MINMAX_RATIO); + let idxs2 = minmaxlttb_without_x_parallel(arr.as_slice(), n_out, MINMAX_RATIO); assert_eq!(idxs1, idxs2); } } diff --git a/downsample_rs/src/searchsorted.rs b/downsample_rs/src/searchsorted.rs index af53a9c..c02a3e1 100644 --- a/downsample_rs/src/searchsorted.rs +++ b/downsample_rs/src/searchsorted.rs @@ -2,6 +2,7 @@ use rayon::iter::IndexedParallelIterator; use rayon::prelude::*; use super::types::Num; +use super::POOL; use num_traits::{AsPrimitive, FromPrimitive}; // ---------------------- Binary search ---------------------- @@ -128,7 +129,6 @@ fn sequential_add_mul(start_val: f64, add_val: f64, mul: usize) -> f64 { pub(crate) fn get_equidistant_bin_idx_iterator_parallel( arr: &[T], nb_bins: usize, - n_threads: usize, ) -> impl IndexedParallelIterator> + '_> + '_ where T: Num + FromPrimitive + AsPrimitive + Sync + Send, @@ -140,7 +140,7 @@ where (arr[arr.len() - 1].as_() / nb_bins as f64) - (arr[0].as_() / nb_bins as f64); let arr0: f64 = arr[0].as_(); // The first value of the array // 2. Compute the number of threads & bins per thread - let n_threads = std::cmp::min(n_threads, nb_bins); + let n_threads = std::cmp::min(POOL.current_num_threads(), nb_bins); let nb_bins_per_thread = nb_bins / n_threads; let nb_bins_last_thread = nb_bins - nb_bins_per_thread * (n_threads - 1); // 3. Iterate over the number of threads @@ -188,16 +188,15 @@ mod tests { use super::*; - use dev_utils::utils::{get_all_threads, get_random_array}; + use dev_utils::utils::get_random_array; - // Template for the n_threads matrix + // Template for nb_bins #[template] #[rstest] - #[case(1)] - #[case(get_all_threads() / 2)] - #[case(get_all_threads())] - #[case(get_all_threads() * 2)] - fn threads(#[case] n_threads: usize) {} + #[case(99)] + #[case(100)] + #[case(101)] + fn nb_bins(#[case] nb_bins: usize) {} #[test] fn test_search_sorted_identicial_to_np_linspace_searchsorted() { @@ -250,8 +249,8 @@ mod tests { // assert_eq!(binary_search_with_mid(&arr, 11, 0, arr.len() - 1, 9), 10); } - #[apply(threads)] - fn test_get_equidistant_bin_idxs(n_threads: usize) { + #[test] + fn test_get_equidistant_bin_idxs() { let expected_indices = vec![0, 4, 7]; let arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; @@ -259,7 +258,7 @@ mod tests { let bin_idxs = bin_idxs_iter.map(|x| x.unwrap().0).collect::>(); assert_eq!(bin_idxs, expected_indices); - let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr, 3, n_threads); + let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr, 3); let bin_idxs = bin_idxs_iter .map(|x| x.map(|x| x.unwrap().0).collect::>()) .flatten() @@ -267,10 +266,9 @@ mod tests { assert_eq!(bin_idxs, expected_indices); } - #[apply(threads)] - fn test_many_random_same_result(n_threads: usize) { + #[apply(nb_bins)] + fn test_many_random_same_result(nb_bins: usize) { let n = 5_000; - let nb_bins = 100; for _ in 0..100 { let mut arr = get_random_array::(n, i32::MIN, i32::MAX); @@ -282,8 +280,7 @@ mod tests { let bin_idxs = bin_idxs_iter.map(|x| x.unwrap().0).collect::>(); // Calculate the bin indexes in parallel - let bin_idxs_iter = - get_equidistant_bin_idx_iterator_parallel(&arr[..], nb_bins, n_threads); + let bin_idxs_iter = get_equidistant_bin_idx_iterator_parallel(&arr[..], nb_bins); let bin_idxs_parallel = bin_idxs_iter .map(|x| x.map(|x| x.unwrap().0).collect::>()) .flatten() diff --git a/pyproject.toml b/pyproject.toml index 5941fcb..e186284 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "maturin" [project] name = "tsdownsample" description = "Time series downsampling in rust" -version = "0.1.3rc1" +version = "0.1.3rc2" requires-python = ">=3.7" dependencies = ["numpy"] authors = [{name = "Jeroen Van Der Donckt"}] diff --git a/src/lib.rs b/src/lib.rs index 5a93afc..3979b6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,25 +32,6 @@ macro_rules! _create_pyfunc_without_x { }; } -macro_rules! _create_pyfunc_without_x_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - y: PyReadonlyArray1<$type>, - n_out: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(y, n_out, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfunc_without_x_with_ratio { ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { // Create the Python function @@ -70,26 +51,6 @@ macro_rules! _create_pyfunc_without_x_with_ratio { }; } -macro_rules! _create_pyfunc_without_x_with_ratio_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - y: PyReadonlyArray1<$type>, - n_out: usize, - ratio: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(y, n_out, ratio, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfuncs_without_x_generic { ($create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($t:ty)*) => { $( @@ -122,27 +83,6 @@ macro_rules! _create_pyfunc_with_x { }; } -macro_rules! _create_pyfunc_with_x_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - x: PyReadonlyArray1<$type_x>, - y: PyReadonlyArray1<$type_y>, - n_out: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let x = x.as_slice().unwrap(); - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(x, y, n_out, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfunc_with_x_with_ratio { ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { // Create the Python function @@ -164,28 +104,6 @@ macro_rules! _create_pyfunc_with_x_with_ratio { }; } -macro_rules! _create_pyfunc_with_x_with_ratio_multithreaded { - ($name:ident, $resample_mod:ident, $resample_fn:ident, $type_x:ty, $type_y:ty, $mod:ident) => { - // Create the Python function - #[pyfunction] - fn $name<'py>( - py: Python<'py>, - x: PyReadonlyArray1<$type_x>, - y: PyReadonlyArray1<$type_y>, - n_out: usize, - ratio: usize, - n_threads: usize, - ) -> &'py PyArray1 { - let x = x.as_slice().unwrap(); - let y = y.as_slice().unwrap(); - let sampled_indices = $resample_mod::$resample_fn(x, y, n_out, ratio, n_threads); - sampled_indices.into_pyarray(py) - } - // Add the function to the module - $mod.add_wrapped(wrap_pyfunction!($name))?; - }; -} - macro_rules! _create_pyfuncs_with_x_generic { // ($create_macro:ident, $resample_mod:ident, $resample_fn:ident, $mod:ident, $($t:ty)+) => { // // The macro will implement the function for all combinations of $t (for type x and y). @@ -234,14 +152,6 @@ macro_rules! create_pyfuncs_without_x { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_without_x_helper!( - _create_pyfunc_without_x_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; } macro_rules! create_pyfuncs_without_x_with_ratio { @@ -254,14 +164,6 @@ macro_rules! create_pyfuncs_without_x_with_ratio { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_without_x_helper!( - _create_pyfunc_without_x_with_ratio_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; } macro_rules! _create_pyfuncs_with_x_helper { @@ -275,14 +177,6 @@ macro_rules! create_pyfuncs_with_x { ($resample_mod:ident, $resample_fn:ident, $mod:ident) => { _create_pyfuncs_with_x_helper!(_create_pyfunc_with_x, $resample_mod, $resample_fn, $mod); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_with_x_helper!( - _create_pyfunc_with_x_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; } macro_rules! create_pyfuncs_with_x_with_ratio { @@ -295,14 +189,6 @@ macro_rules! create_pyfuncs_with_x_with_ratio { $mod ); }; - (@threaded $resample_mod:ident, $resample_fn:ident, $mod:ident) => { - _create_pyfuncs_with_x_helper!( - _create_pyfunc_with_x_with_ratio_multithreaded, - $resample_mod, - $resample_fn, - $mod - ); - }; } // -------------------------------------- MINMAX --------------------------------------- @@ -332,12 +218,12 @@ fn minmax(_py: Python<'_>, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x!(@threaded minmax_mod, min_max_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(minmax_mod, min_max_without_x_parallel, parallel_mod); } // ----- WITH X { - create_pyfuncs_with_x!(@threaded minmax_mod, min_max_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(minmax_mod, min_max_with_x_parallel, parallel_mod); } // Add the sub modules to the module @@ -374,12 +260,12 @@ fn m4(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x!(@threaded m4_mod, m4_without_x_parallel, parallel_mod); + create_pyfuncs_without_x!(m4_mod, m4_without_x_parallel, parallel_mod); } // ----- WITH X { - create_pyfuncs_with_x!(@threaded m4_mod, m4_with_x_parallel, parallel_mod); + create_pyfuncs_with_x!(m4_mod, m4_with_x_parallel, parallel_mod); } // Add the sub modules to the module @@ -444,7 +330,7 @@ fn minmaxlttb(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITHOUT X { - create_pyfuncs_without_x_with_ratio!(@threaded + create_pyfuncs_without_x_with_ratio!( minmaxlttb_mod, minmaxlttb_without_x_parallel, parallel_mod @@ -453,11 +339,7 @@ fn minmaxlttb(_py: Python, m: &PyModule) -> PyResult<()> { // ----- WITH X { - create_pyfuncs_with_x_with_ratio!(@threaded - minmaxlttb_mod, - minmaxlttb_with_x_parallel, - parallel_mod - ); + create_pyfuncs_with_x_with_ratio!(minmaxlttb_mod, minmaxlttb_with_x_parallel, parallel_mod); } // Add the submodules to the module diff --git a/tests/benchmarks/test_downsamplers.py b/tests/benchmarks/test_downsamplers.py index 515ac1e..e257487 100644 --- a/tests/benchmarks/test_downsamplers.py +++ b/tests/benchmarks/test_downsamplers.py @@ -1,5 +1,3 @@ -import os - import numpy as np import pytest @@ -16,10 +14,6 @@ Y_DTYPES = [np.float32, np.float64] + [np.int32, np.int64] -def _parallel_to_n_threads(parallel): - return 0 if not parallel else os.cpu_count() - - # --------------------------------------------------------------------------- # # MinMaxDownsampler # --------------------------------------------------------------------------- # @@ -35,11 +29,10 @@ def test_minmax_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="minmax") @@ -52,12 +45,11 @@ def test_minmax_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -75,11 +67,10 @@ def test_m4_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = M4Downsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="m4") @@ -92,12 +83,11 @@ def test_m4_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = M4Downsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -115,11 +105,10 @@ def test_lttb_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = LTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="lttb") @@ -132,12 +121,11 @@ def test_lttb_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = LTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # @@ -155,11 +143,10 @@ def test_minmaxlttb_no_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxLTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, y, n_out=n_out, parallel=parallel) @pytest.mark.benchmark(group="minmaxlttb") @@ -172,12 +159,11 @@ def test_minmaxlttb_with_x(benchmark, n_samples, n_out, dtype, parallel): downsampler = MinMaxLTTBDownsampler() n_samples = int(n_samples.replace(",", "")) n_out = int(n_out.replace(",", "")) - n_threads = _parallel_to_n_threads(parallel) x = np.arange(n_samples) y = np.random.randn(n_samples).astype(dtype) - benchmark(downsampler.downsample, x, y, n_out=n_out, n_threads=n_threads) + benchmark(downsampler.downsample, x, y, n_out=n_out, parallel=parallel) # --------------------------------------------------------------------------- # diff --git a/tests/test_tsdownsample.py b/tests/test_tsdownsample.py index 961a031..71abfd6 100644 --- a/tests/test_tsdownsample.py +++ b/tests/test_tsdownsample.py @@ -89,7 +89,7 @@ def test_parallel_downsampling(downsampler: AbstractDownsampler): """Test parallel downsampling.""" arr = np.random.randn(10_000).astype(np.float32) s_downsampled = downsampler.downsample(arr, n_out=100) - s_downsampled_p = downsampler.downsample(arr, n_out=100, n_threads=2) + s_downsampled_p = downsampler.downsample(arr, n_out=100, parallel=True) assert np.all(s_downsampled == s_downsampled_p) @@ -99,7 +99,7 @@ def test_parallel_downsampling_with_x(downsampler: AbstractDownsampler): arr = np.random.randn(10_001).astype(np.float32) # 10_001 to test edge case idx = np.arange(len(arr)) s_downsampled = downsampler.downsample(idx, arr, n_out=100) - s_downsampled_p = downsampler.downsample(idx, arr, n_out=100, n_threads=2) + s_downsampled_p = downsampler.downsample(idx, arr, n_out=100, parallel=True) assert np.all(s_downsampled == s_downsampled_p) @@ -170,7 +170,7 @@ def test_downsampling_no_out_of_bounds_different_dtypes( for dtype in supported_dtypes_y: arr = arr_orig.astype(dtype) s_downsampled = downsampler.downsample(arr, n_out=76) - s_downsampled_p = downsampler.downsample(arr, n_out=76, n_threads=2) + s_downsampled_p = downsampler.downsample(arr, n_out=76, parallel=True) assert np.all(s_downsampled == s_downsampled_p) if dtype is not np.bool_: res += [s_downsampled] @@ -191,7 +191,7 @@ def test_downsampling_no_out_of_bounds_different_dtypes_with_x( for dtype_y in supported_dtypes_y: arr = arr_orig.astype(dtype_y) s_downsampled = downsampler.downsample(idx, arr, n_out=76) - s_downsampled_p = downsampler.downsample(idx, arr, n_out=76, n_threads=2) + s_downsampled_p = downsampler.downsample(idx, arr, n_out=76, parallel=True) assert np.all(s_downsampled == s_downsampled_p) if dtype_y is not np.bool_: res += [s_downsampled] @@ -248,25 +248,25 @@ def test_error_invalid_args(): arr = np.random.randint(0, 100, size=10_000) # No args with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(n_out=100, n_threads=2) + MinMaxDownsampler().downsample(n_out=100, parallel=True) assert "takes 1 or 2 positional arguments" in str(e_msg.value) # Too many args with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr, arr, arr, n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr, arr, arr, n_out=100, parallel=True) assert "takes 1 or 2 positional arguments" in str(e_msg.value) # Invalid y with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr.reshape(5, 2_000), n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr.reshape(5, 2_000), n_out=100, parallel=True) assert "y must be 1D" in str(e_msg.value) # Invalid x with pytest.raises(ValueError) as e_msg: MinMaxDownsampler().downsample( - arr.reshape(5, 2_000), arr, n_out=100, n_threads=2 + arr.reshape(5, 2_000), arr, n_out=100, parallel=True ) assert "x must be 1D" in str(e_msg.value) # Invalid x and y (different length) with pytest.raises(ValueError) as e_msg: - MinMaxDownsampler().downsample(arr, arr[:-1], n_out=100, n_threads=2) + MinMaxDownsampler().downsample(arr, arr[:-1], n_out=100, parallel=True) assert "x and y must have the same length" in str(e_msg.value) diff --git a/tsdownsample/__init__.py b/tsdownsample/__init__.py index 4a5a6d6..6b1fbcf 100644 --- a/tsdownsample/__init__.py +++ b/tsdownsample/__init__.py @@ -8,7 +8,7 @@ MinMaxLTTBDownsampler, ) -__version__ = "0.1.3rc1" +__version__ = "0.1.3rc2" __author__ = "Jeroen Van Der Donckt" __all__ = [ diff --git a/tsdownsample/downsamplers.py b/tsdownsample/downsamplers.py index 53fb4b1..3cf30af 100644 --- a/tsdownsample/downsamplers.py +++ b/tsdownsample/downsamplers.py @@ -45,11 +45,11 @@ def rust_mod(self): return _tsdownsample_rs.minmaxlttb def downsample( - self, *args, n_out: int, minmax_ratio: int = 4, n_threads: int = 1, **_ + self, *args, n_out: int, minmax_ratio: int = 4, parallel: bool = False, **_ ): assert minmax_ratio > 0, "minmax_ratio must be greater than 0" return super().downsample( - *args, n_out=n_out, n_threads=n_threads, ratio=minmax_ratio + *args, n_out=n_out, parallel=parallel, ratio=minmax_ratio ) diff --git a/tsdownsample/downsampling_interface.py b/tsdownsample/downsampling_interface.py index 3be2578..6a0fcb0 100644 --- a/tsdownsample/downsampling_interface.py +++ b/tsdownsample/downsampling_interface.py @@ -314,13 +314,11 @@ def _downsample( x: Union[np.ndarray, None], y: np.ndarray, n_out: int, - n_threads: int = 1, + parallel: bool = False, **kwargs, ) -> np.ndarray: """Downsample the data in x and y.""" mod = self.mod_single_core - is_multi_core = False - parallel = n_threads > 1 if parallel: if self.mod_multi_core is None: name = self.__class__.__name__ @@ -330,7 +328,6 @@ def _downsample( ) else: mod = self.mod_multi_core - is_multi_core = True ## Viewing the y-data as different dtype (if necessary) if y.dtype == "bool": # bool is viewed as int8 @@ -344,10 +341,7 @@ def _downsample( ## Viewing the x-data as different dtype (if necessary) if x is None: downsample_f = self._switch_mod_with_y(y.dtype, mod) - if is_multi_core: - return downsample_f(y, n_out, n_threads=n_threads, **kwargs) - else: - return downsample_f(y, n_out, **kwargs) + return downsample_f(y, n_out, **kwargs) elif np.issubdtype(x.dtype, np.datetime64): # datetime64 is viewed as int64 x = x.view(dtype=np.int64) @@ -356,16 +350,17 @@ def _downsample( x = x.view(dtype=np.int64) ## Getting the appropriate downsample function downsample_f = self._switch_mod_with_x_and_y(x.dtype, y.dtype, mod) - if is_multi_core: - return downsample_f(x, y, n_out, n_threads=n_threads, **kwargs) - else: - return downsample_f(x, y, n_out, **kwargs) + return downsample_f(x, y, n_out, **kwargs) - def downsample( - self, *args, n_out: int, n_threads: int = 1, **kwargs # x and y are optional - ): - """Downsample the data in x and y.""" - return super().downsample(*args, n_out=n_out, n_threads=n_threads, **kwargs) + def downsample(self, *args, n_out: int, parallel: bool = False, **kwargs): + """Downsample the data in x and y. + + The x and y arguments are positional-only arguments. If only one argument is + passed, it is considered to be the y-data. If two arguments are passed, the + first argument is considered to be the x-data and the second argument is + considered to be the y-data. + """ + return super().downsample(*args, n_out=n_out, parallel=parallel, **kwargs) def __deepcopy__(self, memo): """Deepcopy the object.""" From e4d4a66e40f3bf0411acf4aa42682e973fea137e Mon Sep 17 00:00:00 2001 From: Jeroen Van Der Donckt <18898740+jvdd@users.noreply.github.com> Date: Mon, 15 Jan 2024 15:19:34 +0100 Subject: [PATCH 5/5] :arrow_up: update dependencies (#69) * :arrow_up: update dependencies * :arrow_up: update codspeed benchmarking action --- .github/workflows/codspeed.yml | 2 +- Cargo.toml | 8 ++++---- downsample_rs/Cargo.toml | 6 +++--- downsample_rs/src/lib.rs | 1 + 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/.github/workflows/codspeed.yml b/.github/workflows/codspeed.yml index c9e31cb..7475832 100644 --- a/.github/workflows/codspeed.yml +++ b/.github/workflows/codspeed.yml @@ -43,6 +43,6 @@ jobs: # - run: rm tests/__init__.py - name: Run CodSpeed benchmarks - uses: CodSpeedHQ/action@v1 + uses: CodSpeedHQ/action@v2 with: run: pytest tests/benchmarks/ --codspeed diff --git a/Cargo.toml b/Cargo.toml index bc18a23..184e8c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,10 @@ license = "MIT" [dependencies] downsample_rs = { path = "downsample_rs", features = ["half"]} -pyo3 = { version = "0.19", features = ["extension-module"] } -numpy = { version = "0.19", features = ["half"] } -half = { version = "2.1", default-features = false } -paste = { version = "1.0.9", default-features = false } +pyo3 = { version = "0.20", features = ["extension-module"] } +numpy = { version = "0.20", features = ["half"] } +half = { version = "2.3.1", default-features = false } +paste = { version = "1.0.14", default-features = false } [lib] name = "tsdownsample" diff --git a/downsample_rs/Cargo.toml b/downsample_rs/Cargo.toml index ca0ee16..107469b 100644 --- a/downsample_rs/Cargo.toml +++ b/downsample_rs/Cargo.toml @@ -9,13 +9,13 @@ license = "MIT" [dependencies] # TODO: perhaps use polars? argminmax = { version = "0.6.1", features = ["half"] } -half = { version = "2.1", default-features = false , features=["num-traits"], optional = true} -num-traits = { version = "0.2.15", default-features = false } +half = { version = "2.3.1", default-features = false , features=["num-traits"], optional = true} +num-traits = { version = "0.2.17", default-features = false } once_cell = "1" rayon = { version = "1.8.0", default-features = false } [dev-dependencies] -rstest = { version = "0.18.1", default-features = false } +rstest = { version = "0.18.2", default-features = false } rstest_reuse = { version = "0.6", default-features = false } criterion = "0.5.1" dev_utils = { path = "dev_utils" } diff --git a/downsample_rs/src/lib.rs b/downsample_rs/src/lib.rs index 1e222c0..409294d 100644 --- a/downsample_rs/src/lib.rs +++ b/downsample_rs/src/lib.rs @@ -18,6 +18,7 @@ pub(crate) mod types; use once_cell::sync::Lazy; use rayon::{ThreadPool, ThreadPoolBuilder}; +// Inspired by: https://github.com/pola-rs/polars/blob/9a69062aa0beb2a1bc5d57294cac49961fc91058/crates/polars-core/src/lib.rs#L49 pub static POOL: Lazy = Lazy::new(|| { ThreadPoolBuilder::new() .num_threads(