Skip to content
This repository has been archived by the owner on Sep 27, 2024. It is now read-only.

Added an optional timeout to the Mailbox.take method #27

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3f16540
Added an optional timeout to the Mailbox.take method as well as the u…
bsutton Jul 25, 2024
d58f953
updated the change log.
bsutton Jul 25, 2024
0fbdc04
Made the TimeoutException messages more consistent.
bsutton Jul 27, 2024
e54c311
ignored .failed_tracker
bsutton Jul 31, 2024
8d596a0
Released 0.4.0.
bsutton Jul 31, 2024
0c2a0c4
Released 0.4.0.
bsutton Jul 31, 2024
e07ba21
improved the timeout to support microseconds duration. Fixed a bug in…
bsutton Aug 8, 2024
34ec74c
Moved the timepsec creation into a separate function and now support …
bsutton Aug 8, 2024
fc34d15
removed old code.
bsutton Aug 8, 2024
991d59e
formated with dart format.
bsutton Aug 8, 2024
0630538
Merge branch 'main' into main
bsutton Aug 9, 2024
b4f1b74
merged in the mailbox.close method from upstream.
bsutton Aug 9, 2024
5972360
Merge branch 'main' of github.com:onepub-dev/native_synchronization
bsutton Aug 9, 2024
d180be6
unit test were still using the temp package name.
bsutton Aug 9, 2024
efde5b8
comment out code to help isolate crash
bsutton Aug 9, 2024
a013b3c
commit for dart ffi issue
bsutton Aug 9, 2024
6d84455
changed /// to // as there comments are just to help with visual pars…
bsutton Aug 14, 2024
9be511f
added comments in winapi to help break up the code to make it easier …
bsutton Aug 14, 2024
2a29657
changed all print statemetns to use the dart logger and cleaned up th…
bsutton Aug 14, 2024
1a7c214
Added an initial call to GetLastError for windows to get around the d…
bsutton Aug 14, 2024
ab255e9
Released 0.5.0.
bsutton Aug 14, 2024
beb7244
deployed native_synchronization_temp and then reverted back the packa…
bsutton Aug 14, 2024
87e92ee
restored the call _mailbox.ref.state that cuases the ffi core dump.
bsutton Aug 14, 2024
8d95611
Merge branch 'ffi-crash'
bsutton Aug 14, 2024
58b3f6d
removed the test in _takeTimed for a closed mailbox as a work around …
bsutton Aug 14, 2024
2184525
Added method to the public API to determine the mailboxes state.
bsutton Aug 17, 2024
bc5bae8
Released 0.6.0.
bsutton Aug 17, 2024
fb1ce8f
Applied dart format to code.
bsutton Aug 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ pubspec.lock

# VSCode configuration files
.vscode/
.history/
.failed_tracker
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.3.0
# 0.4.0
- Added a timeout to the Mailbox.take, Mutex.runLocked and ConditionVariable.wait methods.
bsutton marked this conversation as resolved.
Show resolved Hide resolved
Note: the Mutex timeout is ignored on Windows.


## 0.3.0
- Add a closed state to `Mailbox`.

## 0.2.0
Expand Down
73 changes: 62 additions & 11 deletions lib/mailbox.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import 'dart:typed_data';

import 'package:ffi/ffi.dart';

import 'package:native_synchronization/primitives.dart';
bsutton marked this conversation as resolved.
Show resolved Hide resolved
import 'package:native_synchronization/sendable.dart';
import 'primitives.dart';
import 'sendable.dart';

final class _MailboxRepr extends Struct {
external Pointer<Uint8> buffer;
Expand Down Expand Up @@ -49,9 +49,11 @@ class Mailbox {
static const _stateFull = 1;
static const _stateClosed = 2;

static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) {
calloc.free(mailbox.ref.buffer);
calloc.free(mailbox);
static final finalizer = Finalizer((mailbox) {
final p = mailbox! as Pointer<_MailboxRepr>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because I dev using strong type lint options and the linter was complaining, so I fixed it.

Copy link
Contributor

@mraleph mraleph Aug 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was it complaining about? The change makes the code less typed: finalizer had static type Finalizer<Pointer<_MailboxRepr>> after your change this type is lost and you are resorting to runtime casting for some reason.

Please revert this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still not resolved

calloc
..free(p.ref.buffer)
..free(p);
});

Mailbox()
Expand Down Expand Up @@ -102,10 +104,59 @@ class Mailbox {

/// Take a message from the mailbox.
///
/// If mailbox is empty then [take] will synchronously block until message
/// is available or mailbox is closed. If mailbox is closed then [take] will
/// throw [StateError].
Uint8List take() => _mutex.runLocked(() {
/// If the mailbox is empty this will synchronously block until message
/// is available or a timeout occurs.
/// If the mailbox is closed then [take] will throw [StateError].
///
/// If not [timeout] is provided then this method will block
/// indefinitely.
///
/// If [timeout] is provided then this will block for at most [timeout].
/// If the timeout expires before a message is available then this will
/// throw a [TimeoutException].
/// The [timeout] supports a resolution of microseconds.
Uint8List take({Duration? timeout}) {
if (timeout != null) {
return _takeTimed(timeout);
} else {
return _take();
}
}

Uint8List _takeTimed(final Duration timeout) {
final start = DateTime.now();

return _mutex.runLocked(
timeout: timeout,
() {
/// Wait for an item to be posted into the mailbox.
while (_mailbox.ref.state == _stateEmpty) {
final remainingTime = _remainingTime(timeout, start);
_condVar.wait(_mutex, timeout: remainingTime);
}
// if (_mailbox.ref.state == _stateClosed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented out code?

// throw StateError('Mailbox is closed');
// }

final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength);

_mailbox.ref.state = _stateEmpty;
_mailbox.ref.buffer = nullptr;
_mailbox.ref.bufferLength = 0;
return result;
},
);
}

Duration _remainingTime(Duration timeout, DateTime start) {
var remainingTime = timeout - (DateTime.now().difference(start));
if (remainingTime < Duration.zero) {
remainingTime = Duration.zero;
}
return remainingTime;
}

Uint8List _take() => _mutex.runLocked(() {
while (_mailbox.ref.state == _stateEmpty) {
_condVar.wait(_mutex);
}
Expand Down Expand Up @@ -137,8 +188,8 @@ class Mailbox {
return asTypedList(length, finalizer: malloc.nativeFree);
}

final result = Uint8List(length);
result.setRange(0, length, buffer.asTypedList(length));
final result = Uint8List(length)
..setRange(0, length, buffer.asTypedList(length));
malloc.free(buffer);
return result;
}
Expand Down
68 changes: 64 additions & 4 deletions lib/posix.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

part of 'primitives.dart';

/// Posix timeout error number.
// ignore: constant_identifier_names
const ETIMEDOUT = 110;

class _PosixMutex extends Mutex {
/// This is maximum value of `sizeof(pthread_mutex_t)` across all supported
/// platforms.
Expand Down Expand Up @@ -31,8 +35,25 @@ class _PosixMutex extends Mutex {
super._();

@override
void _lock() {
if (pthread_mutex_lock(_impl) != 0) {
void _lock({Duration? timeout}) {
if (timeout == null) {
if (pthread_mutex_lock(_impl) != 0) {
throw StateError('Failed to lock mutex');
}
} else {
_timedLock(timeout);
}
}

void _timedLock(Duration timeout) {
var timespec = _allocateTimespec(timeout);
final result = pthread_mutex_timedlock(_impl, timespec);
malloc.free(timespec);

if (result == ETIMEDOUT) {
throw TimeoutException('Timed out waiting for Mutex lock');
}
if (result != 0) {
throw StateError('Failed to lock mutex');
}
}
Expand Down Expand Up @@ -82,12 +103,51 @@ class _PosixConditionVariable extends ConditionVariable {
}

@override
void wait(covariant _PosixMutex mutex) {
if (pthread_cond_wait(_impl, mutex._impl) != 0) {
void wait(covariant _PosixMutex mutex, {Duration? timeout}) {
if (timeout == null) {
if (pthread_cond_wait(_impl, mutex._impl) != 0) {
throw StateError('Failed to wait on a condition variable');
}
} else {
_timedWait(timeout, mutex);
}
}

/// Waits on a condition variable with a timeout.
void _timedWait(Duration timeout, _PosixMutex mutex) {
final wakeUpTime = _allocateTimespec(timeout);
final result = pthread_cond_timedwait(_impl, mutex._impl, wakeUpTime);

malloc.free(wakeUpTime);

if (result == ETIMEDOUT) {
throw TimeoutException('Timed out waiting for conditional variable');
}

if (result != 0) {
throw StateError('Failed to wait on a condition variable');
}
}

@override
int get _address => _impl.address;
}

/// Create a posix timespec from a [timeout].
/// The returned [pthread_timespec_t] must be freed by a call
/// to [malloc.free]
Pointer<pthread_timespec_t> _allocateTimespec(Duration timeout) {
final timespec =
malloc.allocate<pthread_timespec_t>(sizeOf<pthread_timespec_t>());

/// calculate the absolute timeout in microseconds
final microSecondsSinceEpoc = DateTime.now().microsecondsSinceEpoch;
final wakupTime = microSecondsSinceEpoc + timeout.inMicroseconds;

/// seconds since the epoc to wait until.
timespec.ref.tv_sec = wakupTime ~/ 1000000;

/// additional nano-seconds after tv_sec to wait
timespec.ref.tv_nsec = (wakupTime % 1000000) * 1000;
return timespec;
}
27 changes: 19 additions & 8 deletions lib/primitives.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
/// * [Condition Variables](https://learn.microsoft.com/en-us/windows/win32/sync/condition-variables),
library;

import 'dart:async';
import 'dart:ffi';
import 'dart:io';

import 'package:ffi/ffi.dart';
import 'package:native_synchronization/sendable.dart';

import 'package:native_synchronization/src/bindings/pthread.dart';
import 'package:native_synchronization/src/bindings/winapi.dart';
import 'sendable.dart';
import 'src/bindings/pthread.dart';
import 'src/bindings/winapi.dart';

part 'posix.dart';
part 'windows.dart';
Expand All @@ -46,11 +47,15 @@ sealed class Mutex implements Finalizable {
///
/// If this mutex is already acquired then an attempt to acquire it
/// blocks the current thread until the mutex is released by the
/// current owner.
/// current owner or the timeout expires.
///
/// If no [timeout] is supplied then the method waits indefinitely.
///
/// If the [timeout] expires then a [TimeoutException] is thrown.
///
/// **Warning**: attempting to hold a mutex across asynchronous suspension
/// points will lead to undefined behavior and potentially crashes.
void _lock();
void _lock({Duration? timeout});

/// Release exclusive ownership of this mutex.
///
Expand All @@ -59,13 +64,15 @@ sealed class Mutex implements Finalizable {
void _unlock();

/// Run the given synchronous `action` under a mutex.
/// The lock will return if the lock is gained.
/// If a timeout occurs then a [TimeoutException] is thrown.
///
/// This function takes exclusive ownership of the mutex, executes `action`
/// and then releases the mutex. It returns the value returned by `action`.
///
/// **Warning**: you can't combine `runLocked` with an asynchronous code.
R runLocked<R>(R Function() action) {
_lock();
R runLocked<R>(R Function() action, {Duration? timeout}) {
_lock(timeout: timeout);
try {
return action();
} finally {
Expand Down Expand Up @@ -102,7 +109,11 @@ sealed class ConditionVariable implements Finalizable {
/// `mutex` must be a [Mutex] object exclusively held by the current thread.
/// It will be released and the thread will block until another thread
/// calls [notify].
void wait(Mutex mutex);
///
/// if a [timeout] is passed and it expires
/// then a [TimeoutException] is thrown.
/// Timeout resolution to microseconds is supported.
void wait(Mutex mutex, {Duration? timeout});

/// Wake up at least one thread waiting on this condition variable.
void notify();
Expand Down
30 changes: 30 additions & 0 deletions lib/src/bindings/pthread.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,59 @@ final class pthread_mutex_t extends Opaque {}

final class pthread_cond_t extends Opaque {}

final class pthread_timespec_t extends Struct {
@Int64()
external int tv_sec;

@Int64()
external int tv_nsec;
}

/// mutex_init
bsutton marked this conversation as resolved.
Show resolved Hide resolved
@Native<Int Function(Pointer<pthread_mutex_t>, Pointer<Void>)>()
external int pthread_mutex_init(
Pointer<pthread_mutex_t> mutex, Pointer<Void> attrs);

/// mutex_lock
@Native<Int Function(Pointer<pthread_mutex_t>)>()
external int pthread_mutex_lock(Pointer<pthread_mutex_t> mutex);

/// mutex_timedlock
@Native<
Int Function(
Pointer<pthread_mutex_t>, Pointer<pthread_timespec_t> abstime)>()
external int pthread_mutex_timedlock(
Pointer<pthread_mutex_t> mutex, Pointer<pthread_timespec_t> abstime);

/// mutex_unlock
@Native<Int Function(Pointer<pthread_mutex_t>)>()
external int pthread_mutex_unlock(Pointer<pthread_mutex_t> mutex);

/// mutex_destroy
@Native<Int Function(Pointer<pthread_mutex_t>)>()
external int pthread_mutex_destroy(Pointer<pthread_mutex_t> cond);

/// cond_init
@Native<Int Function(Pointer<pthread_cond_t>, Pointer<Void>)>()
external int pthread_cond_init(
Pointer<pthread_cond_t> cond, Pointer<Void> attrs);

/// cond_wait
@Native<Int Function(Pointer<pthread_cond_t>, Pointer<pthread_mutex_t>)>()
external int pthread_cond_wait(
Pointer<pthread_cond_t> cond, Pointer<pthread_mutex_t> mutex);

/// cond_timedwait
@Native<
Int Function(Pointer<pthread_cond_t>, Pointer<pthread_mutex_t>,
Pointer<pthread_timespec_t> abstime)>()
external int pthread_cond_timedwait(Pointer<pthread_cond_t> cond,
Pointer<pthread_mutex_t> mutex, Pointer<pthread_timespec_t> abstime);

/// cond_destroy
@Native<Int Function(Pointer<pthread_cond_t>)>()
external int pthread_cond_destroy(Pointer<pthread_cond_t> cond);

/// cond_signal
@Native<Int Function(Pointer<pthread_cond_t>)>()
external int pthread_cond_signal(Pointer<pthread_cond_t> cond);
6 changes: 6 additions & 0 deletions lib/src/bindings/winapi.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ final class SRWLOCK extends Opaque {}

final class CONDITION_VARIABLE extends Opaque {}

/// SWRLocks

@Native<Void Function(Pointer<SRWLOCK>)>()
external void InitializeSRWLock(Pointer<SRWLOCK> lock);

Expand All @@ -19,6 +21,7 @@ external void AcquireSRWLockExclusive(Pointer<SRWLOCK> lock);
@Native<Void Function(Pointer<SRWLOCK>)>()
external void ReleaseSRWLockExclusive(Pointer<SRWLOCK> mutex);

/// Condition Variables
@Native<Void Function(Pointer<CONDITION_VARIABLE>)>()
external void InitializeConditionVariable(Pointer<CONDITION_VARIABLE> condVar);

Expand All @@ -30,3 +33,6 @@ external int SleepConditionVariableSRW(Pointer<CONDITION_VARIABLE> condVar,

@Native<Void Function(Pointer<CONDITION_VARIABLE>)>()
external void WakeConditionVariable(Pointer<CONDITION_VARIABLE> condVar);

@Native<Long Function()>()
external int GetLastError();
3 changes: 3 additions & 0 deletions lib/src/version/version.g.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/// GENERATED BY pub_release do not modify.
/// Instance of 'Name' version
String packageVersion = '0.4.0';
Loading
Loading