Skip to content

Commit

Permalink
Add closed state to Mailbox (dart-archive/native_synchronization#26)
Browse files Browse the repository at this point in the history
* Add closed state to Mailbox

* Update documentation
  • Loading branch information
ntkme committed Aug 1, 2024
1 parent dc5c91d commit 80894f7
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkgs/native_synchronization/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.3.0

- Add a closed state to `Mailbox`.

## 0.2.0

- Lower SDK lower bound to 3.0.0.
Expand Down
32 changes: 27 additions & 5 deletions pkgs/native_synchronization/lib/mailbox.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Mailbox {

static const _stateEmpty = 0;
static const _stateFull = 1;
static const _stateClosed = 2;

static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) {
calloc.free(mailbox.ref.buffer);
Expand All @@ -67,12 +68,13 @@ class Mailbox {

/// Place a message into the mailbox if has space for it.
///
/// If mailbox already contains a message then [put] will throw.
/// If mailbox already contains a message or mailbox is closed then [put] will
/// throw [StateError].
void put(Uint8List message) {
final buffer = message.isEmpty ? nullptr : _toBuffer(message);
_mutex.runLocked(() {
if (_mailbox.ref.state != _stateEmpty) {
throw StateError('Mailbox is full');
throw StateError('Mailbox is closed or full');
}

_mailbox.ref.state = _stateFull;
Expand All @@ -83,15 +85,35 @@ class Mailbox {
});
}

/// Close a mailbox.
///
/// If mailbox already contains a message then [close] will drop the message.
void close() => _mutex.runLocked(() {
if (_mailbox.ref.state == _stateFull && _mailbox.ref.bufferLength > 0) {
malloc.free(_mailbox.ref.buffer);
}

_mailbox.ref.state = _stateClosed;
_mailbox.ref.buffer = nullptr;
_mailbox.ref.bufferLength = 0;

_condVar.notify();
});

/// Take a message from the mailbox.
///
/// If mailbox is empty this will synchronously block until message
/// is available.
/// If mailbox is empty then [take] will synchronously block until message
/// is available or mailbox is closed. If mailbox is closed then [take] will
/// throw [StateError].
Uint8List take() => _mutex.runLocked(() {
while (_mailbox.ref.state != _stateFull) {
while (_mailbox.ref.state == _stateEmpty) {
_condVar.wait(_mutex);
}

if (_mailbox.ref.state == _stateClosed) {
throw StateError('Mailbox is closed');
}

final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength);

_mailbox.ref.state = _stateEmpty;
Expand Down
2 changes: 1 addition & 1 deletion pkgs/native_synchronization/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: native_synchronization
description: Low level synchronization primitives built on dart:ffi.
version: 0.2.0
version: 0.3.0
repository: https://github.com/dart-lang/native_synchronization

environment:
Expand Down
21 changes: 21 additions & 0 deletions pkgs/native_synchronization/test/mailbox_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,25 @@ void main() {
expect(value[41], equals(42));
expect(await helperResult, equals('success'));
});

Future<String> startHelperIsolateClose(Sendable<Mailbox> sendableMailbox) {
return Isolate.run(() {
sleep(const Duration(milliseconds: 500));
final mailbox = sendableMailbox.materialize();
try {
mailbox.take();
} catch (_) {
return 'success';
}
return 'failed';
});
}

test('mailbox close', () async {
final mailbox = Mailbox();
mailbox.put(Uint8List(42)..[41] = 42);
mailbox.close();
final helperResult = startHelperIsolateClose(mailbox.asSendable);
expect(await helperResult, equals('success'));
});
}

0 comments on commit 80894f7

Please sign in to comment.