Skip to content
This repository has been archived by the owner on Sep 27, 2024. It is now read-only.

Add closed state to Mailbox #26

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.3.0
devoncarew marked this conversation as resolved.
Show resolved Hide resolved

- Add a closed state to `Mailbox`.

## 0.2.0

- Lower SDK lower bound to 3.0.0.
Expand Down
32 changes: 27 additions & 5 deletions lib/mailbox.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Mailbox {

static const _stateEmpty = 0;
static const _stateFull = 1;
static const _stateClosed = 2;

static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) {
calloc.free(mailbox.ref.buffer);
Expand All @@ -67,12 +68,13 @@ class Mailbox {

/// Place a message into the mailbox if has space for it.
///
/// If mailbox already contains a message then [put] will throw.
/// If mailbox already contains a message or mailbox is closed then [put] will
/// throw [StateError].
void put(Uint8List message) {
final buffer = message.isEmpty ? nullptr : _toBuffer(message);
_mutex.runLocked(() {
if (_mailbox.ref.state != _stateEmpty) {
throw StateError('Mailbox is full');
throw StateError('Mailbox is closed or full');
}

_mailbox.ref.state = _stateFull;
Expand All @@ -83,15 +85,35 @@ class Mailbox {
});
}

/// Close a mailbox.
///
/// If mailbox already contains a message then [close] will drop the message.
void close() => _mutex.runLocked(() {
if (_mailbox.ref.state == _stateFull && _mailbox.ref.bufferLength > 0) {
malloc.free(_mailbox.ref.buffer);
}

_mailbox.ref.state = _stateClosed;
_mailbox.ref.buffer = nullptr;
_mailbox.ref.bufferLength = 0;

_condVar.notify();
});

/// Take a message from the mailbox.
///
/// If mailbox is empty this will synchronously block until message
/// is available.
/// If mailbox is empty then [take] will synchronously block until message
/// is available or mailbox is closed. If mailbox is closed then [take] will
/// throw [StateError].
Uint8List take() => _mutex.runLocked(() {
while (_mailbox.ref.state != _stateFull) {
while (_mailbox.ref.state == _stateEmpty) {
_condVar.wait(_mutex);
}

if (_mailbox.ref.state == _stateClosed) {
throw StateError('Mailbox is closed');
}

final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength);

_mailbox.ref.state = _stateEmpty;
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: native_synchronization
description: Low level synchronization primitives built on dart:ffi.
version: 0.2.0
version: 0.3.0
repository: https://github.com/dart-lang/native_synchronization

environment:
Expand Down
21 changes: 21 additions & 0 deletions test/mailbox_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,25 @@ void main() {
expect(value[41], equals(42));
expect(await helperResult, equals('success'));
});

Future<String> startHelperIsolateClose(Sendable<Mailbox> sendableMailbox) {
return Isolate.run(() {
sleep(const Duration(milliseconds: 500));
final mailbox = sendableMailbox.materialize();
try {
mailbox.take();
} catch (_) {
return 'success';
}
return 'failed';
});
}

test('mailbox close', () async {
final mailbox = Mailbox();
mailbox.put(Uint8List(42)..[41] = 42);
mailbox.close();
final helperResult = startHelperIsolateClose(mailbox.asSendable);
expect(await helperResult, equals('success'));
});
}