Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from libsnooze #8

Merged
merged 6 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions dub.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
],
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"bformat": ">=4.1.1",
"libsnooze": ">=1.3.0-beta"
"bformat": ">=4.1.1"
},
"description": "Tristanable network message queuing framework",
"homepage": "https://deavmi.assigned.network/projects/tristanable",
Expand Down
117 changes: 66 additions & 51 deletions source/tristanable/queue.d
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
*/
module tristanable.queue;

// TODO: Examine the below import which seemingly fixes stuff for libsnooze
import libsnooze.clib;
import libsnooze;

import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.sync.exception : SyncError;
import std.container.slist : SList;
import tristanable.encoding;
import core.thread : dur;
import core.time : Duration, dur;
import tristanable.exceptions;

version(unittest)
Expand All @@ -27,10 +25,15 @@ version(unittest)
public class Queue
{
/**
* The libsnooze event used to sleep/wake
* on queue events
* Mutex for the condition variable
*/
private Mutex mutex;

/**
* The condition variable used to sleep/wake
* on queue of events
*/
private Event event;
private Condition signal;

/**
* The queue of messages
Expand All @@ -47,6 +50,15 @@ public class Queue
*/
private ulong queueID;

/**
* If a message is enqueued prior
* to us sleeping then we won't
* wake up and return for it.
*
* Therefore a periodic wakeup
* is required.
*/
private Duration wakeInterval;

/**
* Constructs a new Queue and immediately sets up the notification
Expand All @@ -62,14 +74,37 @@ public class Queue
/* Initialize the queue lock */
this.queueLock = new Mutex();

/* Initialize the event */
this.event = new Event();
/* Initialize the condition variable */
this.mutex = new Mutex();
this.signal = new Condition(this.mutex);

/* Set the queue id */
this.queueID = queueID;

/* Ensure pipe existence (see https://deavmi.assigned.network/git/deavmi/tristanable/issues/5) */
event.wait(dur!("seconds")(0));
/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
}

/**
* Returns the current wake interval
* for the queue checker
*
* Returns: the `Duration`
*/
public Duration getWakeInterval()
{
return this.wakeInterval;
}

/**
* Sets the wake up interval
*
* Params:
* interval = the new interval
*/
public void setWakeInterval(Duration interval)
{
this.wakeInterval = interval;
}

/**
Expand Down Expand Up @@ -111,9 +146,9 @@ public class Queue
try
{
// TODO: Make us wait on the event (optional with a time-out)
event.notifyAll();
signal.notifyAll();
}
catch(FatalException snozErr)
catch(SyncError snozErr)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.ENQUEUE_FAILED);
Expand Down Expand Up @@ -151,45 +186,25 @@ public class Queue
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
/**
* Call `wait()` and catch any interrupts
* in which case loop back and call `wait()`
* again
*/
while(true)
scope(exit)
{
try
{
// TODO: Make us wait on the event (optional with a time-out)
event.wait();
}
catch(InterruptedException e)
{
version(unittest)
{
import std.stdio;
writeln("dequeue() had libsnooze wait() get interrupted!");
}

// Retry the wait()
continue;
}
catch(FatalException fatalErr)
{
version(unittest)
{
import std.stdio;
writeln("dequeue() had libsnooze wait() get FATALLY fail! Exception will now throw...");
}

// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}

// On successful wait() wake-up exit this wait()-retry loop
break;
// Unlock the mutex
this.mutex.unlock();
}


// Lock the mutex
this.mutex.lock();

try
{
this.signal.wait(this.wakeInterval);
}
catch(SyncError e)
{
// Throw an exception on a fatal exception
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}


/* Lock the item queue */
queueLock.lock();
Expand Down
Loading