Concurrency synchronization

Manage concurrency in JavaScript "threads" with promises.

Preface

This package aims to mimic the common tools used for concurrency synchronization, since multithreading does not exist in JavaScript in the same way as in C, Java, or other languages.

From the Mozilla (MDN) documentation:

Note that JavaScript is single-threaded by nature, so at a given instant, only one task will be executing, although control can shift between different promises, making execution of the promises appear concurrent. Parallel execution in JavaScript can only be achieved through worker threads.

A "thread" is, in this context, an asynchronous task.
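
As a minimal sketch of what this means in practice, the snippet below starts two such "threads" and records the order in which they run. The names `thread` and `runThreads` are hypothetical and exist only for this illustration; nothing here comes from the package.

```typescript
function sleep(time: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, time));
}

// Hypothetical helper: a "thread" is just an async function.
// It only gives up control when it reaches an `await`.
async function thread(name: string, delay: number, log: string[]): Promise<void> {
  log.push(`${name}: start`);
  await sleep(delay); // control can shift to another task here
  log.push(`${name}: end`);
}

async function runThreads(): Promise<string[]> {
  const log: string[] = [];
  // Both tasks are started before either of them finishes.
  await Promise.all([thread("A", 40, log), thread("B", 10, log)]);
  return log;
}

runThreads().then(log => console.log(log));
// => ["A: start", "B: start", "B: end", "A: end"]
```

Only one task executes at any instant, yet "B" finishes inside the lifetime of "A". This interleaving is what the tools below are meant to control.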

Installation

Simply run:

npm i @heap-code/concurrency-synchronization

CDN

Thanks to jsDelivr, this package can easily be used in browsers like this:

<script
 src="https://cdn.jsdelivr.net/npm/@heap-code/concurrency-synchronization/dist/bundles/concurrency-synchronization.umd.js"
 type="application/javascript"
></script>

Note:
It is recommended to use a minified and versioned bundle.

For example:

<script
 src="https://cdn.jsdelivr.net/npm/@heap-code/concurrency-synchronization@1.0.1/dist/bundles/concurrency-synchronization.umd.min.js"
 type="application/javascript"
></script>

More on the jsDelivr package page.

Usages

In the code examples, the sleep function is the following:

function sleep(time: number) {
  return new Promise(resolve => setTimeout(resolve, time));
}

This is a placeholder for any asynchronous task.

Note:
Avoid using this package in "production" code.
See the "When to use" section to understand why.

Mutex

A mutex is a mechanism that enforces limits on access to a resource. It generally protects the access to shared variables.

Use semaphores for synchronization rather than a mutex.


Consider the following example:

const myVar = { i: 0 };

async function do1() {
  await sleep(200);
  myVar.i += 1;
}

async function do2() {
  await sleep(50);
  myVar.i *= 3;
}

async function bootstrap() {
  await Promise.all([do1(), sleep(10).then(() => do2())]);
  console.log(myVar.i); // => 1
}
bootstrap();

Even with the sleep calls, one might expect myVar.i += 1 to run before myVar.i *= 3, since do1 is called first.
However, myVar.i is not protected: the multiplication runs first (0 * 3 = 0) and the addition runs afterwards, so the final value is 1.


If a mutex locks the task, then the variable is protected:

import { Mutex } from "@heap-code/concurrency-synchronization";

const mutex = new Mutex();
const myVar = { i: 0 };

async function do1() {
  await mutex.lock();

  await sleep(200);
  myVar.i += 1;

  await mutex.unlock();
}

async function do2() {
  await mutex.lock();

  await sleep(50);
  myVar.i *= 3;

  await mutex.unlock();
}

async function bootstrap() {
  await Promise.all([do1(), sleep(10).then(() => do2())]);
  console.log(myVar.i); // => 3
}
bootstrap();

From Wikipedia:

The task that locked the mutex is supposed to unlock it.
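
One way to respect this rule is to pair every lock with an unlock in a `finally` block, so the lock is released even when the task throws. The sketch below is self-contained and therefore uses a hypothetical `SimpleMutex` stand-in and a hypothetical `withLock` helper; neither is part of the package, and the package's own `Mutex` may be implemented quite differently.

```typescript
function sleep(time: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, time));
}

// Hypothetical stand-in, NOT the package's Mutex: a promise-chain lock.
class SimpleMutex {
  private tail: Promise<void> = Promise.resolve();

  // Resolves with an `unlock` function once every earlier holder has unlocked.
  public lock(): Promise<() => void> {
    let unlock!: () => void;
    const released = new Promise<void>(resolve => {
      unlock = () => resolve();
    });
    const acquired = this.tail.then(() => unlock);
    this.tail = this.tail.then(() => released);
    return acquired;
  }
}

// Hypothetical helper: runs `task` while holding the lock.
// `finally` makes the task that locked the mutex also unlock it, even on errors.
async function withLock<T>(mutex: SimpleMutex, task: () => Promise<T>): Promise<T> {
  const unlock = await mutex.lock();
  try {
    return await task();
  } finally {
    unlock();
  }
}

async function demo(): Promise<number> {
  const mutex = new SimpleMutex();
  const myVar = { i: 0 };

  await Promise.all([
    withLock(mutex, async () => {
      await sleep(40);
      myVar.i += 1;
    }),
    withLock(mutex, async () => {
      await sleep(10);
      myVar.i *= 3;
    })
  ]);
  return myVar.i;
}

demo().then(value => console.log(value)); // => 3
```

With the package's `Mutex`, the same pattern would presumably be `await mutex.lock()` followed by `try { ... } finally { mutex.unlock() }`, using only the `lock` and `unlock` calls shown above.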

Mutex tryLock

It is possible to try to lock a mutex within a given time limit.
The function then throws an exception if the mutex could not be locked in time:

import { ConcurrencyExceedTimeoutException } from "@heap-code/concurrency-synchronization";

mutex.tryLock(250).catch((error: unknown) => {
  if (error instanceof ConcurrencyExceedTimeoutException) {
    console.log("Could not lock in the given time.");
  }

  throw error;
});

Mutex interrupt

A mutex can be interrupted at any time.
All awaiting "threads" will then receive an exception:

import { ConcurrencyInterruptedException, Mutex } from "@heap-code/concurrency-synchronization";

const mutex = new Mutex();
const myVar = { i: 0 };

async function do1() {
  await mutex.lock().catch((error: unknown) => {
    if (error instanceof ConcurrencyInterruptedException) {
      console.log("The mutex has been interrupted", error.getReason());
    }

    throw error;
  });

  await sleep(200);
  myVar.i += 1;

  await mutex.unlock();
}

async function bootstrap() {
  await Promise.all([
    do1(),
    do1(),
    sleep(20).then(() => mutex.interrupt({ message: "Take too much time" }))
  ]);
}
bootstrap();

Semaphore

From Wikipedia:

Semaphores are a type of synchronization primitive.

They can be used to protect certain resources (like mutexes), but are generally used for synchronization:

import { Semaphore } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const semaphore = new Semaphore(0);

  const time1 = 100;
  const time2 = 150;

  sleep(time1).then(() => semaphore.release());
  sleep(time2).then(() => semaphore.release());

  const maxTime = Math.max(time1, time2);

  const before = performance.now();
  await semaphore.acquire(2); // waits until both permits have been released
  const after = performance.now();

  const elapsed = after - before; // ~150
  console.log("Done. took %dms with expected %dms", elapsed, maxTime);
}
bootstrap();
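
The resource-protection use mentioned above can be sketched as a concurrency limit: at most `limit` jobs run at the same time. To stay self-contained, the snippet uses a hypothetical `TinySemaphore` stand-in (one permit per call) and a hypothetical `runLimited` helper; it is not the package's `Semaphore`, although the `acquire` and `release` names mirror the calls shown above.

```typescript
function sleep(time: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, time));
}

// Hypothetical stand-in, NOT the package's Semaphore: one permit per call.
class TinySemaphore {
  private permits: number;
  private readonly waiters: Array<() => void> = [];

  public constructor(permits: number) {
    this.permits = permits;
  }

  public acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve();
    }
    // No permit left: wait until `release` hands one over.
    return new Promise<void>(resolve => {
      this.waiters.push(() => resolve());
    });
  }

  public release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // pass the permit directly to the oldest waiter
    } else {
      this.permits++;
    }
  }
}

// Hypothetical helper: runs all jobs, never more than `limit` at the same time.
async function runLimited(limit: number, jobs: Array<() => Promise<void>>): Promise<void> {
  const semaphore = new TinySemaphore(limit);
  await Promise.all(
    jobs.map(async job => {
      await semaphore.acquire();
      try {
        await job();
      } finally {
        semaphore.release();
      }
    })
  );
}

async function demo(): Promise<number> {
  let running = 0;
  let maxRunning = 0;
  const job = async (): Promise<void> => {
    running++;
    maxRunning = Math.max(maxRunning, running);
    await sleep(20);
    running--;
  };

  await runLimited(2, [job, job, job, job, job]);
  return maxRunning;
}

demo().then(max => console.log(max)); // => 2
```

A semaphore created with a single permit behaves like a mutex in this pattern.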

Semaphore tryAcquire

It is possible to try to acquire a semaphore within a given time limit.
The function then throws an exception if the semaphore could not be acquired in time:

import {
  ConcurrencyExceedTimeoutException,
  Semaphore
} from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const semaphore = new Semaphore(2);

  const acquired1 = await semaphore.tryAcquire(100).then(() => true);
  const acquired2 = await semaphore.tryAcquire(100, 2).catch((error: unknown) => {
    if (error instanceof ConcurrencyExceedTimeoutException) {
      return false;
    }

    throw error;
  });

  console.log(acquired1); // true
  console.log(acquired2); // false
}
bootstrap();

Semaphore interrupt

A semaphore can be interrupted at any time.
All awaiting "threads" will then receive an exception:

import { ConcurrencyInterruptedException, Semaphore } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const semaphore = new Semaphore(1);

  void sleep(100).then(() => semaphore.interrupt({ code: 502 }, 2));

  const succeed = await Promise.all([
    semaphore.acquire(),
    semaphore.acquire(2),
    semaphore.tryAcquire(200),
    semaphore.tryAcquire(200, 1)
  ]).catch((error: unknown) => {
    if (error instanceof ConcurrencyInterruptedException) {
      return false;
    }
    throw error;
  });

  console.log(succeed); // false
  console.log(semaphore.permitsAvailable); // 2
}
bootstrap();

Semaphore releaseAll

releaseAll is very similar to interrupt, but the awaiting "threads" are released without receiving an exception:

import { Semaphore } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const semaphore = new Semaphore(1);

  void sleep(100).then(() => semaphore.releaseAll(3));

  await Promise.all([
    semaphore.acquire(),
    semaphore.acquire(2),
    semaphore.tryAcquire(200),
    semaphore.tryAcquire(200, 1)
  ]);

  console.log("ok");
  console.log(semaphore.permitsAvailable); // 3
}
bootstrap();

Note: Unless it is really desired, prefer interrupt over releaseAll.

Producer-Consumer

The ProducerConsumer looks a lot like a Semaphore, but its read operations return the values that were written.

By default, all reads return an array:

import { ProducerConsumer } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const producerConsumer = new ProducerConsumer([1]);

  const time1 = 100;
  const time2 = 150;

  sleep(time1).then(() => producerConsumer.write(3, 4));
  sleep(time2).then(() => producerConsumer.write(2));

  const maxTime = Math.max(time1, time2);

  const before = performance.now();
  const valuesRead = await producerConsumer.read(4); // waits until 4 values can be read
  const after = performance.now();

  const elapsed = after - before; // ~150
  console.log("Done. took %dms with expected %dms", elapsed, maxTime);
  console.log(valuesRead); // [1, 3, 4, 2]
}
bootstrap();
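
To make the "semaphore that returns values" idea concrete, here is a rough, self-contained sketch of a buffer whose `read` call resolves only once enough values have been written. `TinyQueue` is a hypothetical stand-in, not the package's `ProducerConsumer`, and serving readers in first-in, first-out order is an assumption of this sketch.

```typescript
// Hypothetical stand-in, NOT the package's ProducerConsumer.
type Waiter<T> = { count: number; resolve: (values: T[]) => void };

class TinyQueue<T> {
  private readonly items: T[];
  private readonly waiters: Array<Waiter<T>> = [];

  public constructor(initial: T[] = []) {
    this.items = [...initial];
  }

  // Producer side: buffer the values, then wake readers that can be served.
  public write(...values: T[]): void {
    this.items.push(...values);
    this.flush();
  }

  // Consumer side: resolves once `count` values are available.
  public read(count: number): Promise<T[]> {
    return new Promise<T[]>(resolve => {
      this.waiters.push({ count, resolve });
      this.flush();
    });
  }

  // Serves waiting readers in FIFO order while enough values are buffered.
  private flush(): void {
    let next = this.waiters[0];
    while (next !== undefined && this.items.length >= next.count) {
      this.waiters.shift();
      next.resolve(this.items.splice(0, next.count));
      next = this.waiters[0];
    }
  }
}

async function demo(): Promise<number[]> {
  const queue = new TinyQueue<number>([1]);

  setTimeout(() => queue.write(3, 4), 10);
  setTimeout(() => queue.write(2), 30);

  return queue.read(4); // resolves only after the second write
}

demo().then(values => console.log(values)); // => [1, 3, 4, 2]
```

The values come back in the order they were written, which matches the output of the example above.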

Producer-Consumer tryRead

It is possible to try to read some values within a given time limit.
The function then throws an exception if the values could not be read in time:

import {
  ConcurrencyExceedTimeoutException,
  ProducerConsumer
} from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const producerConsumer = new ProducerConsumer([1, 2, 3]);

  const success1 = await producerConsumer.tryRead(100, 2).then(() => true);
  const success2 = await producerConsumer.tryRead(100, 2).catch((error: unknown) => {
    if (error instanceof ConcurrencyExceedTimeoutException) {
      return false;
    }
  
    throw error;
  });

  console.log(success1); // true
  console.log(success2); // false
}
bootstrap();

Producer-Consumer readOne

The read and tryRead methods each have a "one" variant (readOne and tryReadOne) that does the same thing but returns a single value instead of an array:

import { ProducerConsumer } from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const producerConsumer = new ProducerConsumer([1, 2]);

  // const [value1] = await producerConsumer.read(1);
  // can be written:
  const value1 = await producerConsumer.readOne();

  // const [value2] = await producerConsumer.tryRead(100, 1);
  // can be written:
  const value2 = await producerConsumer.tryReadOne(100);
  
  console.log(value1, value2); // 1 2
}
bootstrap();

Producer-Consumer interrupt

A ProducerConsumer can be interrupted at any time.
All awaiting "threads" will then receive an exception:

import {
  ConcurrencyInterruptedException,
  ProducerConsumer
} from "@heap-code/concurrency-synchronization";

async function bootstrap() {
  const producerConsumer = new ProducerConsumer([1]);

  void sleep(100).then(() => producerConsumer.interrupt({ code: 502 }, [1, 2, 3]));

  const succeed = await Promise.all([
    producerConsumer.read(3),
    producerConsumer.readOne(),
    producerConsumer.tryRead(200, 3),
    producerConsumer.tryReadOne(200)
  ]).catch((error: unknown) => {
    if (error instanceof ConcurrencyInterruptedException) {
      return false;
    }
    throw error;
  });

  console.log(succeed); // false
  console.log(producerConsumer.permitsAvailable); // 3
}
bootstrap();

When to use

This package can be useful when writing tests that need to synchronize events.

For example, the behavior of an RxJS observable differs slightly depending on whether it comes from a regular observable or from a subject.

firstValueFrom returns the value immediately.

This difference can be ignored with the following:

import { ProducerConsumer } from "@heap-code/concurrency-synchronization";

describe("My test", () => {
  it("should work", async () => {
    const producerConsumer = new ProducerConsumer();
    const subscription = myObservable.subscribe(value => producerConsumer.write(value));
    // something that updates the observable
 
    // Need to pass 2 times in the event
    const [r1, r2] = await producerConsumer.tryRead(500, 2);
 
    expect(r1).toBe(1);
    expect(r2).toBe(2);
    subscription.unsubscribe();
  });
});

However, this kind of synchronization is generally unwanted in production code, where the goal is to keep the JavaScript code as "parallelized" as possible and to avoid blocking code branches.
Moreover, better solutions may exist for these problems, such as firstValueFrom and lastValueFrom for RxJS.

Releases

See the release notes for information about breaking changes.