Skip to content

Commit

Permalink
[prelude][core] effect isolates (#853)
Browse files Browse the repository at this point in the history
Please see the `Isolate` scaladoc for more background. Ideally, we
should find a way to merge `Boundary` and `Isolate` but that's
challenging with the inference issues we've been having and it's
difficult to match the current performance of `Boundary` since it's
optimized by using more low-level kernel APIs. This new mechanism is
focused only on effects that maintain state but don't transform values.

Main benefits of this change:

- It allows users to fork computations with `Var`, `Emit`, `Memo`, and
`Check`.
- Users can leverage `isolate.run` directly to enforce the effect
handling for a computation
- Isolates provide transaction-like behavior via interaction with
`Abort`. This behavior will be necessary for an STM-based effect since
it'll need to rollback state to retry computations if necessary.

`Isolate` can't support effects like `Choice` since it transforms `A`
into `Seq[A]`.
  • Loading branch information
fwbrasil authored Nov 26, 2024
1 parent 70f91b4 commit 08765c9
Show file tree
Hide file tree
Showing 13 changed files with 1,662 additions and 26 deletions.
214 changes: 214 additions & 0 deletions kyo-core/shared/src/main/scala/kyo/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,29 @@ object Async:
): A < (Abort[E] & Async & Ctx) =
_run(v).map(_.mask.map(_.get))

/** Runs an async computation with interrupt masking and state isolation.
*
* @param isolate
* Controls state propagation during masked execution
* @param v
* The computation to mask
* @return
* The computation result
*/
inline def mask[E, A: Flat, S, Ctx](isolate: Isolate[S])(v: => A < (Abort[E] & Async & S & Ctx))(
using frame: Frame
): A < (Abort[E] & Async & S & Ctx) =
_mask(isolate)(v)

private def _mask[E, A: Flat, S, Ctx](isolate: Isolate[S])(v: => A < (Abort[E] & Async & S & Ctx))(
using
boundary: Boundary[Ctx, S & Async & Abort[E]],
frame: Frame
): A < (Abort[E] & Async & S & Ctx) =
isolate.use { state =>
_mask(isolate.resume(state, v)).map(isolate.restore(_, _))
}

/** Delays execution of a computation by a specified duration.
*
* @param d
Expand Down Expand Up @@ -150,6 +173,31 @@ object Async:
}
}

/** Runs a computation with timeout and state isolation.
*
* @param after
* The timeout duration
* @param isolate
* Controls state propagation during execution
* @param v
* The computation to timeout
* @return
* The result or Timeout error
*/
inline def timeout[E, A: Flat, S, Ctx](after: Duration, isolate: Isolate[S])(v: => A < (Abort[E] & Async & S & Ctx))(
using frame: Frame
): A < (Abort[E | Timeout] & Async & S & Ctx) =
_timeout(after, isolate)(v)

private def _timeout[E, A: Flat, S, Ctx](after: Duration, isolate: Isolate[S])(v: => A < (Abort[E] & Async & S & Ctx))(
using
boundary: Boundary[Ctx, S & Async & Abort[E | Timeout]],
frame: Frame
): A < (Abort[E | Timeout] & Async & S & Ctx) =
isolate.use { state =>
_timeout(after)(isolate.resume(state, v)).map(isolate.restore(_, _))
}

/** Races multiple computations and returns the result of the first to complete. When one computation completes, all other computations
* are interrupted.
*
Expand All @@ -173,6 +221,29 @@ object Async:
if seq.isEmpty then seq(0)
else Fiber._race(seq).map(_.get)

/** Races computations with state isolation, returning first to complete.
*
* @param isolate
* Controls state propagation during race
* @param seq
* Computations to race
* @return
* First successful result
*/
inline def race[E, A: Flat, S, Ctx](isolate: Isolate[S])(seq: Seq[A < (Abort[E] & Async & S & Ctx)])(
using frame: Frame
): A < (Abort[E] & Async & S & Ctx) =
_race(isolate)(seq)

private def _race[E, A: Flat, S, Ctx](isolate: Isolate[S])(seq: Seq[A < (Abort[E] & Async & S & Ctx)])(
using
boundary: Boundary[Ctx, S & Async & Abort[E]],
frame: Frame
): A < (Abort[E] & Async & S & Ctx) =
isolate.use { state =>
_race(seq.map(isolate.resume(state, _))).map(isolate.restore(_, _))
}

/** Races two or more computations and returns the result of the first to complete.
*
* @param first
Expand All @@ -190,6 +261,25 @@ object Async:
): A < (Abort[E] & Async & Ctx) =
race[E, A, Ctx](first +: rest)

/** Races multiple computations with state isolation.
*
* @param isolate
* Controls state propagation during race
* @param first
* First computation to race
* @param rest
* Additional computations to race
* @return
* First successful result
*/
inline def race[E, A: Flat, S, Ctx](isolate: Isolate[S])(
first: A < (Abort[E] & Async & S & Ctx),
rest: A < (Abort[E] & Async & S & Ctx)*
)(
using frame: Frame
): A < (Abort[E] & Async & S & Ctx) =
race[E, A, S, Ctx](isolate)(first +: rest)

/** Concurrently executes effects and collects their successful results.
*
* WARNING: Executes all computations in parallel without bounds. Use with caution on large sequences to avoid resource exhaustion.
Expand Down Expand Up @@ -309,6 +399,31 @@ object Async:
end match
end _parallelUnbounded

/** Runs computations in parallel with unlimited concurrency and state isolation.
*
* @param isolate
* Controls state propagation during parallel execution
* @param seq
* Computations to run in parallel
* @return
* Results in original order
*/
inline def parallelUnbounded[E, A: Flat, S, Ctx](isolate: Isolate[S], seq: Seq[A < (Abort[E] & Async & S & Ctx)])(
using frame: Frame
): Seq[A] < (Abort[E] & Async & S & Ctx) =
_parallelUnbounded(isolate, seq)

private def _parallelUnbounded[E, A: Flat, S, Ctx](isolate: Isolate[S], seq: Seq[A < (Abort[E] & Async & S & Ctx)])(
using
boundary: Boundary[Ctx, S & Async & Abort[E]],
frame: Frame
): Seq[A] < (Abort[E] & Async & S & Ctx) =
isolate.use { state =>
_parallelUnbounded(seq.map(isolate.resume(state, _))).map { results =>
Kyo.collect(results.map((state, result) => isolate.restore(state, result)))
}
}

/** Runs multiple computations in parallel with a specified level of parallelism and returns their results.
*
* This method allows you to execute a sequence of computations with controlled parallelism by grouping them into batches. The
Expand Down Expand Up @@ -342,6 +457,33 @@ object Async:
case 1 => seq(0).map(Seq(_))
case n => Fiber._parallel(parallelism)(seq).map(_.get)

/** Runs computations in parallel with controlled concurrency and state isolation.
*
* @param parallelism
* Maximum concurrent computations
* @param isolate
* Controls state propagation during parallel execution
* @param seq
* Computations to run in parallel
* @return
* Results in original order
*/
inline def parallel[E, A: Flat, S, Ctx](parallelism: Int, isolate: Isolate[S])(seq: Seq[A < (Abort[E] & Async & S & Ctx)])(
using frame: Frame
): Seq[A] < (Abort[E] & Async & S & Ctx) =
_parallel(parallelism, isolate)(seq)

private def _parallel[E, A: Flat, S, Ctx](parallelism: Int, isolate: Isolate[S])(seq: Seq[A < (Abort[E] & Async & S & Ctx)])(
using
boundary: Boundary[Ctx, S & Async & Abort[E]],
frame: Frame
): Seq[A] < (Abort[E] & Async & S & Ctx) =
isolate.use { state =>
_parallel(parallelism)(seq.map(isolate.resume(state, _))).map { results =>
Kyo.collect(results.map((state, result) => isolate.restore(state, result)))
}
}

/** Runs two computations in parallel and returns their results as a tuple.
*
* @param v1
Expand All @@ -361,6 +503,27 @@ object Async:
(s(0).asInstanceOf[A1], s(1).asInstanceOf[A2])
}

/** Runs two computations in parallel with state isolation.
*
* @param isolate
* Controls state propagation during parallel execution
* @param v1
* First computation
* @param v2
* Second computation
* @return
* Tuple of results
*/
inline def parallel[E, A1: Flat, A2: Flat, S, Ctx](isolate: Isolate[S])(
v1: A1 < (Abort[E] & Async & S & Ctx),
v2: A2 < (Abort[E] & Async & S & Ctx)
)(
using frame: Frame
): (A1, A2) < (Abort[E] & Async & S & Ctx) =
parallelUnbounded(isolate, Seq(v1, v2))(using Flat.unsafe.bypass).map { s =>
(s(0).asInstanceOf[A1], s(1).asInstanceOf[A2])
}

/** Runs three computations in parallel and returns their results as a tuple.
*
* @param v1
Expand All @@ -383,6 +546,30 @@ object Async:
(s(0).asInstanceOf[A1], s(1).asInstanceOf[A2], s(2).asInstanceOf[A3])
}

/** Runs three computations in parallel with state isolation.
*
* @param isolate
* Controls state propagation during parallel execution
* @param v1
* First computation
* @param v2
* Second computation
* @param v3
* Third computation
* @return
* Tuple of results
*/
inline def parallel[E, A1: Flat, A2: Flat, A3: Flat, S, Ctx](isolate: Isolate[S])(
v1: A1 < (Abort[E] & Async & Ctx),
v2: A2 < (Abort[E] & Async & Ctx),
v3: A3 < (Abort[E] & Async & Ctx)
)(
using frame: Frame
): (A1, A2, A3) < (Abort[E] & Async & Ctx) =
parallelUnbounded(Seq(v1, v2, v3))(using Flat.unsafe.bypass).map { s =>
(s(0).asInstanceOf[A1], s(1).asInstanceOf[A2], s(2).asInstanceOf[A3])
}

/** Runs four computations in parallel and returns their results as a tuple.
*
* @param v1
Expand All @@ -408,6 +595,33 @@ object Async:
(s(0).asInstanceOf[A1], s(1).asInstanceOf[A2], s(2).asInstanceOf[A3], s(3).asInstanceOf[A4])
}

/** Runs four computations in parallel with state isolation.
*
* @param isolate
* Controls state propagation during parallel execution
* @param v1
* First computation
* @param v2
* Second computation
* @param v3
* Third computation
* @param v4
* Fourth computation
* @return
* Tuple of results
*/
inline def parallel[E, A1: Flat, A2: Flat, A3: Flat, A4: Flat, S, Ctx](isolate: Isolate[S])(
v1: A1 < (Abort[E] & Async & S & Ctx),
v2: A2 < (Abort[E] & Async & S & Ctx),
v3: A3 < (Abort[E] & Async & S & Ctx),
v4: A4 < (Abort[E] & Async & S & Ctx)
)(
using frame: Frame
): (A1, A2, A3, A4) < (Abort[E] & Async & S & Ctx) =
parallelUnbounded(isolate, Seq(v1, v2, v3, v4))(using Flat.unsafe.bypass).map { s =>
(s(0).asInstanceOf[A1], s(1).asInstanceOf[A2], s(2).asInstanceOf[A3], s(3).asInstanceOf[A4])
}

/** Converts a Future to an asynchronous computation.
*
* This method allows integration of existing Future-based code with Kyo's asynchronous system. It handles successful completion and
Expand Down
111 changes: 111 additions & 0 deletions kyo-core/shared/src/test/scala/kyo/AsyncTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,117 @@ class AsyncTest extends Test:
}
}

"with isolates" - {
"mask with isolate" in run {
val varIsolate = Var.isolate.update[Int]

Var.runTuple(1) {
for
start <- Var.get[Int]
_ <- Async.mask(varIsolate) {
for
_ <- Var.set(2)
_ <- Async.sleep(1.millis)
_ <- Var.set(3)
yield ()
}
end <- Var.get[Int]
yield (start, end)
}.map { result =>
assert(result == (3, (1, 3)))
}
}

"timeout with isolate" in run {
val emitIsolate = Emit.isolate.merge[Int]

Emit.run {
Async.timeout(1.hour, emitIsolate) {
for
_ <- Emit(1)
_ <- Async.sleep(1.millis)
_ <- Emit(2)
yield "done"
}
}.map { result =>
assert(result == (Chunk(1, 2), "done"))
}
}

"race with isolate" in run {
val varIsolate = Var.isolate.update[Int]

Var.runTuple(0) {
Async.race(varIsolate)(
Seq(
for
_ <- Var.set(1)
_ <- Async.sleep(1.millis)
v <- Var.get[Int]
yield v,
for
_ <- Var.set(2)
_ <- Async.sleep(2.millis)
v <- Var.get[Int]
yield v
)
)
}.map { result =>
assert(result == (1, 1))
}
}

"parallel with isolate" in run {
val memoIsolate = Memo.isolate.merge
var count = 0
val f = Memo[Int, Int, Any] { x =>
count += 1
x * 2
}

Memo.run {
Kyo.zip(
Async.parallel(2, memoIsolate)(
Seq(
f(1),
f(1),
f(1)
)
),
f(1)
)
}.map { result =>
assert(result == (Seq(2, 2, 2), 2))
assert(count == 3)
}
}

"parallelUnbounded with isolate" in run {
val emitIsolate = Emit.isolate.merge[String]

Emit.run {
Async.parallelUnbounded(
emitIsolate,
Seq(
for
_ <- Emit("a1")
_ <- Async.sleep(2.millis)
_ <- Emit("a2")
yield 1,
for
_ <- Emit("b1")
_ <- Async.sleep(1.millis)
_ <- Emit("b2")
yield 2
)
)
}.map { result =>
assert(result._1.size == 4)
assert(result._2 == Seq(1, 2))
}
}
}

"gather" - {
"sequence" - {
"delegates to Fiber.gather" in run {
Expand Down
Loading

0 comments on commit 08765c9

Please sign in to comment.