Async lock + waitFor #9

Open
davidmartos96 opened this issue Dec 1, 2023 · 1 comment

@davidmartos96

Hello!
Thank you for the simple and great library. I was considering using it as a way to synchronize writes to a SQLite database from different isolates, since SQLite only supports one writer at a time. Context: simolus3/drift#2760

The problem is that the SQLite library is async but, as stated in the docs, runLocked doesn't work with async callbacks.

I managed to "solve" it (at least for my simple demo) by creating an async runLocked + waitFor. The problem is that waitFor is being removed in Dart 3.3.
How could this be solved in an alternative manner without the waitFor?

Thank you!

// Added as a method on Mutex; waitFor comes from dart:cli.
Future<R> runLockedAsync<R>(Future<R> Function() action) async {
  _lock();
  try {
    return waitFor(action());
  } finally {
    _unlock();
  }
}

Demo:

import 'dart:isolate';
import 'dart:math';

import 'package:native_synchronization/primitives.dart';

void main() async {
  final mutex = Mutex();
  const numIsolates = 20;

  final fs = <Future>[];
  for (var i = 0; i < numIsolates; i++) {
    fs.add(runInIsolate(mutex, i));
  }

  await Future.wait(fs);
}

Future<void> runInIsolate(Mutex mutex, int id) async {
  final sendableMutex = mutex.asSendable;
  await Isolate.run(debugName: 'isolate_$id', () async {
    final r = Random();
    final mutex = sendableMutex.materialize();
    for (var i = 0; i < 100; i++) {
      await mutex.runLockedAsync(() async {
        print('ASYNC isolate $id inside $i');
        final ms = r.nextInt(50);
        await Future.delayed(Duration(milliseconds: ms));
      });

      await Future.delayed(Duration(milliseconds: 100));
    }
  });
}
@mraleph
Member

mraleph commented Mar 20, 2024

> How could this be solved in an alternative manner without the waitFor?

I think a better design would be one based on message passing: for example, a single writer isolate to which the other isolates send messages asking it to write the data.
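
A minimal sketch of that single-writer design, assuming Dart 3 (records, `Isolate.run`). The names `writerMain`, `write`, `writeFromNewIsolate` and the `WriteRequest` record are hypothetical, and an in-memory list stands in for the real SQLite connection:

```dart
import 'dart:async';
import 'dart:isolate';

/// Hypothetical request shape: the data to write plus a port for the reply.
typedef WriteRequest = (String data, SendPort replyTo);

/// Entry point of the single writer isolate. Because the loop body is
/// awaited before the next message is taken, writes never overlap.
Future<void> writerMain(SendPort handshake) async {
  final requests = ReceivePort();
  handshake.send(requests.sendPort);

  final rows = <String>[]; // stand-in for the real database connection
  await for (final message in requests) {
    if (message == null) break; // null is this sketch's shutdown signal
    final (data, replyTo) = message as WriteRequest;
    await Future<void>.delayed(const Duration(milliseconds: 1)); // async write
    rows.add(data);
    replyTo.send(rows.length); // acknowledge with the new row count
  }
}

/// Asks the writer to perform one write and waits for the acknowledgement.
Future<int> write(SendPort writer, String data) async {
  final reply = ReceivePort();
  writer.send((data, reply.sendPort));
  return await reply.first as int; // `first` also closes the reply port
}

/// Runs one write from a fresh isolate, mirroring the demo in the question.
Future<int> writeFromNewIsolate(SendPort writer, int id) {
  return Isolate.run(() => write(writer, 'row from isolate $id'));
}

Future<void> main() async {
  final handshake = ReceivePort();
  await Isolate.spawn(writerMain, handshake.sendPort);
  final writer = await handshake.first as SendPort;

  // Clients only await a reply; none of them blocks its event loop.
  final counts = await Future.wait([
    for (var id = 0; id < 4; id++) writeFromNewIsolate(writer, id),
  ]);
  print('acknowledged row counts: $counts');

  writer.send(null); // ask the writer to shut down
}
```

The writer is the only code that ever touches the database, so no lock is needed at all; ordering comes from the writer's message queue.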

It is not a good idea to use a low-level synchronous mutex in a place like this to begin with, because an attempt to grab the lock will completely block the caller if the lock is already taken.
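
To illustrate what "completely block the caller" means for an isolate, here is a small sketch in which `sleep` from `dart:io` stands in for waiting on a lock that another isolate holds (an assumption made for illustration; no real mutex is involved). The function name `timerDelayWhileBlocked` is hypothetical. A timer due after 10 ms cannot fire until the blocking call returns:

```dart
import 'dart:async';
import 'dart:io';

/// Returns the elapsed time (ms) at which a 10 ms timer actually fired
/// when the isolate was blocked synchronously for 200 ms.
Future<int> timerDelayWhileBlocked() {
  final clock = Stopwatch()..start();
  final fired = Completer<int>();
  Timer(const Duration(milliseconds: 10),
      () => fired.complete(clock.elapsedMilliseconds));

  // Stand-in for a synchronous lock acquisition that has to wait:
  // nothing else in this isolate can run in the meantime.
  sleep(const Duration(milliseconds: 200));

  return fired.future;
}

Future<void> main() async {
  final firedAt = await timerDelayWhileBlocked();
  print('timer due at 10 ms fired after $firedAt ms');
}
```

The same would apply to pending I/O callbacks and port messages in that isolate, which is why a blocking lock held across async work is risky.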

You can concoct an asynchronous mutex out of the existing synchronous mutex and ports. Here is a sketch with a few missing details:

import 'dart:async';
import 'dart:ffi';
import 'dart:isolate';

import 'package:native_synchronization/primitives.dart';

final class AsyncMutex {
  final Mutex mutex;
  // Head of the list of waiters; the first one is the owner.
  final Pointer<Pointer<_Waiter>> waiters;
  late final Pointer<_Waiter> self;
  Completer<void>? _waitingToAcquire;

  final port = ReceivePort();

  // How `mutex` and `waiters` are shared between isolates is one of the
  // missing details; passing them in here is an assumption.
  AsyncMutex(this.mutex, this.waiters) {
    port.listen((_) {
      // The previous owner handed the mutex over to us.
      final completer = _waitingToAcquire;
      _waitingToAcquire = null;
      completer?.complete();
    });
    self = /* TODO: allocate Waiter and populate port */;
  }

  void dispose() {
    port.close();
  }

  Future<void> acquire() {
    if (_waitingToAcquire != null) {
      throw StateError('Already waiting to acquire!');
    }

    final acquired = mutex.runLocked(() {
      if (waiters.value == nullptr) {
        // The mutex is unowned and we can freely grab it.
        waiters.value = self;
        return true;
      } else {
        // Add yourself to the end of the list of waiters.
        var w = waiters.value;
        while (w.ref.next != nullptr) {
          w = w.ref.next;
        }
        w.ref.next = self;
        return false;
      }
    });
    if (!acquired) {
      final completer = _waitingToAcquire = Completer<void>();
      return completer.future;
    } else {
      return Future.value();
    }
  }

  void release() {
    // Release the mutex and notify the next waiter that they are now
    // the owner.
    final nextPort = mutex.runLocked(() {
      assert(waiters.value == self);
      waiters.value = self.ref.next;
      self.ref.next = nullptr;
      return waiters.value == nullptr ? 0 : waiters.value.ref.port;
    });
    if (nextPort != 0) {
      /* TODO: Dart_Post can be acquired from DL C API */
      Dart_Post(nextPort, null);
    }
  }
}

final class _Waiter extends Struct {
  @Int64()
  external int port;

  external Pointer<_Waiter> next;
}
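
With `acquire()` and `release()` in place, the `runLockedAsync` helper from the question no longer needs `waitFor`. The sketch below shows only that wrapper. The `AsyncLock` interface and `LocalAsyncLock` are hypothetical: `LocalAsyncLock` is a single-isolate stand-in used to exercise the wrapper, not the cross-isolate mutex sketched above.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical interface matching the acquire/release shape of the sketch.
abstract interface class AsyncLock {
  Future<void> acquire();
  void release();
}

/// Runs [action] while holding [lock], without blocking the event loop.
Future<R> runLockedAsync<R>(AsyncLock lock, Future<R> Function() action) async {
  await lock.acquire();
  try {
    return await action();
  } finally {
    lock.release();
  }
}

/// Single-isolate stand-in: a FIFO queue of completers instead of
/// shared native memory and ports.
final class LocalAsyncLock implements AsyncLock {
  bool _held = false;
  final _waiters = Queue<Completer<void>>();

  @override
  Future<void> acquire() {
    if (!_held) {
      _held = true;
      return Future<void>.value();
    }
    final waiter = Completer<void>();
    _waiters.add(waiter);
    return waiter.future;
  }

  @override
  void release() {
    if (_waiters.isEmpty) {
      _held = false;
    } else {
      // Hand ownership directly to the next waiter.
      _waiters.removeFirst().complete();
    }
  }
}

Future<void> main() async {
  final lock = LocalAsyncLock();
  final order = <String>[];
  await Future.wait([
    for (final name in ['a', 'b'])
      runLockedAsync(lock, () async {
        order.add('$name:start');
        await Future<void>.delayed(const Duration(milliseconds: 10));
        order.add('$name:end');
      }),
  ]);
  print(order); // [a:start, a:end, b:start, b:end]
}
```

The second caller waits on a future rather than blocking, so its isolate stays free to process other events until the lock is handed over.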

devoncarew transferred this issue from dart-archive/native_synchronization on Sep 26, 2024