Skip to content

Commit

Permalink
Fix multithreading (#68)
Browse files Browse the repository at this point in the history
* 🙈 remove num_threads argument

* 🧹 remove n_threads in rust functions

* 🎉 finalize parallel implementation

* 🧹 remove n_threads from rust benchmarks
  • Loading branch information
jvdd authored Jan 15, 2024
1 parent 7f20573 commit ec24796
Show file tree
Hide file tree
Showing 18 changed files with 216 additions and 493 deletions.
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ downsample([x], y, n_out, **kwargs) -> ndarray[uint64]
- `x` and `y` are both positional arguments
- `n_out` is a mandatory keyword argument that defines the number of output values<sup>*</sup>
- `**kwargs` are optional keyword arguments *(see [table below](#downsampling-algorithms-📈))*:
- `n_threads`: how many threads to use for multi-threading (default `1`, so no multi-threading)
- `parallel`: whether to use multi-threading (default: `False`)
❗ The max number of threads can be configured with the `TSDOWNSAMPLE_MAX_THREADS` ENV var (e.g. `os.environ["TSDOWNSAMPLE_MAX_THREADS"] = "4"`)
- ...

**Returns**: a `ndarray[uint64]` of indices that can be used to index the original data.
Expand All @@ -99,10 +100,10 @@ The following downsampling algorithms (classes) are implemented:

| Downsampler | Description | `**kwargs` |
| ---:| --- |--- |
| `MinMaxDownsampler` | selects the **min and max** value in each bin | `n_threads` |
| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `n_threads` |
| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `n_threads` |
| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `n_threads`, `minmax_ratio`<sup>*</sup> |
| `MinMaxDownsampler` | selects the **min and max** value in each bin | `parallel` |
| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `parallel` |
| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `parallel` |
| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `parallel`, `minmax_ratio`<sup>*</sup> |

<sup>*</sup><i>Default value for `minmax_ratio` is 4, which is empirically proven to be a good default. More details here: https://arxiv.org/abs/2305.00332</i>

Expand Down
6 changes: 3 additions & 3 deletions downsample_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ license = "MIT"
[dependencies]
# TODO: perhaps use polars?
argminmax = { version = "0.6.1", features = ["half"] }
# argminmax = { path = "../../argminmax" , features = ["half", "ndarray"] }
half = { version = "2.1", default-features = false , features=["num-traits"], optional = true}
num-traits = { version = "0.2.15", default-features = false }
rayon = { version = "1.6.0", default-features = false }
once_cell = "1"
rayon = { version = "1.8.0", default-features = false }

[dev-dependencies]
rstest = { version = "0.18.1", default-features = false }
rstest_reuse = { version = "0.6", default-features = false }
criterion = "0.4.0"
criterion = "0.5.1"
dev_utils = { path = "dev_utils" }

[[bench]]
Expand Down
19 changes: 2 additions & 17 deletions downsample_rs/benches/bench_m4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,8 @@ fn m4_f32_random_array_long_single_core(c: &mut Criterion) {
fn m4_f32_random_array_long_multi_core(c: &mut Criterion) {
let n = config::ARRAY_LENGTH_LONG;
let data = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("m4_p_f32", |b| {
b.iter(|| {
m4_mod::m4_without_x_parallel(
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
})
b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000)))
});
}

Expand All @@ -48,23 +41,15 @@ fn m4_f32_random_array_50M_multi_core(c: &mut Criterion) {
let n = 50_000_000;
let data = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let x = (0..n).map(|i| i as i32).collect::<Vec<i32>>();
let all_threads: usize = utils::get_all_threads();
c.bench_function("m4_p_50M_f32", |b| {
b.iter(|| {
m4_mod::m4_without_x_parallel(
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
})
b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000)))
});
c.bench_function("m4_x_p_50M_f32", |b| {
b.iter(|| {
m4_mod::m4_with_x_parallel(
black_box(x.as_slice()),
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
})
});
Expand Down
15 changes: 2 additions & 13 deletions downsample_rs/benches/bench_minmax.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@ fn minmax_f32_random_array_long_single_core(c: &mut Criterion) {
fn minmax_f32_random_array_long_multi_core(c: &mut Criterion) {
let n = config::ARRAY_LENGTH_LONG;
let data = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("minmax_p_f32", |b| {
b.iter(|| {
minmax_mod::min_max_without_x_parallel(
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000))
})
});
}
Expand Down Expand Up @@ -55,14 +50,9 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) {
let n = 50_000_000;
let data = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let x = (0..n).map(|i| i as i32).collect::<Vec<i32>>();
let all_threads: usize = utils::get_all_threads();
c.bench_function("minmax_p_50M_f32", |b| {
b.iter(|| {
minmax_mod::min_max_without_x_parallel(
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000))
})
});
c.bench_function("minmax_x_p_50M_f32", |b| {
Expand All @@ -71,7 +61,6 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) {
black_box(x.as_slice()),
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
})
});
Expand Down
6 changes: 0 additions & 6 deletions downsample_rs/benches/bench_minmaxlttb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ fn minmaxlttb_f32_random_array_long_multi_core(c: &mut Criterion) {
let n = config::ARRAY_LENGTH_LONG;
let x = (0..n).map(|i| i as i32).collect::<Vec<i32>>();
let y = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("mlttb_x_p_f32", |b| {
b.iter(|| {
minmaxlttb_mod::minmaxlttb_with_x_parallel(
black_box(x.as_slice()),
black_box(y.as_slice()),
black_box(2_000),
black_box(MINMAX_RATIO),
black_box(all_threads),
)
})
});
Expand All @@ -59,15 +57,13 @@ fn minmaxlttb_f32_random_array_50M_multi_core(c: &mut Criterion) {
let n = 50_000_000;
let x = (0..n).map(|i| i as i32).collect::<Vec<i32>>();
let y = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("mlttb_x_p_50M_f32", |b| {
b.iter(|| {
minmaxlttb_mod::minmaxlttb_with_x_parallel(
black_box(x.as_slice()),
black_box(y.as_slice()),
black_box(2_000),
black_box(MINMAX_RATIO),
black_box(all_threads),
)
})
});
Expand All @@ -90,14 +86,12 @@ fn minmaxlttb_without_x_f32_random_array_50M_single_core(c: &mut Criterion) {
fn minmaxlttb_without_x_f32_random_array_50M_multi_core(c: &mut Criterion) {
let n = 50_000_000;
let y = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("mlttb_p_50M_f32", |b| {
b.iter(|| {
minmaxlttb_mod::minmaxlttb_without_x_parallel(
black_box(y.as_slice()),
black_box(2_000),
black_box(MINMAX_RATIO),
black_box(all_threads),
)
})
});
Expand Down
8 changes: 0 additions & 8 deletions downsample_rs/dev_utils/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,3 @@ where
}
arr
}

// ------------- Multi-threading -------------

use std::thread::available_parallelism;

pub fn get_all_threads() -> usize {
available_parallelism().map(|x| x.get()).unwrap_or(1)
}
18 changes: 18 additions & 0 deletions downsample_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,21 @@ pub use m4::*;
pub(crate) mod helpers;
pub(crate) mod searchsorted;
pub(crate) mod types;

use once_cell::sync::Lazy;
use rayon::{ThreadPool, ThreadPoolBuilder};

/// Lazily-built rayon thread pool whose size can be configured by the user.
///
/// The pool is created on first access. Its thread count is read from the
/// `TSDOWNSAMPLE_MAX_THREADS` environment variable when that variable is set
/// (surrounding whitespace is ignored and the parsed value is handed to
/// rayon's `num_threads` unchanged). When the variable is unset, or is not
/// valid Unicode, the count falls back to
/// `std::thread::available_parallelism()`, or to `1` if that query fails.
///
/// # Panics
///
/// Panics on first access if `TSDOWNSAMPLE_MAX_THREADS` is set but cannot be
/// parsed as a non-negative integer, or if the thread pool cannot be built.
pub static POOL: Lazy<ThreadPool> = Lazy::new(|| {
    let n_threads = match std::env::var("TSDOWNSAMPLE_MAX_THREADS") {
        // A misconfigured value is reported loudly, naming the variable and
        // the offending value, instead of being silently replaced.
        Ok(raw) => raw.trim().parse::<usize>().unwrap_or_else(|err| {
            panic!(
                "TSDOWNSAMPLE_MAX_THREADS must be a non-negative integer, got {raw:?}: {err}"
            )
        }),
        // Variable not usable: size the pool from the machine instead.
        Err(_) => std::thread::available_parallelism().map_or(1, |n| n.get()),
    };

    ThreadPoolBuilder::new()
        .num_threads(n_threads)
        .build()
        .expect("could not spawn threads")
});
Loading

0 comments on commit ec24796

Please sign in to comment.