Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
elijahr committed Jun 29, 2024
1 parent 40351b9 commit 3ddbdc6
Show file tree
Hide file tree
Showing 21 changed files with 763 additions and 574 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
nimcache/
nimblecache/

/src/*
/tests/*
/tests/test
/tests/test.dSyM
/examples/*
/htmldocs/*

Expand Down
2 changes: 1 addition & 1 deletion .vscode/extensions.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"recommendations": ["nimsaem.nimvscode", "junknet.nimlsp"]
"recommendations": ["NimLang.nimlang"]
}
14 changes: 13 additions & 1 deletion lockfreequeues.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,21 @@
"tests/test.nim",
],
"nim.lintOnSave": true,
"nim.buildOnSave": true,
"nim.buildOnSave": false,
"nim.licenseString": "# lockfreequeues # © Copyright 2020 Elijah Shaw-Rutschman # # See the file \"LICENSE\", included in this distribution for details about the # copyright.",
"search.useIgnoreFiles": false,
"nimlsp.path": "/Users/elijahrutschman/.asdf/shims/nimlsp",
"search.exclude": {
"**/htmldocs": true
},
"nim.projectMapping": [{
// open files under tests using one nimsuggest instance started with root = test/all.nim
"projectPath": "tests/test.nim",
"fileRegex": "tests/.*\\.nim"
}, {
// everything else - use main.nim as root.
"projectPath": "src/lockfreequeues.nim",
"fileRegex": "src/lockfreequeues/.*\\.nim"
}]
}
}
6 changes: 5 additions & 1 deletion src/lockfreequeues.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
# See the file "LICENSE", included in this distribution for details about the
# copyright.

when compileOption("threads"):
when compileOption("threads") or defined(nimdoc):
import ./lockfreequeues/[
atomic_dsl,
constants,
exceptions,
mupmuc,
mupsic,
ops,
Expand All @@ -17,6 +18,7 @@ when compileOption("threads"):
export
atomic_dsl,
constants,
exceptions,
mupmuc,
mupsic,
ops,
Expand All @@ -26,12 +28,14 @@ else:
import ./lockfreequeues/[
atomic_dsl,
constants,
exceptions,
ops,
sipsic,
]

export
atomic_dsl,
constants,
exceptions,
ops,
sipsic
25 changes: 16 additions & 9 deletions src/lockfreequeues/atomic_dsl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,38 @@
import atomics


proc relaxed*[T](location: var Atomic[T]): T {.inline.} =
type
Trivial* = SomeNumber | bool | enum | ptr | pointer
# A type that is known to be atomic and whose size is known at
# compile time to be 8 bytes or less


proc relaxed*[T: Trivial](location: var Atomic[T]): T {.inline.} =
## Load the value from location using moRelaxed
result = location.load(moRelaxed)


proc acquire*[T](location: var Atomic[T]): T {.inline.} =
proc acquire*[T: Trivial](location: var Atomic[T]): T {.inline.} =
## Load the value from location using moAcquire
result = location.load(moAcquire)


proc sequential*[T](location: var Atomic[T]): T {.inline.} =
proc sequential*[T: Trivial](location: var Atomic[T]): T {.inline.} =
## Load the value from location using moSequentiallyConsistent
result = location.load(moSequentiallyConsistent)


proc relaxed*[T](location: var Atomic[T], value: T) {.inline.} =
proc relaxed*[T: Trivial](location: var Atomic[T], desired: T) {.inline.} =
## Store the value in location using moRelaxed
location.store(value, moRelaxed)
location.store(desired, moRelaxed)


proc release*[T](location: var Atomic[T], value: T) {.inline.} =
proc release*[T: Trivial](location: var Atomic[T], desired: T) {.inline.} =
## Store the value in location using moRelease
location.store(value, moRelease)
location.store(desired, moRelease)


proc sequential*[T](location: var Atomic[T], value: T) {.inline.} =
proc sequential*[T: Trivial](location: var Atomic[T], desired: T) {.inline.} =
## Store the value in location using moSequentiallyConsistent
location.store(value, moSequentiallyConsistent)
location.store(desired, moSequentiallyConsistent)

13 changes: 13 additions & 0 deletions src/lockfreequeues/exceptions.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# lockfreequeues # © Copyright 2020 Elijah Shaw-Rutschman # # See the file "LICENSE", included in this distribution for details about the # copyright.

type QueueError* = object of CatchableError
type QueueIndexError* = object of QueueError ## \
## Raised by various comparison ops to indicate an invalid head or tail value.
type NoProducersAvailableError* = object of QueueError ## \
## Raised by `getProducer()` if all producers have been assigned to othe
type NoConsumersAvailableError* = object of QueueError ## \
## Raised by `getConsumer()` if all producers have been assigned to other
## threads.
type InvalidCallDefect* = object of Defect ## \
## Raised by `Mupsic.push()`, `Mupmuc.push()`, and `Mupmuc.pop()` because
## those should happen via `Producer.push()` or `Consumer.pop()`.
78 changes: 47 additions & 31 deletions src/lockfreequeues/mupmuc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,21 @@

## A multi-producer, multi-consumer bounded queue implemented as a ring buffer.

when not compileOption("threads"):
when not compileOption("threads") or defined(nimdoc):
{.error: "lockfreequeues/mupmuc requires --threads:on option.".}

import atomics
import options

import ./atomic_dsl
import ./exceptions
import ./ops
import ./mupsic


const NoConsumerIdx* = -1 ## The initial value of `Mupmuc.prevConsumerIdx`.


type NoConsumersAvailableError* = object of CatchableError ## \
## Raised by `getConsumer()` if all consumers have been assigned to other
## threads.


type
Mupmuc*[N, P, C: static int, T] = object of Mupsic[N, P, T]
## A multi-producer, multi-consumer bounded queue implemented as a ring
Expand All @@ -43,10 +40,10 @@ type
## A per-thread interface for popping items from a queue.
## Retrieved via a call to `Mupmuc.getConsumer()`
idx*: int ## The consumer's unique identifier.
queue*: ptr Mupmuc[N, P, C, T] ## A reference to the consumer's queue.
queuePtr*: ptr Mupmuc[N, P, C, T] ## A ptr to the consumer's queue.


proc clear[N, P, C: static int, T](
proc clear*[N, P, C: static int, T](
self: var Mupmuc[N, P, C, T]
) =
self.head.sequential(0)
Expand All @@ -68,7 +65,21 @@ proc clear[N, P, C: static int, T](

proc initMupmuc*[N, P, C: static int, T](): Mupmuc[N, P, C, T] =
## Initialize a new Mupmuc queue.
result.clear()
result.head.sequential(0)
result.tail.sequential(0)

for n in 0..<N:
result.storage[n].reset()

result.prevProducerIdx.sequential(NoConsumerIdx)
for p in 0..<P:
result.producerTails[p].sequential(0)
result.producerThreadIds[p].sequential(0)

result.prevConsumerIdx.sequential(NoConsumerIdx)
for c in 0..<C:
result.consumerHeads[c].sequential(0)
result.consumerThreadIds[c].sequential(0)


proc getConsumer*[N, P, C: static int, T](
Expand All @@ -77,9 +88,9 @@ proc getConsumer*[N, P, C: static int, T](
): Consumer[N, P, C, T]
{.raises: [NoConsumersAvailableError].} =
## Assigns and returns a `Consumer` instance for the current thread.
result.queue = addr(self)
result.queuePtr = addr self

if idx >= 0:
if likely(idx >= 0):
result.idx = idx
return

Expand Down Expand Up @@ -112,7 +123,7 @@ proc getConsumer*[N, P, C: static int, T](


proc pop*[N, P, C: static int, T](
self: Consumer[N, P, C, T],
self: var Consumer[N, P, C, T],
): Option[T] =
## Pop a single item from the queue.
## If the queue is empty, `none(T)` is returned.
Expand All @@ -125,48 +136,50 @@ proc pop*[N, P, C: static int, T](

# spin until reservation is acquired
while true:
prevConsumerIdx = self.queue.prevConsumerIdx.acquire
prevConsumerIdx = self.queuePtr.prevConsumerIdx.acquire
isFirstConsumption = prevConsumerIdx == NoConsumerIdx
var tail = self.queue.tail.sequential
var tail = self.queuePtr.tail.sequential
prevHead =
if isFirstConsumption:
0
else:
self.queue.consumerHeads[prevConsumerIdx].acquire
self.queuePtr.consumerHeads[prevConsumerIdx].acquire

if unlikely(empty(prevHead, tail, N)):
if prevHead != tail:
echo "empty but prevHead and tail are not equal! prevHead=" & $prevHead & " tail=" & $tail
return none(T)

newHead = incOrReset(prevHead, 1, N)
self.queue.consumerHeads[self.idx].release(newHead)
self.queuePtr.consumerHeads[self.idx].release(newHead)

if self.queue.prevConsumerIdx.compareExchangeWeak(
if self.queuePtr.prevConsumerIdx.compareExchangeWeak(
prevConsumerIdx,
self.idx,
moRelease,
moAcquire,
):
break

result = some(self.queue.storage[index(prevHead, N)])
result = some(self.queuePtr.storage[index(prevHead, N)])

# Wait for prev consumer to update head, then update head
if not isFirstConsumption:
while true:
var expectedHead = prevHead
if self.queue.head.compareExchangeWeak(
if self.queuePtr.head.compareExchangeWeak(
expectedHead,
newHead,
moRelease,
moAcquire,
):
break
else:
self.queue.head.release(newHead)
self.queuePtr.head.release(newHead)


proc pop*[N, P, C: static int, T](
self: Consumer[N, P, C, T],
self: var Consumer[N, P, C, T],
count: int,
): Option[seq[T]] =
## Pop `count` items from the queue.
Expand All @@ -186,14 +199,14 @@ proc pop*[N, P, C: static int, T](

# spin until reservation is acquired
while true:
prevConsumerIdx = self.queue.prevConsumerIdx.acquire
prevConsumerIdx = self.queuePtr.prevConsumerIdx.acquire
isFirstConsumption = prevConsumerIdx == NoConsumerIdx
tail = self.queue.tail.sequential
tail = self.queuePtr.tail.sequential
prevHead =
if isFirstConsumption:
0
else:
self.queue.consumerHeads[prevConsumerIdx].acquire
self.queuePtr.consumerHeads[prevConsumerIdx].acquire

used = used(prevHead, tail, N)
if likely(used >= count):
Expand All @@ -207,9 +220,9 @@ proc pop*[N, P, C: static int, T](
actualCount = min(used, N)

newHead = incOrReset(prevHead, actualCount, N)
self.queue.consumerHeads[self.idx].release(newHead)
self.queuePtr.consumerHeads[self.idx].release(newHead)

if self.queue.prevConsumerIdx.compareExchangeWeak(
if self.queuePtr.prevConsumerIdx.compareExchangeWeak(
prevConsumerIdx,
self.idx,
moRelease,
Expand All @@ -226,21 +239,21 @@ proc pop*[N, P, C: static int, T](
if start > stop:
# data may wrap
let pivot = (N-1) - start
items[0..pivot] = self.queue.storage[start..start+pivot]
items[0..pivot] = self.queuePtr.storage[start..start+pivot]
if stop > 0:
# data wraps
items[pivot+1..pivot+1+stop] = self.queue.storage[0..stop]
items[pivot+1..pivot+1+stop] = self.queuePtr.storage[0..stop]
else:
# data does not wrap
items[0..stop-start] = self.queue.storage[start..stop]
items[0..stop-start] = self.queuePtr.storage[start..stop]

result = some(items)

# Wait for prev consumer to update head, then update head
if not isFirstConsumption:
while true:
var expectedHead = prevHead
if self.queue.head.compareExchangeWeak(
if self.queuePtr.head.compareExchangeWeak(
expectedHead,
newHead,
moRelease,
Expand All @@ -249,7 +262,7 @@ proc pop*[N, P, C: static int, T](
break

elif isFirstConsumption:
self.queue.head.release(newHead)
self.queuePtr.head.release(newHead)


proc pop*[N, P, C: static int, T](
Expand Down Expand Up @@ -277,6 +290,9 @@ proc consumerCount*[N, P, C: static int, T](
result = C


proc `=copy`*[N, P, C: static int, T](a: var Mupmuc[N, P, C, T], b: Mupmuc[N, P, C, T]) {.error.}


when defined(testing):
import sugar
from unittest import check
Expand Down
Loading

0 comments on commit 3ddbdc6

Please sign in to comment.