Skip to content

Commit

Permalink
Merge branch 'main' into feature/polars-integration
Browse files Browse the repository at this point in the history
  • Loading branch information
jvdd authored Feb 16, 2024
2 parents d3d71cd + dbf9eea commit f633114
Show file tree
Hide file tree
Showing 25 changed files with 1,091 additions and 680 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-tsdownsample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:
matrix:
os: ['windows-latest', 'macOS-latest', 'ubuntu-latest']
rust: ['nightly'] # ['stable', 'beta']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11']
python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12']

env:
PYTHON: ${{ matrix.python-version }}
Expand Down Expand Up @@ -155,7 +155,7 @@ jobs:
target: ${{ matrix.target }}
manylinux: ${{ matrix.manylinux || 'auto' }}
container: ${{ matrix.container }}
args: --release --out dist --interpreter ${{ matrix.interpreter || '3.7 3.8 3.9 3.10 3.11' }}
args: --release --out dist --interpreter ${{ matrix.interpreter || '3.7 3.8 3.9 3.10 3.11 3.12' }}

- run: ${{ matrix.ls || 'ls -lh' }} dist/

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/codspeed.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ jobs:
# - run: rm tests/__init__.py

- name: Run CodSpeed benchmarks
uses: CodSpeedHQ/action@v1
uses: CodSpeedHQ/action@v2
with:
run: pytest tests/benchmarks/ --codspeed
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ license = "MIT"

[dependencies]
downsample_rs = { path = "downsample_rs", features = ["half"]}
pyo3 = { version = "0.19", features = ["extension-module"] }
numpy = { version = "0.19", features = ["half"] }
half = { version = "2.1", default-features = false }
paste = { version = "1.0.9", default-features = false }
pyo3 = { version = "0.20", features = ["extension-module"] }
numpy = { version = "0.20", features = ["half"] }
half = { version = "2.3.1", default-features = false }
paste = { version = "1.0.14", default-features = false }

[lib]
name = "tsdownsample"
Expand Down
50 changes: 35 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
[![CodeQL](https://github.com/predict-idlab/tsdownsample/actions/workflows/codeql.yml/badge.svg)](https://github.com/predict-idlab/tsdownsample/actions/workflows/codeql.yml)
[![Testing](https://github.com/predict-idlab/tsdownsample/actions/workflows/ci-downsample_rs.yml/badge.svg)](https://github.com/predict-idlab/tsdownsample/actions/workflows/ci-downsample_rs.yml)
[![Testing](https://github.com/predict-idlab/tsdownsample/actions/workflows/ci-tsdownsample.yml/badge.svg)](https://github.com/predict-idlab/tsdownsample/actions/workflows/ci-tsdownsample.yml)

<!-- TODO: codecov -->

Extremely fast **time series downsampling 📈** for visualization, written in Rust.

## Features ✨

* **Fast**: written in rust with PyO3 bindings
- **Fast**: written in rust with PyO3 bindings
- leverages optimized [argminmax](https://github.com/jvdd/argminmax) - which is SIMD accelerated with runtime feature detection
- scales linearly with the number of data points
<!-- TODO check if it scales sublinearly -->
Expand All @@ -25,21 +26,21 @@ Extremely fast **time series downsampling 📈** for visualization, written in R
</blockquote>
In Rust - which is a compiled language - there is no GIL, so CPU-bound tasks can be parallelized (with <a href="https://github.com/rayon-rs/rayon">Rayon</a>) with little to no overhead.
</details>
* **Efficient**: memory efficient
- **Efficient**: memory efficient
- works on views of the data (no copies)
- no intermediate data structures are created
* **Flexible**: works on any type of data
- supported datatypes are
- for `x`: `f32`, `f64`, `i16`, `i32`, `i64`, `u16`, `u32`, `u64`, `datetime64`, `timedelta64`
- for `y`: `f16`, `f32`, `f64`, `i8`, `i16`, `i32`, `i64`, `u8`, `u16`, `u32`, `u64`, `datetime64`, `timedelta64`, `bool`
- **Flexible**: works on any type of data
- supported datatypes are
- for `x`: `f32`, `f64`, `i16`, `i32`, `i64`, `u16`, `u32`, `u64`, `datetime64`, `timedelta64`
- for `y`: `f16`, `f32`, `f64`, `i8`, `i16`, `i32`, `i64`, `u8`, `u16`, `u32`, `u64`, `datetime64`, `timedelta64`, `bool`
<details>
<summary><i>!! 🚀 <code>f16</code> <a href="https://github.com/jvdd/argminmax">argminmax</a> is 200-300x faster than numpy</i></summary>
In contrast with all other data types above, <code>f16</code> is *not* hardware supported (i.e., no instructions for f16) by most modern CPUs!! <br>
🐌 Programming languages facilitate support for this datatype by either (i) upcasting to <u>f32</u> or (ii) using a software implementation. <br>
💡 As for argminmax, only comparisons are needed - and thus no arithmetic operations - creating a <u>symmetrical ordinal mapping from <code>f16</code> to <code>i16</code></u> is sufficient. This mapping allows to use the hardware supported scalar and SIMD <code>i16</code> instructions - while not producing any memory overhead 🎉 <br>
<i>More details are described in <a href="https://github.com/jvdd/argminmax/pull/1">argminmax PR #1</a>.</i>
</details>
* **Easy to use**: simple & flexible API
- **Easy to use**: simple & flexible API

## Install

Expand Down Expand Up @@ -83,35 +84,54 @@ downsample([x], y, n_out, **kwargs) -> ndarray[uint64]
```

**Arguments**:

- `x` is optional
- `x` and `y` are both positional arguments
- `n_out` is a mandatory keyword argument that defines the number of output values<sup>*</sup>
- `**kwargs` are optional keyword arguments *(see [table below](#downsampling-algorithms-📈))*:
- `n_threads`: how many threads to use for multi-threading (default `1`, so no multi-threading)
- `parallel`: whether to use multi-threading (default: `False`)
❗ The max number of threads can be configured with the `TSDOWNSAMPLE_MAX_THREADS` ENV var (e.g. `os.environ["TSDOWNSAMPLE_MAX_THREADS"] = "4"`)
- ...

**Returns**: a `ndarray[uint64]` of indices that can be used to index the original data.

<sup>*</sup><i>When there are gaps in the time series, fewer than `n_out` indices may be returned.</i>
<sup>\*</sup><i>When there are gaps in the time series, fewer than `n_out` indices may be returned.</i>

### Downsampling algorithms 📈

The following downsampling algorithms (classes) are implemented:

| Downsampler | Description | `**kwargs` |
| ---:| --- |--- |
| `MinMaxDownsampler` | selects the **min and max** value in each bin | `n_threads` |
| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `n_threads` |
| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `n_threads` |
| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `n_threads`, `minmax_ratio`<sup>*</sup> |
| `MinMaxDownsampler` | selects the **min and max** value in each bin | `parallel` |
| `M4Downsampler` | selects the [**min, max, first and last**](https://dl.acm.org/doi/pdf/10.14778/2732951.2732953) value in each bin | `parallel` |
| `LTTBDownsampler` | performs the [**Largest Triangle Three Buckets**](https://skemman.is/bitstream/1946/15343/3/SS_MSthesis.pdf) algorithm | `parallel` |
| `MinMaxLTTBDownsampler` | (*new two-step algorithm 🎉*) first selects `n_out` * `minmax_ratio` **min and max** values, then further reduces these to `n_out` values using the **Largest Triangle Three Buckets** algorithm | `parallel`, `minmax_ratio`<sup>*</sup> |

<sup>*</sup><i>Default value for `minmax_ratio` is 4, which is empirically proven to be a good default. More details here: https://arxiv.org/abs/2305.00332</i>

<sup>*</sup><i>Default value for `minmax_ratio` is 30, which is empirically proven to be a good default. (More details in our upcoming paper)</i>
### Handling NaNs

This library supports two `NaN`-policies:

1. Omit `NaN`s (`NaN`s are ignored during downsampling).
2. Return index of first `NaN` once there is at least one present in the bin of the considered data.

| Omit `NaN`s | Return `NaN`s |
| ----------------------: | :------------------------- |
| `MinMaxDownsampler` | `NaNMinMaxDownsampler` |
| `M4Downsampler` | `NaNM4Downsampler` |
| `MinMaxLTTBDownsampler` | `NaNMinMaxLTTBDownsampler` |
| `LTTBDownsampler` | |

> Note that `NaN`s are not supported for `x`-data.

## Limitations & assumptions 🚨

Assumes:

1. `x`-data is (non-strictly) monotonic increasing (i.e., sorted)
2. no `NaNs` in the data
2. no `NaN`s in `x`-data

---

Expand Down
12 changes: 6 additions & 6 deletions downsample_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ license = "MIT"
argminmax = { version = "0.6.1", features = ["half"] }
# For some reason we need to explicitly add chrono, otherwise polars refuses to compile?
chrono = "0.4.31"
# argminmax = { path = "../../argminmax" , features = ["half", "ndarray"] }
half = { version = "2.1", default-features = false , features=["num-traits"], optional = true}
num-traits = { version = "0.2.15", default-features = false }
polars = { version = "0.33.2", features = ["lazy", "streaming", "zip_with"] }
rayon = { version = "1.6.0", default-features = false }
half = { version = "2.3.1", default-features = false , features=["num-traits"], optional = true}
num-traits = { version = "0.2.17", default-features = false }
once_cell = "1"
rayon = { version = "1.8.0", default-features = false }

[dev-dependencies]
rstest = { version = "0.18.1", default-features = false }
rstest = { version = "0.18.2", default-features = false }
rstest_reuse = { version = "0.6", default-features = false }
criterion = "0.4.0"
criterion = "0.5.1"
dev_utils = { path = "dev_utils" }

[[bench]]
Expand Down
19 changes: 2 additions & 17 deletions downsample_rs/benches/bench_m4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,8 @@ fn m4_f32_random_array_long_single_core(c: &mut Criterion) {
fn m4_f32_random_array_long_multi_core(c: &mut Criterion) {
let n = config::ARRAY_LENGTH_LONG;
let data = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("m4_p_f32", |b| {
b.iter(|| {
m4_mod::m4_without_x_parallel(
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
})
b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000)))
});
}

Expand All @@ -48,23 +41,15 @@ fn m4_f32_random_array_50M_multi_core(c: &mut Criterion) {
let n = 50_000_000;
let data = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let x = (0..n).map(|i| i as i32).collect::<Vec<i32>>();
let all_threads: usize = utils::get_all_threads();
c.bench_function("m4_p_50M_f32", |b| {
b.iter(|| {
m4_mod::m4_without_x_parallel(
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
})
b.iter(|| m4_mod::m4_without_x_parallel(black_box(data.as_slice()), black_box(2_000)))
});
c.bench_function("m4_x_p_50M_f32", |b| {
b.iter(|| {
m4_mod::m4_with_x_parallel(
black_box(x.as_slice()),
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
})
});
Expand Down
15 changes: 2 additions & 13 deletions downsample_rs/benches/bench_minmax.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@ fn minmax_f32_random_array_long_single_core(c: &mut Criterion) {
fn minmax_f32_random_array_long_multi_core(c: &mut Criterion) {
let n = config::ARRAY_LENGTH_LONG;
let data = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("minmax_p_f32", |b| {
b.iter(|| {
minmax_mod::min_max_without_x_parallel(
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000))
})
});
}
Expand Down Expand Up @@ -55,14 +50,9 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) {
let n = 50_000_000;
let data = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let x = (0..n).map(|i| i as i32).collect::<Vec<i32>>();
let all_threads: usize = utils::get_all_threads();
c.bench_function("minmax_p_50M_f32", |b| {
b.iter(|| {
minmax_mod::min_max_without_x_parallel(
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
minmax_mod::min_max_without_x_parallel(black_box(data.as_slice()), black_box(2_000))
})
});
c.bench_function("minmax_x_p_50M_f32", |b| {
Expand All @@ -71,7 +61,6 @@ fn minmax_f32_random_array_50M_long_multi_core(c: &mut Criterion) {
black_box(x.as_slice()),
black_box(data.as_slice()),
black_box(2_000),
black_box(all_threads),
)
})
});
Expand Down
6 changes: 0 additions & 6 deletions downsample_rs/benches/bench_minmaxlttb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ fn minmaxlttb_f32_random_array_long_multi_core(c: &mut Criterion) {
let n = config::ARRAY_LENGTH_LONG;
let x = (0..n).map(|i| i as i32).collect::<Vec<i32>>();
let y = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("mlttb_x_p_f32", |b| {
b.iter(|| {
minmaxlttb_mod::minmaxlttb_with_x_parallel(
black_box(x.as_slice()),
black_box(y.as_slice()),
black_box(2_000),
black_box(MINMAX_RATIO),
black_box(all_threads),
)
})
});
Expand All @@ -59,15 +57,13 @@ fn minmaxlttb_f32_random_array_50M_multi_core(c: &mut Criterion) {
let n = 50_000_000;
let x = (0..n).map(|i| i as i32).collect::<Vec<i32>>();
let y = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("mlttb_x_p_50M_f32", |b| {
b.iter(|| {
minmaxlttb_mod::minmaxlttb_with_x_parallel(
black_box(x.as_slice()),
black_box(y.as_slice()),
black_box(2_000),
black_box(MINMAX_RATIO),
black_box(all_threads),
)
})
});
Expand All @@ -90,14 +86,12 @@ fn minmaxlttb_without_x_f32_random_array_50M_single_core(c: &mut Criterion) {
fn minmaxlttb_without_x_f32_random_array_50M_multi_core(c: &mut Criterion) {
let n = 50_000_000;
let y = utils::get_random_array::<f32>(n, f32::MIN, f32::MAX);
let all_threads: usize = utils::get_all_threads();
c.bench_function("mlttb_p_50M_f32", |b| {
b.iter(|| {
minmaxlttb_mod::minmaxlttb_without_x_parallel(
black_box(y.as_slice()),
black_box(2_000),
black_box(MINMAX_RATIO),
black_box(all_threads),
)
})
});
Expand Down
8 changes: 0 additions & 8 deletions downsample_rs/dev_utils/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,3 @@ where
}
arr
}

// ------------- Multi-threading -------------

use std::thread::available_parallelism;

pub fn get_all_threads() -> usize {
available_parallelism().map(|x| x.get()).unwrap_or(1)
}
19 changes: 19 additions & 0 deletions downsample_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,22 @@ pub use m4::*;
pub(crate) mod helpers;
pub(crate) mod searchsorted;
pub(crate) mod types;

use once_cell::sync::Lazy;
use rayon::{ThreadPool, ThreadPoolBuilder};

// Inspired by: https://github.com/pola-rs/polars/blob/9a69062aa0beb2a1bc5d57294cac49961fc91058/crates/polars-core/src/lib.rs#L49
/// Process-wide rayon thread pool used for all parallel downsampling work.
///
/// The pool size is taken from the `TSDOWNSAMPLE_MAX_THREADS` environment
/// variable when it is set (must parse as an integer, otherwise we panic);
/// when unset, it defaults to the number of available hardware threads,
/// falling back to 1 if that cannot be determined.
pub static POOL: Lazy<ThreadPool> = Lazy::new(|| {
    // Resolve the desired thread count up front so the builder chain stays flat.
    let num_threads = match std::env::var("TSDOWNSAMPLE_MAX_THREADS") {
        Ok(val) => val.parse::<usize>().expect("integer"),
        Err(_) => std::thread::available_parallelism()
            .unwrap_or(std::num::NonZeroUsize::new(1).unwrap())
            .get(),
    };
    ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .expect("could not spawn threads")
});
Loading

0 comments on commit f633114

Please sign in to comment.