From 5f192fac7d279f2a432d524347bb4be4d0d7b7d3 Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Wed, 7 Aug 2024 18:15:31 +0100
Subject: [PATCH] small changes

---
 src/arc_slice.rs | 46 ++++++++++++++++++++++++++++++++--------------
 1 file changed, 32 insertions(+), 14 deletions(-)

diff --git a/src/arc_slice.rs b/src/arc_slice.rs
index 1b1f654..7b57745 100644
--- a/src/arc_slice.rs
+++ b/src/arc_slice.rs
@@ -63,6 +63,7 @@ pub(crate) struct ArcSliceInnerMeta {
 pub(crate) struct ArcSlotInner {
     index: usize,
     wake_lock: AtomicBool,
+    woken: AtomicBool,
     next: AtomicUsize,
 }
 
@@ -131,21 +132,31 @@ impl ArcSlice {
 
 impl ArcSliceInner {
     /// The push function from the 1024cores intrusive MPSC queue algorithm.
+    /// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
     ///
     /// Safety: index must be within capacity
-    pub(crate) unsafe fn push(&self, index: usize) {
-        self.slice
-            .get_unchecked(index)
-            .next
-            .store(self.meta.len + 1, Ordering::Relaxed);
+    pub(crate) unsafe fn push(&self, index: usize) -> bool {
+        let node = self.slice.get_unchecked(index);
+
+        node.woken.store(true, Ordering::Relaxed);
+        // if node.wake_lock.swap(true, Ordering::SeqCst) {
+        //     // already woken
+        //     return false;
+        // }
+
+        node.next.store(usize::MAX, Ordering::Relaxed);
+
         let prev = self.meta.list_head.swap(index, Ordering::AcqRel);
         self.slice
             .get_unchecked(prev)
             .next
             .store(index, Ordering::Release);
+
+        true
     }
 
     /// The pop function from the 1024cores intrusive MPSC queue algorithm
+    /// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
     ///
     /// Note that this is unsafe as it required mutual exclusion (only one
     /// thread can call this) to be guaranteed elsewhere.
@@ -154,7 +165,7 @@ impl ArcSliceInner {
         let mut next = self.slice[tail].next.load(Ordering::Acquire);
 
         if tail == self.meta.len {
-            if next > self.meta.len {
+            if next == usize::MAX {
                 return ReadySlot::None;
             }
 
@@ -166,6 +177,8 @@ impl ArcSliceInner {
         if next <= self.meta.len {
             *self.meta.list_tail.get() = next;
             debug_assert!(tail != self.meta.len);
+
+            // self.slice[tail].wake_lock.store(false, Ordering::SeqCst);
             return ReadySlot::Ready(tail);
         }
 
@@ -179,6 +192,8 @@ impl ArcSliceInner {
 
         if next <= self.meta.len {
             *self.meta.list_tail.get() = next;
+
+            // self.slice[tail].wake_lock.store(false, Ordering::SeqCst);
             return ReadySlot::Ready(tail);
         }
 
@@ -265,13 +280,13 @@ mod slot {
     }
 
     pub(super) fn waker(ptr: *const ArcSlotInner) -> Waker {
-        static VTABLE: RawWakerVTable =
-            RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
+        static VTABLE: &RawWakerVTable =
+            &RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);
 
         // Increment the reference count of the arc to clone it.
         unsafe fn clone_waker(waker: *const ()) -> RawWaker {
             meta_ref(waker.cast()).inc_strong();
-            RawWaker::new(waker, &VTABLE)
+            RawWaker::new(waker, VTABLE)
         }
 
         // We don't need ownership. Just wake_by_ref and drop the waker
@@ -283,10 +298,12 @@ mod slot {
         // Find the `ArcSliceInnerMeta` and push the current index value into it,
         // then call the stored waker to trigger a poll
         unsafe fn wake_by_ref(waker: *const ()) {
-            let slot = waker.cast();
+            let slot = waker.cast::<ArcSlotInner>();
+            let index = *core::ptr::addr_of!((*slot).index);
             let inner = inner_ref(slot);
-            inner.push((*slot).index);
-            inner.meta.waker.notify();
+            if inner.push(index) {
+                inner.meta.waker.notify();
+            }
         }
 
         // Decrement the reference count of the Arc on drop
@@ -299,7 +316,7 @@ mod slot {
             }
         }
 
-        let raw_waker = RawWaker::new(ptr as *const (), &VTABLE);
+        let raw_waker = RawWaker::new(ptr as *const (), VTABLE);
         unsafe { Waker::from_raw(raw_waker) }
     }
 }
@@ -435,7 +452,8 @@ impl ArcSlice {
                 ArcSlotInner {
                     index: i,
                     wake_lock: AtomicBool::new(false),
-                    next: AtomicUsize::new(cap + 1),
+                    woken: AtomicBool::new(false),
+                    next: AtomicUsize::new(usize::MAX),
                 },
             );
         }
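
The push/pop pair this patch touches follows the 1024cores intrusive MPSC queue
linked in the doc comments, with slot indices standing in for node pointers and
usize::MAX now serving as the "no successor" sentinel (previously len + 1).
Below is a minimal, self-contained sketch of that index-based algorithm under
the same conventions; Queue and Slot are illustrative names rather than the
crate's actual types, the stub node lives at index len, and the woken/wake_lock
bookkeeping from the patch is omitted:

    use std::sync::atomic::{AtomicUsize, Ordering};

    /// One queue node: `next` is the index of the next node,
    /// or `usize::MAX` when no successor has been linked yet.
    struct Slot {
        next: AtomicUsize,
    }

    /// Index-based intrusive MPSC queue over a fixed slab of slots.
    /// `slots[len]` plays the role of the 1024cores "stub" node.
    struct Queue {
        slots: Box<[Slot]>,
        head: AtomicUsize, // producers swap themselves in here
        tail: AtomicUsize, // consumer-only (an UnsafeCell in the real code)
        len: usize,
    }

    impl Queue {
        fn new(len: usize) -> Self {
            let slots = (0..=len)
                .map(|_| Slot { next: AtomicUsize::new(usize::MAX) })
                .collect();
            // Head and tail both start at the stub.
            Queue {
                slots,
                head: AtomicUsize::new(len),
                tail: AtomicUsize::new(len),
                len,
            }
        }

        /// Multi-producer push: link `index` in behind the current head.
        fn push(&self, index: usize) {
            self.slots[index].next.store(usize::MAX, Ordering::Relaxed);
            let prev = self.head.swap(index, Ordering::AcqRel);
            // Publish the link. Until this store lands, a concurrent pop
            // can observe `prev` with `next == usize::MAX` (the
            // "inconsistent" window of the algorithm).
            self.slots[prev].next.store(index, Ordering::Release);
        }

        /// Single-consumer pop: returns the next ready slot index.
        fn pop(&self) -> Option<usize> {
            let mut tail = self.tail.load(Ordering::Relaxed);
            let mut next = self.slots[tail].next.load(Ordering::Acquire);

            if tail == self.len {
                // Tail is the stub: the queue is empty, or we skip past it.
                if next == usize::MAX {
                    return None;
                }
                self.tail.store(next, Ordering::Relaxed);
                tail = next;
                next = self.slots[tail].next.load(Ordering::Acquire);
            }

            if next != usize::MAX {
                self.tail.store(next, Ordering::Relaxed);
                return Some(tail);
            }

            // No successor visible. If head still points at `tail`, no
            // producer is mid-push: re-queue the stub and retry once.
            // (On a moved head the real code reports an inconsistent
            // state so the caller can retry later.)
            if self.head.load(Ordering::Acquire) == tail {
                self.push(self.len);
                next = self.slots[tail].next.load(Ordering::Acquire);
                if next != usize::MAX {
                    self.tail.store(next, Ordering::Relaxed);
                    return Some(tail);
                }
            }
            None
        }
    }

    fn main() {
        let q = Queue::new(4);
        q.push(2);
        q.push(0);
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), Some(0));
        assert_eq!(q.pop(), None);
    }

The sentinel switch also makes the emptiness check in pop exact: a slot with no
successor always reads usize::MAX, independent of the queue's capacity.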
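The VTABLE change (static VTABLE: RawWakerVTable becoming static VTABLE:
&RawWakerVTable, with RawWaker::new taking VTABLE directly) means every waker
built here carries one shared, promoted &'static vtable pointer. A sketch of
the same pattern applied to a no-op waker; NOOP_VTABLE and noop_waker are
illustrative names, not part of this crate:

    use std::task::{RawWaker, RawWakerVTable, Waker};

    // One shared 'static vtable: the `&RawWakerVTable::new(...)` in the
    // initializer is const-promoted to a single allocation, so every
    // RawWaker built from it has the same vtable pointer.
    static NOOP_VTABLE: &RawWakerVTable =
        &RawWakerVTable::new(clone_waker, wake, wake_by_ref, drop_waker);

    unsafe fn clone_waker(data: *const ()) -> RawWaker {
        // Nothing to reference-count for a no-op waker.
        RawWaker::new(data, NOOP_VTABLE)
    }
    unsafe fn wake(_data: *const ()) {}
    unsafe fn wake_by_ref(_data: *const ()) {}
    unsafe fn drop_waker(_data: *const ()) {}

    fn noop_waker() -> Waker {
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), NOOP_VTABLE)) }
    }

    fn main() {
        let a = noop_waker();
        let b = a.clone();
        // Same data pointer and same vtable, so the clone is
        // interchangeable with the original.
        assert!(a.will_wake(&b));
    }

Since Waker::will_wake compares the data pointer and the vtable, sharing a
single vtable reference keeps a waker and its clones interchangeable.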