Skip to content
This repository has been archived by the owner on Sep 27, 2024. It is now read-only.

Commit

Permalink
Initial implementation of the package. (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
mraleph authored Aug 9, 2023
1 parent 6091014 commit 94f15fe
Show file tree
Hide file tree
Showing 15 changed files with 657 additions and 16 deletions.
1 change: 1 addition & 0 deletions .github/workflows/dart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
run: dart analyze --fatal-infos

test:
needs: analyze
runs-on: ${{ matrix.os }}
strategy:
matrix:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ build/
# Omit committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock

# VSCode configuration files
.vscode/
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
## 0.1.0-wip

- Initial version.
- Expose `Mutex` and `ConditionVariable`
- Implement `Mailbox`.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
[![Dart](https://github.com/dart-lang/native_synchronization/actions/workflows/dart.yaml/badge.svg)](https://github.com/dart-lang/native_synchronization/actions/workflows/dart.yaml)

Low level synchronization primitives built on dart:ffi.
This package exposes a portable interface for low-level thread
synchronization primitives like `Mutex` and `ConditionVariable`.

## TODO: Projects docs

TODO: Add a brief project description here.
It also provides some slightly more high-level synchronization primitives
like `Mailbox` built on top of low-level primitives.

## Status: experimental

**NOTE**: This package is currently experimental and published under the
[labs.dart.dev](https://dart.dev/dart-team-packages) pub publisher in order to
solicit feedback.
solicit feedback.

For packages in the labs.dart.dev publisher we generally plan to either graduate
the package into a supported publisher (dart.dev, tools.dart.dev) after a period
of feedback and iteration, or discontinue the package. These packages have a
much higher expected rate of API and breaking changes.

Your feedback is valuable and will help us evolve this package. For general
feedback, suggestions, and comments, please file an issue in the
feedback, suggestions, and comments, please file an issue in the
[bug tracker](https://github.com/dart-lang/native_synchronization/issues).
125 changes: 125 additions & 0 deletions lib/mailbox.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:ffi';
import 'dart:typed_data';

import 'package:ffi/ffi.dart';

import 'package:native_synchronization/primitives.dart';
import 'package:native_synchronization/sendable.dart';

final class _MailboxRepr extends Struct {
external Pointer<Uint8> buffer;

@Int32()
external int bufferLength;

@Int32()
external int state;
}

class _SendableMailbox {
final int address;
final Sendable<Mutex> mutex;
final Sendable<ConditionVariable> condVar;

_SendableMailbox(
{required this.address, required this.mutex, required this.condVar});
}

/// Mailbox communication primitive.
///
/// This synchronization primitive allows a single producer to send messages
/// to one or more consumers. Producer uses [put] to place a message into
/// a mailbox which consumers can then [take] out.
///
/// [Mailbox] object can not be directly sent to other isolates via a
/// `SendPort`, but it can be converted to a `Sendable<Mailbox>` via
/// `asSendable` getter.
///
/// [Mailbox] object is owned by an isolate which created them.
class Mailbox {
final Pointer<_MailboxRepr> _mailbox;
final Mutex _mutex;
final ConditionVariable _condVar;

static const _stateEmpty = 0;
static const _stateFull = 1;

static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) {
calloc.free(mailbox.ref.buffer);
calloc.free(mailbox);
});

Mailbox()
: _mailbox = calloc.allocate(sizeOf<_MailboxRepr>()),
_mutex = Mutex(),
_condVar = ConditionVariable() {
finalizer.attach(this, _mailbox);
}

Mailbox._fromSendable(_SendableMailbox sendable)
: _mailbox = Pointer.fromAddress(sendable.address),
_mutex = sendable.mutex.materialize(),
_condVar = sendable.condVar.materialize();

/// Place a message into the mailbox if has space for it.
///
/// If mailbox already contains a message then [put] will throw.
void put(Uint8List message) {
final buffer = message.isEmpty ? nullptr : _toBuffer(message);
_mutex.runLocked(() {
if (_mailbox.ref.state != _stateEmpty) {
throw StateError('Mailbox is full');
}

_mailbox.ref.state = _stateFull;
_mailbox.ref.buffer = buffer;
_mailbox.ref.bufferLength = message.length;

_condVar.notify();
});
}

/// Take a message from the mailbox.
///
/// If mailbox is empty this will synchronously block until message
/// is available.
Uint8List take() => _mutex.runLocked(() {
while (_mailbox.ref.state != _stateFull) {
_condVar.wait(_mutex);
}

final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength);

_mailbox.ref.state = _stateEmpty;
_mailbox.ref.buffer = nullptr;
_mailbox.ref.bufferLength = 0;
return result;
});

static final _emptyResponse = Uint8List(0);

static Uint8List _toList(Pointer<Uint8> buffer, int length) {
return length == 0
? _emptyResponse
// We have to ignore sdk_version_since warning due to dartbug.com/53142.
// ignore: sdk_version_since
: buffer.asTypedList(length, finalizer: malloc.nativeFree);
}

static Pointer<Uint8> _toBuffer(Uint8List list) {
final buffer = malloc.allocate<Uint8>(list.length);
buffer.asTypedList(list.length).setRange(0, list.length, list);
return buffer;
}

Sendable<Mailbox> get asSendable => Sendable.wrap(
Mailbox._fromSendable,
_SendableMailbox(
address: _mailbox.address,
mutex: _mutex.asSendable,
condVar: _condVar.asSendable));
}
93 changes: 93 additions & 0 deletions lib/posix.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

part of 'primitives.dart';

class _PosixMutex extends Mutex {
/// This is maximum value of `sizeof(pthread_mutex_t)` across all supported
/// platforms.
static const _sizeInBytes = 64;

final Pointer<pthread_mutex_t> _impl;

static final _finalizer = Finalizer<Pointer<pthread_mutex_t>>((ptr) {
pthread_mutex_destroy(ptr);
malloc.free(ptr);
});

_PosixMutex()
: _impl = malloc.allocate(_PosixMutex._sizeInBytes),
super._() {
if (pthread_mutex_init(_impl, nullptr) != 0) {
malloc.free(_impl);
throw StateError('Failed to initialize mutex');
}
_finalizer.attach(this, _impl);
}

_PosixMutex.fromAddress(int address)
: _impl = Pointer.fromAddress(address),
super._();

@override
void _lock() {
if (pthread_mutex_lock(_impl) != 0) {
throw StateError('Failed to lock mutex');
}
}

@override
void _unlock() {
if (pthread_mutex_unlock(_impl) != 0) {
throw StateError('Failed to unlock mutex');
}
}

@override
int get _address => _impl.address;
}

class _PosixConditionVariable extends ConditionVariable {
/// This is maximum value of `sizeof(pthread_cond_t)` across all supported
/// platforms.
static const _sizeInBytes = 64;

final Pointer<pthread_cond_t> _impl;

static final _finalizer = Finalizer<Pointer<pthread_cond_t>>((ptr) {
pthread_cond_destroy(ptr);
malloc.free(ptr);
});

_PosixConditionVariable()
: _impl = malloc.allocate(_PosixConditionVariable._sizeInBytes),
super._() {
if (pthread_cond_init(_impl, nullptr) != 0) {
malloc.free(_impl);
throw StateError('Failed to initialize condition variable');
}
_finalizer.attach(this, _impl);
}

_PosixConditionVariable.fromAddress(int address)
: _impl = Pointer.fromAddress(address),
super._();

@override
void notify() {
if (pthread_cond_signal(_impl) != 0) {
throw StateError('Failed to signal condition variable');
}
}

@override
void wait(covariant _PosixMutex mutex) {
if (pthread_cond_wait(_impl, mutex._impl) != 0) {
throw StateError('Failed to wait on a condition variable');
}
}

@override
int get _address => _impl.address;
}
117 changes: 117 additions & 0 deletions lib/primitives.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

/// This library contains native synchronization primitives such as [Mutex]
/// and [ConditionVariable] implemented on top of low-level primitives
/// provided by the OS.
///
/// See OS specific documentation for more details:
///
/// * POSIX man pages (Linux, Android, Mac OS X and iOS X)
/// * `pthread_mutex_lock` and `pthread_mutex_unlock`,
/// * `pthread_cond_wait` and `pthread_cond_signal`.
/// * Windows
/// * [Slim Reader/Writer (SRW) Locks](https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks),
/// * [Condition Variables](https://learn.microsoft.com/en-us/windows/win32/sync/condition-variables),
library;

import 'dart:ffi';
import 'dart:io';

import 'package:ffi/ffi.dart';
import 'package:native_synchronization/sendable.dart';

import 'package:native_synchronization/src/bindings/pthread.dart';
import 'package:native_synchronization/src/bindings/winapi.dart';

part 'posix.dart';
part 'windows.dart';

/// A *mutex* synchronization primitive.
///
/// Mutex can be used to synchronize access to a native resource shared between
/// multiple threads.
///
/// [Mutex] object can not be directly sent to other isolates via a `SendPort`,
/// but it can be converted to a `Sendable<Mutex>` via `asSendable` getter.
///
/// Mutex objects are owned by an isolate which created them.
sealed class Mutex implements Finalizable {
Mutex._();

factory Mutex() => Platform.isWindows ? _WindowsMutex() : _PosixMutex();

/// Acquire exclusive ownership of this mutex.
///
/// If this mutex is already acquired then an attempt to acquire it
/// blocks the current thread until the mutex is released by the
/// current owner.
///
/// **Warning**: attempting to hold a mutex across asynchronous suspension
/// points will lead to undefined behavior and potentially crashes.
void _lock();

/// Release exclusive ownership of this mutex.
///
/// It is an error to release ownership of the mutex if it was not
/// previously acquired.
void _unlock();

/// Run the given synchronous `action` under a mutex.
///
/// This function takes exclusive ownership of the mutex, executes `action`
/// and then releases the mutex. It returns the value returned by `action`.
///
/// **Warning**: you can't combine `runLocked` with an asynchronous code.
R runLocked<R>(R Function() action) {
_lock();
try {
return action();
} finally {
_unlock();
}
}

Sendable<Mutex> get asSendable => Sendable.wrap(
Platform.isWindows ? _WindowsMutex.fromAddress : _PosixMutex.fromAddress,
_address);

int get _address;
}

/// A *condition variable* synchronization primitive.
///
/// Condition variable can be used to synchronously wait for a condition to
/// occur.
///
/// [ConditionVariable] object can not be directly sent to other isolates via a
/// `SendPort`, but it can be converted to a `Sendable<ConditionVariable>`
/// object via [asSendable] getter.
///
/// [ConditionVariable] objects are owned by an isolate which created them.
sealed class ConditionVariable implements Finalizable {
ConditionVariable._();

factory ConditionVariable() => Platform.isWindows
? _WindowsConditionVariable()
: _PosixConditionVariable();

/// Block and wait until another thread calls [notify].
///
/// `mutex` must be a [Mutex] object exclusively held by the current thread.
/// It will be released and the thread will block until another thread
/// calls [notify].
void wait(Mutex mutex);

/// Wake up at least one thread waiting on this condition variable.
void notify();

Sendable<ConditionVariable> get asSendable => Sendable.wrap(
Platform.isWindows
? _WindowsConditionVariable.fromAddress
: _PosixConditionVariable.fromAddress,
_address);

int get _address;
}
7 changes: 0 additions & 7 deletions lib/sample.dart

This file was deleted.

Loading

0 comments on commit 94f15fe

Please sign in to comment.