Skip to content
This repository has been archived by the owner on Sep 27, 2024. It is now read-only.

Add closed state to Mailbox #26

Merged
merged 2 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.3.0
devoncarew marked this conversation as resolved.
Show resolved Hide resolved

- Add a closed state to `Mailbox`.

## 0.2.0

- Lower SDK lower bound to 3.0.0.
Expand Down
24 changes: 22 additions & 2 deletions lib/mailbox.dart
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Mailbox {

static const _stateEmpty = 0;
static const _stateFull = 1;
static const _stateClosed = 2;

static final finalizer = Finalizer((Pointer<_MailboxRepr> mailbox) {
calloc.free(mailbox.ref.buffer);
Expand All @@ -72,7 +73,7 @@ class Mailbox {
final buffer = message.isEmpty ? nullptr : _toBuffer(message);
_mutex.runLocked(() {
if (_mailbox.ref.state != _stateEmpty) {
throw StateError('Mailbox is full');
throw StateError('Mailbox is closed or full');
}

_mailbox.ref.state = _stateFull;
Expand All @@ -83,15 +84,34 @@ class Mailbox {
});
}

/// Close a mailbox.
///
/// If mailbox already contains a message then it will be dropped.
void close() => _mutex.runLocked(() {
if (_mailbox.ref.state == _stateFull && _mailbox.ref.bufferLength > 0) {
malloc.free(_mailbox.ref.buffer);
}

_mailbox.ref.state = _stateClosed;
_mailbox.ref.buffer = nullptr;
_mailbox.ref.bufferLength = 0;

_condVar.notify();
});

/// Take a message from the mailbox.
///
/// If mailbox is empty this will synchronously block until message
/// is available.
ntkme marked this conversation as resolved.
Show resolved Hide resolved
Uint8List take() => _mutex.runLocked(() {
while (_mailbox.ref.state != _stateFull) {
while (_mailbox.ref.state == _stateEmpty) {
_condVar.wait(_mutex);
}

if (_mailbox.ref.state == _stateClosed) {
throw StateError('Mailbox is closed');
}

final result = _toList(_mailbox.ref.buffer, _mailbox.ref.bufferLength);

_mailbox.ref.state = _stateEmpty;
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: native_synchronization
description: Low level synchronization primitives built on dart:ffi.
version: 0.2.0
version: 0.3.0
repository: https://github.com/dart-lang/native_synchronization

environment:
Expand Down
21 changes: 21 additions & 0 deletions test/mailbox_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,25 @@ void main() {
expect(value[41], equals(42));
expect(await helperResult, equals('success'));
});

Future<String> startHelperIsolateClose(Sendable<Mailbox> sendableMailbox) {
return Isolate.run(() {
sleep(const Duration(milliseconds: 500));
final mailbox = sendableMailbox.materialize();
try {
mailbox.take();
} catch (_) {
return 'success';
}
return 'failed';
});
}

test('mailbox close', () async {
final mailbox = Mailbox();
mailbox.put(Uint8List(42)..[41] = 42);
mailbox.close();
final helperResult = startHelperIsolateClose(mailbox.asSendable);
expect(await helperResult, equals('success'));
});
}