Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory pool waiters management #334

Merged
merged 3 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ntex-bytes/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.1.25] (2024-04-02)

* Fix pool waiters management

## [0.1.24] (2024-02-01)

* Add `checked` api
Expand Down
14 changes: 8 additions & 6 deletions ntex-bytes/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "ntex-bytes"
version = "0.1.24"
version = "0.1.25"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Carl Lerche <me@carllerche.com>"]
description = "Types and traits for working with bytes (bytes crate fork)"
documentation = "https://docs.rs/ntex-bytes"
repository = "https://github.com/ntex-rs/ntex.git"
repository = "https://github.com/ntex-rs/ntex"
readme = "README.md"
keywords = ["buffers", "zero-copy", "io"]
categories = ["network-programming", "data-structures"]
Expand All @@ -18,13 +18,15 @@ default = []
simd = ["simdutf8"]

[dependencies]
bitflags = "2.4"
bitflags = "2"
bytes = "1"
serde = "1"
futures-core = { version = "0.3", default-features = false, features = ["alloc"] }
futures-core = { version = "0.3", default-features = false }
simdutf8 = { version = "0.1.4", optional = true }

backtrace = "*"

[dev-dependencies]
serde_test = "1.0"
serde_json = "1.0"
serde_test = "1"
serde_json = "1"
ntex = { version = "1", features = ["tokio"] }
176 changes: 85 additions & 91 deletions ntex-bytes/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl PoolId {
#[inline]
pub fn pool(self) -> Pool {
POOLS.with(|pools| Pool {
idx: Cell::new(0),
idx: Cell::new(usize::MAX),
inner: pools[self.0 as usize],
})
}
Expand Down Expand Up @@ -462,7 +462,7 @@ impl Clone for Pool {
#[inline]
fn clone(&self) -> Pool {
Pool {
idx: Cell::new(0),
idx: Cell::new(usize::MAX),
inner: self.inner,
}
}
Expand All @@ -484,12 +484,10 @@ impl From<PoolRef> for Pool {

impl Drop for Pool {
fn drop(&mut self) {
// cleanup waiter
let idx = self.idx.get();
if idx > 0 {
// cleanup waiter
let mut waiters = self.inner.waiters.borrow_mut();
waiters.remove(idx - 1);
waiters.truncate();
if idx != usize::MAX {
self.inner.waiters.borrow_mut().remove(idx);
}
}
}
Expand All @@ -515,10 +513,8 @@ impl Pool {
/// Check if pool is ready
pub fn is_ready(&self) -> bool {
let idx = self.idx.get();
if idx > 0 {
if let Some(Entry::Occupied(_)) =
self.inner.waiters.borrow().entries.get(idx - 1)
{
if idx != usize::MAX {
if let Some(Entry::Occupied(_)) = self.inner.waiters.borrow().entries.get(idx) {
return false;
}
}
Expand All @@ -543,26 +539,26 @@ impl Pool {
let allocated = self.inner.size.load(Relaxed);
if allocated < window_l {
let idx = self.idx.get();
if idx > 0 {
if idx != usize::MAX {
// cleanup waiter
let mut waiters = self.inner.waiters.borrow_mut();
waiters.remove(idx - 1);
waiters.truncate();
self.idx.set(0);
self.inner.waiters.borrow_mut().remove(idx);
self.idx.set(usize::MAX);
}
return Poll::Ready(());
}

// register waiter only if spawn fn is provided
if let Some(spawn) = &*self.inner.spawn.borrow() {
let idx = self.idx.get();
let mut flags = self.inner.flags.get();
let mut waiters = self.inner.waiters.borrow_mut();
let new = if idx == 0 {
self.idx.set(waiters.append(ctx.waker().clone()) + 1);
true
} else {
waiters.update(idx - 1, ctx.waker().clone())
let new = {
let idx = self.idx.get();
if idx == usize::MAX {
self.idx.set(waiters.append(ctx.waker().clone()));
true
} else {
waiters.update(idx, ctx.waker().clone())
}
};

// if memory usage has increased since last window change,
Expand Down Expand Up @@ -600,7 +596,7 @@ impl Driver {
fn release(&self, waiters_num: usize) {
let mut waiters = self.pool.waiters.borrow_mut();

let mut to_release = waiters.occupied_len / 100 * 5;
let mut to_release = waiters.occupied_len >> 4;
if waiters_num > to_release {
to_release += waiters_num >> 1;
} else {
Expand Down Expand Up @@ -654,7 +650,7 @@ impl Future for Driver {
pool.flags.set(Flags::INCREASED);
return Poll::Ready(());
} else {
// release 5% of pending waiters
// release 6% of pending waiters
self.release(waiters);

if allocated > windows[idx].0 {
Expand Down Expand Up @@ -725,15 +721,6 @@ impl Waiters {
}
}

fn truncate(&mut self) {
if self.len == 0 {
self.entries.truncate(0);
self.root = usize::MAX;
self.tail = usize::MAX;
self.free = 0;
}
}

fn get_node(&mut self, key: usize) -> &mut Node {
if let Some(Entry::Occupied(ref mut node)) = self.entries.get_mut(key) {
return node;
Expand All @@ -745,11 +732,10 @@ impl Waiters {
fn consume(&mut self) -> Option<Waker> {
if self.root != usize::MAX {
self.occupied_len -= 1;
let entry =
mem::replace(self.entries.get_mut(self.root).unwrap(), Entry::Consumed);

let entry = self.entries.get_mut(self.root).unwrap();
let prev = mem::replace(entry, Entry::Consumed);

match prev {
match entry {
Entry::Occupied(node) => {
debug_assert!(node.prev == usize::MAX);

Expand All @@ -760,57 +746,63 @@ impl Waiters {
} else {
// remove from root
self.root = node.next;
self.get_node(self.root).prev = usize::MAX;
if self.root != usize::MAX {
self.get_node(self.root).prev = usize::MAX;
}
}
Some(node.item)
}
_ => {
unreachable!()
}
_ => unreachable!(),
}
} else {
None
}
}

fn update(&mut self, key: usize, val: Waker) -> bool {
if let Some(entry) = self.entries.get_mut(key) {
match entry {
Entry::Occupied(ref mut node) => {
node.item = val;
return false;
fn update(&mut self, idx: usize, val: Waker) -> bool {
let entry = self
.entries
.get_mut(idx)
.expect("Entry is expected to exist");
match entry {
Entry::Occupied(ref mut node) => {
node.item = val;
false
}
Entry::Consumed => {
// append to the tail
*entry = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});

self.occupied_len += 1;
if self.root == usize::MAX {
self.root = idx;
}
Entry::Consumed => {
*entry = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}
_ => unreachable!(),
self.tail = idx;
true
}
Entry::Vacant(_) => unreachable!(),
}
self.occupied_len += 1;
if self.root == usize::MAX {
self.root = key;
}
if self.tail != usize::MAX {
self.get_node(self.tail).next = key;
}
self.tail = key;
true
}

fn remove(&mut self, key: usize) {
if let Some(entry) = self.entries.get_mut(key) {
// Swap the entry at the provided value
let prev = mem::replace(entry, Entry::Vacant(self.free));
let entry = mem::replace(entry, Entry::Vacant(self.free));

self.len -= 1;
self.free = key;

match prev {
match entry {
Entry::Occupied(node) => {
self.len -= 1;
self.occupied_len -= 1;
self.free = key;

// remove from root
if self.root == key {
self.root = node.next;
Expand All @@ -826,52 +818,54 @@ impl Waiters {
}
}
}
Entry::Consumed => {
self.len -= 1;
self.free = key;
}
_ => {
unreachable!()
}
Entry::Consumed => {}
Entry::Vacant(_) => unreachable!(),
}

if self.len == 0 {
self.entries.truncate(128);
}
}
}

fn append(&mut self, val: Waker) -> usize {
let idx = self.free;

self.len += 1;
self.occupied_len += 1;
let key = self.free;

if key == self.entries.len() {
if self.root == usize::MAX {
self.root = key;
}
if self.tail != usize::MAX {
self.get_node(self.tail).next = key;
}
// root points to first entry, append to empty list
if self.root == usize::MAX {
self.root = idx;
}
// tail points to last entry
if self.tail != usize::MAX {
self.get_node(self.tail).next = idx;
}

// append item to entries, first free item is not allocated yet
if idx == self.entries.len() {
self.entries.push(Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
}));
self.tail = key;
self.free = key + 1;
self.tail = idx;
self.free = idx + 1;
} else {
self.free = match self.entries.get(key) {
// entries has enough capacity
self.free = match self.entries.get(idx) {
Some(&Entry::Vacant(next)) => next,
_ => unreachable!(),
};
if self.tail != usize::MAX {
self.get_node(self.tail).next = key;
}
self.entries[key] = Entry::Occupied(Node {
self.entries[idx] = Entry::Occupied(Node {
item: val,
prev: self.tail,
next: usize::MAX,
});
self.tail = key;
self.tail = idx;
}
key

idx
}
}
4 changes: 2 additions & 2 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
keywords = ["ntex", "networking", "framework", "async", "futures"]
repository = "https://github.com/ntex-rs/ntex.git"
repository = "https://github.com/ntex-rs/ntex"
documentation = "https://docs.rs/ntex/"
categories = [
"network-programming",
Expand Down Expand Up @@ -63,7 +63,7 @@ ntex-router = "0.5.3"
ntex-service = "2.0.1"
ntex-macros = "0.1.3"
ntex-util = "1.0.1"
ntex-bytes = "0.1.24"
ntex-bytes = "0.1.25"
ntex-server = "1.0.5"
ntex-h2 = "0.5.2"
ntex-rt = "0.4.12"
Expand Down
Loading