diff --git a/pkgs/native_synchronization/CHANGELOG.md b/pkgs/native_synchronization/CHANGELOG.md index 9acca142..7a52326a 100644 --- a/pkgs/native_synchronization/CHANGELOG.md +++ b/pkgs/native_synchronization/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.3.0 + +- Add a closed state to `Mailbox`. + ## 0.2.0 - Lower SDK lower bound to 3.0.0. diff --git a/pkgs/native_synchronization/lib/mailbox.dart b/pkgs/native_synchronization/lib/mailbox.dart index 0871a7c0..57cb9975 100644 --- a/pkgs/native_synchronization/lib/mailbox.dart +++ b/pkgs/native_synchronization/lib/mailbox.dart @@ -47,6 +47,7 @@ class Mailbox { static const _stateEmpty = 0; static const _stateFull = 1; + static const _stateClosed = 2; static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) { calloc.free(mailbox.ref.buffer); @@ -67,12 +68,13 @@ class Mailbox { /// Place a message into the mailbox if has space for it. /// - /// If mailbox already contains a message then [put] will throw. + /// If mailbox already contains a message or mailbox is closed then [put] will + /// throw [StateError]. void put(Uint8List message) { final buffer = message.isEmpty ? nullptr : _toBuffer(message); _mutex.runLocked(() { if (_mailbox.ref.state != _stateEmpty) { - throw StateError('Mailbox is full'); + throw StateError('Mailbox is closed or full'); } _mailbox.ref.state = _stateFull; @@ -83,15 +85,35 @@ class Mailbox { }); } + /// Close a mailbox. + /// + /// If mailbox already contains a message then [close] will drop the message. + void close() => _mutex.runLocked(() { + if (_mailbox.ref.state == _stateFull && _mailbox.ref.bufferLength > 0) { + malloc.free(_mailbox.ref.buffer); + } + + _mailbox.ref.state = _stateClosed; + _mailbox.ref.buffer = nullptr; + _mailbox.ref.bufferLength = 0; + + _condVar.notify(); + }); + /// Take a message from the mailbox. /// - /// If mailbox is empty this will synchronously block until message - /// is available. + /// If mailbox is empty then [take] will synchronously block until message + /// is available or mailbox is closed. If mailbox is closed then [take] will + /// throw [StateError]. Uint8List take() => _mutex.runLocked(() { - while (_mailbox.ref.state != _stateFull) { + while (_mailbox.ref.state == _stateEmpty) { _condVar.wait(_mutex); } + if (_mailbox.ref.state == _stateClosed) { + throw StateError('Mailbox is closed'); + } + final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength); _mailbox.ref.state = _stateEmpty; diff --git a/pkgs/native_synchronization/pubspec.yaml b/pkgs/native_synchronization/pubspec.yaml index e16317f0..bf93c3f1 100644 --- a/pkgs/native_synchronization/pubspec.yaml +++ b/pkgs/native_synchronization/pubspec.yaml @@ -1,6 +1,6 @@ name: native_synchronization description: Low level synchronization primitives built on dart:ffi. -version: 0.2.0 +version: 0.3.0 repository: https://github.com/dart-lang/native_synchronization environment: diff --git a/pkgs/native_synchronization/test/mailbox_test.dart b/pkgs/native_synchronization/test/mailbox_test.dart index 2400d56a..9b32bfa3 100644 --- a/pkgs/native_synchronization/test/mailbox_test.dart +++ b/pkgs/native_synchronization/test/mailbox_test.dart @@ -29,4 +29,25 @@ void main() { expect(value[41], equals(42)); expect(await helperResult, equals('success')); }); + + Future startHelperIsolateClose(Sendable sendableMailbox) { + return Isolate.run(() { + sleep(const Duration(milliseconds: 500)); + final mailbox = sendableMailbox.materialize(); + try { + mailbox.take(); + } catch (_) { + return 'success'; + } + return 'failed'; + }); + } + + test('mailbox close', () async { + final mailbox = Mailbox(); + mailbox.put(Uint8List(42)..[41] = 42); + mailbox.close(); + final helperResult = startHelperIsolateClose(mailbox.asSendable); + expect(await helperResult, equals('success')); + }); }