diff --git a/.github/workflows/dart.yaml b/.github/workflows/dart.yaml index 1d1bc07..f6a4ea5 100644 --- a/.github/workflows/dart.yaml +++ b/.github/workflows/dart.yaml @@ -34,6 +34,7 @@ jobs: run: dart analyze --fatal-infos test: + needs: analyze runs-on: ${{ matrix.os }} strategy: matrix: diff --git a/.gitignore b/.gitignore index 65c34dc..52160c4 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ build/ # Omit committing pubspec.lock for library packages; see # https://dart.dev/guides/libraries/private-files#pubspeclock. pubspec.lock + +# VSCode configuration files +.vscode/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b9970e..69c03a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ ## 0.1.0-wip - Initial version. +- Expose `Mutex` and `ConditionVariable` +- Implement `Mailbox`. diff --git a/README.md b/README.md index 80d064c..b56aad1 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,16 @@ [![Dart](https://github.com/dart-lang/native_synchronization/actions/workflows/dart.yaml/badge.svg)](https://github.com/dart-lang/native_synchronization/actions/workflows/dart.yaml) -Low level synchronization primitives built on dart:ffi. +This package exposes a portable interface for low-level thread +synchronization primitives like `Mutex` and `ConditionVariable`. -## TODO: Projects docs - -TODO: Add a brief project description here. +It also provides some slightly more high-level synchronization primitives +like `Mailbox` built on top of low-level primitives. ## Status: experimental **NOTE**: This package is currently experimental and published under the [labs.dart.dev](https://dart.dev/dart-team-packages) pub publisher in order to -solicit feedback. +solicit feedback. For packages in the labs.dart.dev publisher we generally plan to either graduate the package into a supported publisher (dart.dev, tools.dart.dev) after a period @@ -18,5 +18,5 @@ of feedback and iteration, or discontinue the package. These packages have a much higher expected rate of API and breaking changes. Your feedback is valuable and will help us evolve this package. For general -feedback, suggestions, and comments, please file an issue in the +feedback, suggestions, and comments, please file an issue in the [bug tracker](https://github.com/dart-lang/native_synchronization/issues). diff --git a/lib/mailbox.dart b/lib/mailbox.dart new file mode 100644 index 0000000..d7a2796 --- /dev/null +++ b/lib/mailbox.dart @@ -0,0 +1,125 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:ffi'; +import 'dart:typed_data'; + +import 'package:ffi/ffi.dart'; + +import 'package:native_synchronization/primitives.dart'; +import 'package:native_synchronization/sendable.dart'; + +final class _MailboxRepr extends Struct { + external Pointer buffer; + + @Int32() + external int bufferLength; + + @Int32() + external int state; +} + +class _SendableMailbox { + final int address; + final Sendable mutex; + final Sendable condVar; + + _SendableMailbox( + {required this.address, required this.mutex, required this.condVar}); +} + +/// Mailbox communication primitive. +/// +/// This synchronization primitive allows a single producer to send messages +/// to one or more consumers. Producer uses [put] to place a message into +/// a mailbox which consumers can then [take] out. +/// +/// [Mailbox] object can not be directly sent to other isolates via a +/// `SendPort`, but it can be converted to a `Sendable` via +/// `asSendable` getter. +/// +/// [Mailbox] object is owned by an isolate which created them. +class Mailbox { + final Pointer<_MailboxRepr> _mailbox; + final Mutex _mutex; + final ConditionVariable _condVar; + + static const _stateEmpty = 0; + static const _stateFull = 1; + + static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) { + calloc.free(mailbox.ref.buffer); + calloc.free(mailbox); + }); + + Mailbox() + : _mailbox = calloc.allocate(sizeOf<_MailboxRepr>()), + _mutex = Mutex(), + _condVar = ConditionVariable() { + finalizer.attach(this, _mailbox); + } + + Mailbox._fromSendable(_SendableMailbox sendable) + : _mailbox = Pointer.fromAddress(sendable.address), + _mutex = sendable.mutex.materialize(), + _condVar = sendable.condVar.materialize(); + + /// Place a message into the mailbox if has space for it. + /// + /// If mailbox already contains a message then [put] will throw. + void put(Uint8List message) { + final buffer = message.isEmpty ? nullptr : _toBuffer(message); + _mutex.runLocked(() { + if (_mailbox.ref.state != _stateEmpty) { + throw StateError('Mailbox is full'); + } + + _mailbox.ref.state = _stateFull; + _mailbox.ref.buffer = buffer; + _mailbox.ref.bufferLength = message.length; + + _condVar.notify(); + }); + } + + /// Take a message from the mailbox. + /// + /// If mailbox is empty this will synchronously block until message + /// is available. + Uint8List take() => _mutex.runLocked(() { + while (_mailbox.ref.state != _stateFull) { + _condVar.wait(_mutex); + } + + final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength); + + _mailbox.ref.state = _stateEmpty; + _mailbox.ref.buffer = nullptr; + _mailbox.ref.bufferLength = 0; + return result; + }); + + static final _emptyResponse = Uint8List(0); + + static Uint8List _toList(Pointer buffer, int length) { + return length == 0 + ? _emptyResponse + // We have to ignore sdk_version_since warning due to dartbug.com/53142. + // ignore: sdk_version_since + : buffer.asTypedList(length, finalizer: malloc.nativeFree); + } + + static Pointer _toBuffer(Uint8List list) { + final buffer = malloc.allocate(list.length); + buffer.asTypedList(list.length).setRange(0, list.length, list); + return buffer; + } + + Sendable get asSendable => Sendable.wrap( + Mailbox._fromSendable, + _SendableMailbox( + address: _mailbox.address, + mutex: _mutex.asSendable, + condVar: _condVar.asSendable)); +} diff --git a/lib/posix.dart b/lib/posix.dart new file mode 100644 index 0000000..b59d7ae --- /dev/null +++ b/lib/posix.dart @@ -0,0 +1,93 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +part of 'primitives.dart'; + +class _PosixMutex extends Mutex { + /// This is maximum value of `sizeof(pthread_mutex_t)` across all supported + /// platforms. + static const _sizeInBytes = 64; + + final Pointer _impl; + + static final _finalizer = Finalizer>((ptr) { + pthread_mutex_destroy(ptr); + malloc.free(ptr); + }); + + _PosixMutex() + : _impl = malloc.allocate(_PosixMutex._sizeInBytes), + super._() { + if (pthread_mutex_init(_impl, nullptr) != 0) { + malloc.free(_impl); + throw StateError('Failed to initialize mutex'); + } + _finalizer.attach(this, _impl); + } + + _PosixMutex.fromAddress(int address) + : _impl = Pointer.fromAddress(address), + super._(); + + @override + void _lock() { + if (pthread_mutex_lock(_impl) != 0) { + throw StateError('Failed to lock mutex'); + } + } + + @override + void _unlock() { + if (pthread_mutex_unlock(_impl) != 0) { + throw StateError('Failed to unlock mutex'); + } + } + + @override + int get _address => _impl.address; +} + +class _PosixConditionVariable extends ConditionVariable { + /// This is maximum value of `sizeof(pthread_cond_t)` across all supported + /// platforms. + static const _sizeInBytes = 64; + + final Pointer _impl; + + static final _finalizer = Finalizer>((ptr) { + pthread_cond_destroy(ptr); + malloc.free(ptr); + }); + + _PosixConditionVariable() + : _impl = malloc.allocate(_PosixConditionVariable._sizeInBytes), + super._() { + if (pthread_cond_init(_impl, nullptr) != 0) { + malloc.free(_impl); + throw StateError('Failed to initialize condition variable'); + } + _finalizer.attach(this, _impl); + } + + _PosixConditionVariable.fromAddress(int address) + : _impl = Pointer.fromAddress(address), + super._(); + + @override + void notify() { + if (pthread_cond_signal(_impl) != 0) { + throw StateError('Failed to signal condition variable'); + } + } + + @override + void wait(covariant _PosixMutex mutex) { + if (pthread_cond_wait(_impl, mutex._impl) != 0) { + throw StateError('Failed to wait on a condition variable'); + } + } + + @override + int get _address => _impl.address; +} diff --git a/lib/primitives.dart b/lib/primitives.dart new file mode 100644 index 0000000..c792dd9 --- /dev/null +++ b/lib/primitives.dart @@ -0,0 +1,117 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +/// This library contains native synchronization primitives such as [Mutex] +/// and [ConditionVariable] implemented on top of low-level primitives +/// provided by the OS. +/// +/// See OS specific documentation for more details: +/// +/// * POSIX man pages (Linux, Android, Mac OS X and iOS X) +/// * `pthread_mutex_lock` and `pthread_mutex_unlock`, +/// * `pthread_cond_wait` and `pthread_cond_signal`. +/// * Windows +/// * [Slim Reader/Writer (SRW) Locks](https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks), +/// * [Condition Variables](https://learn.microsoft.com/en-us/windows/win32/sync/condition-variables), +library; + +import 'dart:ffi'; +import 'dart:io'; + +import 'package:ffi/ffi.dart'; +import 'package:native_synchronization/sendable.dart'; + +import 'package:native_synchronization/src/bindings/pthread.dart'; +import 'package:native_synchronization/src/bindings/winapi.dart'; + +part 'posix.dart'; +part 'windows.dart'; + +/// A *mutex* synchronization primitive. +/// +/// Mutex can be used to synchronize access to a native resource shared between +/// multiple threads. +/// +/// [Mutex] object can not be directly sent to other isolates via a `SendPort`, +/// but it can be converted to a `Sendable` via `asSendable` getter. +/// +/// Mutex objects are owned by an isolate which created them. +sealed class Mutex implements Finalizable { + Mutex._(); + + factory Mutex() => Platform.isWindows ? _WindowsMutex() : _PosixMutex(); + + /// Acquire exclusive ownership of this mutex. + /// + /// If this mutex is already acquired then an attempt to acquire it + /// blocks the current thread until the mutex is released by the + /// current owner. + /// + /// **Warning**: attempting to hold a mutex across asynchronous suspension + /// points will lead to undefined behavior and potentially crashes. + void _lock(); + + /// Release exclusive ownership of this mutex. + /// + /// It is an error to release ownership of the mutex if it was not + /// previously acquired. + void _unlock(); + + /// Run the given synchronous `action` under a mutex. + /// + /// This function takes exclusive ownership of the mutex, executes `action` + /// and then releases the mutex. It returns the value returned by `action`. + /// + /// **Warning**: you can't combine `runLocked` with an asynchronous code. + R runLocked(R Function() action) { + _lock(); + try { + return action(); + } finally { + _unlock(); + } + } + + Sendable get asSendable => Sendable.wrap( + Platform.isWindows ? _WindowsMutex.fromAddress : _PosixMutex.fromAddress, + _address); + + int get _address; +} + +/// A *condition variable* synchronization primitive. +/// +/// Condition variable can be used to synchronously wait for a condition to +/// occur. +/// +/// [ConditionVariable] object can not be directly sent to other isolates via a +/// `SendPort`, but it can be converted to a `Sendable` +/// object via [asSendable] getter. +/// +/// [ConditionVariable] objects are owned by an isolate which created them. +sealed class ConditionVariable implements Finalizable { + ConditionVariable._(); + + factory ConditionVariable() => Platform.isWindows + ? _WindowsConditionVariable() + : _PosixConditionVariable(); + + /// Block and wait until another thread calls [notify]. + /// + /// `mutex` must be a [Mutex] object exclusively held by the current thread. + /// It will be released and the thread will block until another thread + /// calls [notify]. + void wait(Mutex mutex); + + /// Wake up at least one thread waiting on this condition variable. + void notify(); + + Sendable get asSendable => Sendable.wrap( + Platform.isWindows + ? _WindowsConditionVariable.fromAddress + : _PosixConditionVariable.fromAddress, + _address); + + int get _address; +} diff --git a/lib/sample.dart b/lib/sample.dart deleted file mode 100644 index ca6b24e..0000000 --- a/lib/sample.dart +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -void main(List args) { - print('hello world'); -} diff --git a/lib/sendable.dart b/lib/sendable.dart new file mode 100644 index 0000000..6afba3d --- /dev/null +++ b/lib/sendable.dart @@ -0,0 +1,21 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +abstract final class Sendable { + static Sendable wrap(T Function(U) make, U data) { + return _SendableImpl._(make, data); + } + + T materialize(); +} + +final class _SendableImpl implements Sendable { + final U _data; + final T Function(U v) _make; + + _SendableImpl._(this._make, this._data); + + @override + T materialize() => _make(_data); +} diff --git a/lib/src/bindings/pthread.dart b/lib/src/bindings/pthread.dart new file mode 100644 index 0000000..d177671 --- /dev/null +++ b/lib/src/bindings/pthread.dart @@ -0,0 +1,38 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// ignore_for_file: non_constant_identifier_names, camel_case_types + +import 'dart:ffi'; + +final class pthread_mutex_t extends Opaque {} + +final class pthread_cond_t extends Opaque {} + +@Native, Pointer)>() +external int pthread_mutex_init( + Pointer mutex, Pointer attrs); + +@Native)>() +external int pthread_mutex_lock(Pointer mutex); + +@Native)>() +external int pthread_mutex_unlock(Pointer mutex); + +@Native)>() +external int pthread_mutex_destroy(Pointer cond); + +@Native, Pointer)>() +external int pthread_cond_init( + Pointer cond, Pointer attrs); + +@Native, Pointer)>() +external int pthread_cond_wait( + Pointer cond, Pointer mutex); + +@Native)>() +external int pthread_cond_destroy(Pointer cond); + +@Native)>() +external int pthread_cond_signal(Pointer cond); diff --git a/lib/src/bindings/winapi.dart b/lib/src/bindings/winapi.dart new file mode 100644 index 0000000..20aec4e --- /dev/null +++ b/lib/src/bindings/winapi.dart @@ -0,0 +1,32 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// ignore_for_file: non_constant_identifier_names, camel_case_types + +import 'dart:ffi'; + +final class SRWLOCK extends Opaque {} + +final class CONDITION_VARIABLE extends Opaque {} + +@Native)>() +external void InitializeSRWLock(Pointer lock); + +@Native)>() +external void AcquireSRWLockExclusive(Pointer lock); + +@Native)>() +external void ReleaseSRWLockExclusive(Pointer mutex); + +@Native)>() +external void InitializeConditionVariable(Pointer condVar); + +@Native< + Int Function( + Pointer, Pointer, Uint32, Uint32)>() +external int SleepConditionVariableSRW(Pointer condVar, + Pointer srwLock, int timeOut, int flags); + +@Native)>() +external void WakeConditionVariable(Pointer condVar); diff --git a/lib/windows.dart b/lib/windows.dart new file mode 100644 index 0000000..b15da7e --- /dev/null +++ b/lib/windows.dart @@ -0,0 +1,74 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +part of 'primitives.dart'; + +class _WindowsMutex extends Mutex { + static const _sizeInBytes = 8; // `sizeof(SRWLOCK)` + + final Pointer _impl; + + static final _finalizer = Finalizer>((ptr) { + malloc.free(ptr); + }); + + _WindowsMutex() + : _impl = malloc.allocate(_WindowsMutex._sizeInBytes), + super._() { + InitializeSRWLock(_impl); + _finalizer.attach(this, _impl); + } + + _WindowsMutex.fromAddress(int address) + : _impl = Pointer.fromAddress(address), + super._(); + + @override + void _lock() => AcquireSRWLockExclusive(_impl); + + @override + void _unlock() => ReleaseSRWLockExclusive(_impl); + + @override + int get _address => _impl.address; +} + +class _WindowsConditionVariable extends ConditionVariable { + static const _sizeInBytes = 8; // `sizeof(CONDITION_VARIABLE)` + + final Pointer _impl; + + static final _finalizer = Finalizer>((ptr) { + malloc.free(ptr); + }); + + _WindowsConditionVariable() + : _impl = malloc.allocate(_WindowsConditionVariable._sizeInBytes), + super._() { + InitializeConditionVariable(_impl); + _finalizer.attach(this, _impl); + } + + _WindowsConditionVariable.fromAddress(int address) + : _impl = Pointer.fromAddress(address), + super._(); + + @override + void notify() { + WakeConditionVariable(_impl); + } + + @override + void wait(covariant _WindowsMutex mutex) { + const infinite = 0xFFFFFFFF; + const exclusive = 0; + if (SleepConditionVariableSRW(_impl, mutex._impl, infinite, exclusive) == + 0) { + throw StateError('Failed to wait on a condition variable'); + } + } + + @override + int get _address => _impl.address; +} diff --git a/pubspec.yaml b/pubspec.yaml index 299b70e..f715012 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -4,10 +4,10 @@ version: 0.1.0-wip repository: https://github.com/dart-lang/native_synchronization environment: - sdk: ^3.0.0 + sdk: ">=3.1.0-348.0.dev <4.0.0" dependencies: - # lib_name: ^1.2.0 + ffi: ^2.1.0 dev_dependencies: dart_flutter_team_lints: ^1.0.0 diff --git a/test/mailbox_test.dart b/test/mailbox_test.dart new file mode 100644 index 0000000..2400d56 --- /dev/null +++ b/test/mailbox_test.dart @@ -0,0 +1,32 @@ +// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:io'; +import 'dart:isolate'; +import 'dart:typed_data'; + +import 'package:native_synchronization/mailbox.dart'; +import 'package:native_synchronization/sendable.dart'; +import 'package:test/test.dart'; + +void main() { + Future startHelperIsolate(Sendable sendableMailbox) { + return Isolate.run(() { + sleep(const Duration(milliseconds: 500)); + final mailbox = sendableMailbox.materialize(); + mailbox.put(Uint8List(42)..[41] = 42); + return 'success'; + }); + } + + test('mailbox', () async { + final mailbox = Mailbox(); + final helperResult = startHelperIsolate(mailbox.asSendable); + final value = mailbox.take(); + expect(value, isA()); + expect(value.length, equals(42)); + expect(value[41], equals(42)); + expect(await helperResult, equals('success')); + }); +} diff --git a/test/primitives_test.dart b/test/primitives_test.dart index d68b515..e1d6b1f 100644 --- a/test/primitives_test.dart +++ b/test/primitives_test.dart @@ -2,8 +2,118 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:ffi'; +import 'dart:io'; +import 'dart:isolate'; + +import 'package:ffi/ffi.dart'; +import 'package:native_synchronization/primitives.dart'; +import 'package:native_synchronization/sendable.dart'; import 'package:test/test.dart'; void main() { - test('dummy', () {}); + group('mutex', () { + test('simple', () { + final mutex = Mutex(); + expect(mutex.runLocked(() => 42), equals(42)); + }); + + Future spawnHelperIsolate( + int ptrAddress, Sendable sendableMutex) { + return Isolate.run(() { + final ptr = Pointer.fromAddress(ptrAddress); + final mutex = sendableMutex.materialize(); + + while (true) { + sleep(Duration(milliseconds: 10)); + if (mutex.runLocked(() { + if (ptr.value == 2) { + return true; + } + ptr.value = 0; + sleep(Duration(milliseconds: 500)); + ptr.value = 1; + return false; + })) { + break; + } + } + + return 'success'; + }); + } + + test('isolate', () async { + await using((arena) async { + final ptr = arena.allocate(1); + final mutex = Mutex(); + + final helperResult = spawnHelperIsolate(ptr.address, mutex.asSendable); + + while (true) { + final sw = Stopwatch()..start(); + if (mutex.runLocked(() { + if (sw.elapsedMilliseconds > 300 && ptr.value == 1) { + ptr.value = 2; + return true; + } + return false; + })) { + break; + } + await Future.delayed(const Duration(milliseconds: 10)); + } + expect(await helperResult, equals('success')); + }); + }); + }); + + group('condvar', () { + Future spawnHelperIsolate( + int ptrAddress, + Sendable sendableMutex, + Sendable sendableCondVar) { + return Isolate.run(() { + final ptr = Pointer.fromAddress(ptrAddress); + final mutex = sendableMutex.materialize(); + final condVar = sendableCondVar.materialize(); + + return mutex.runLocked(() { + ptr.value = 1; + while (ptr.value == 1) { + condVar.wait(mutex); + } + return ptr.value == 2 ? 'success' : 'failure'; + }); + }); + } + + test('isolate', () async { + await using((arena) async { + final ptr = arena.allocate(1); + final mutex = Mutex(); + final condVar = ConditionVariable(); + + final helperResult = spawnHelperIsolate( + ptr.address, mutex.asSendable, condVar.asSendable); + + while (true) { + final success = mutex.runLocked(() { + if (ptr.value == 1) { + ptr.value = 2; + condVar.notify(); + return true; + } + return false; + }); + if (success) { + break; + } + await Future.delayed(const Duration(milliseconds: 20)); + } + + expect(await helperResult, equals('success')); + }); + }); + }); }