[core] async gather #860
@@ -153,6 +153,8 @@ object Async:
 /** Races multiple computations and returns the result of the first to complete. When one computation completes, all other computations
 * are interrupted.
 *
+* WARNING: Executes all computations in parallel without bounds. Use with caution on large sequences to avoid resource exhaustion.
I've added this warning to the `race` and `gather` methods since they don't have a parallelism limit.
 class State extends IOPromise[E, A] with Function1[Result[E, A], Unit]:
-val pending = new AtomicInteger(seq.size)
+val pending = AtomicInt.Unsafe.init(seq.size)
unrelated cleanup: avoid java API
@@ -283,7 +285,7 @@ object Fiber extends FiberPlatformSpecific:
 import state.*
 boundary { (trace, context) =>
 IO {
-val interruptPanic = Result.Panic(Fiber.Interrupted(frame))
+inline def interruptPanic = Result.Panic(Fiber.Interrupted(frame))
unrelated cleanup: avoid allocations in the happy path
end State
val state = new State
import state.*
boundary { (trace, context) =>
Other methods have very similar code. I'll follow up to reduce duplication.
@@ -42,7 +42,8 @@ sealed abstract class Chunk[A] extends Seq[A] derives CanEqual:
 * a new Chunk containing the first n elements
 */
 override def take(n: Int): Chunk[A] =
-dropLeftAndRight(0, length - Math.min(Math.max(0, n), length))
+if n == length then this
+else dropLeftAndRight(0, length - Math.min(Math.max(0, n), length))
optimization to avoid unnecessary allocation in Fiber._gather
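The short-circuit in `take` can be illustrated with a minimal sketch. `Slice` below is a hypothetical stand-in for an immutable sequence wrapper, not Kyo's `Chunk`; it only shows that returning `this` when `n == length` skips allocating a new wrapper.

```scala
// Hypothetical immutable wrapper, used only to illustrate the idea.
final class Slice(private val data: Vector[Int]):
  def length: Int = data.length

  // When every element is requested, the receiver is already the answer,
  // so no new wrapper object is allocated.
  def take(n: Int): Slice =
    if n == length then this
    else new Slice(data.take(n))
end Slice
```

Because the wrapper is immutable, handing back the same instance is safe: no caller can observe a difference other than reference identity.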
}

"can handle larger arrays" - {
val size = 1000
`quickSort` isn't stack-safe, but the implementation can handle larger arrays with a relatively low stack depth. Also, I think we can consider gathering a large number of computations as misuse.
…e clarification comments" This reverts commit 3dfb6c7.
LGTM overall
inline def gather[E, A: Flat, Ctx](seq: Seq[A < (Abort[E] & Async & Ctx)])(
using frame: Frame
): Chunk[A] < (Abort[E] & Async & Ctx) =
_gather(seq.size)(seq)
We should aim to avoid `Seq#size` when possible. It is not always known. Perhaps we should build a chunk?
I think building chunk would have the same issue since it needs to know the size
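To make the concern about `Seq#size` concrete, here is a small sketch using only the Scala standard library. `knownSize` returns `-1` when a collection cannot report its size without traversing it; `sizeHint` is a hypothetical helper name.

```scala
// Hypothetical helper: reports the size only if it is available cheaply.
// Standard-library collections return -1 from knownSize when computing
// the size would require a traversal.
def sizeHint(seq: Seq[Int]): Int = seq.knownSize

// A strict indexed collection knows its size up front.
val strictInput: Seq[Int] = Vector(1, 2, 3)

// A lazily evaluated collection does not; calling .size on it forces
// every element to be computed first.
val lazyInput: Seq[Int] = LazyList.from(1).take(3)
```

Under these assumptions, `sizeHint(strictInput)` is `3` while `sizeHint(lazyInput)` is `-1`, which is why any approach that pre-allocates a results array needs either a traversal or a materialized copy of the input.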
def apply(idx: Int, result: Result[E, A]): Unit =
result.fold(this.interruptDiscard) { value =>
results(idx) = value
if pending.decrementAndGet() == 0 then
this.completeDiscard(Result.success(ArraySeq.unsafeWrapArray(results)))
This will now copy the Array, no? We should have `Chunk.unsafeFromArray`?
Good catch! I've added `Chunk.fromNoCopy` but made it `private[kyo]` since it's unsafe.
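Why a no-copy constructor counts as unsafe can be shown with the standard library's `ArraySeq.unsafeWrapArray`, which shares the backing array instead of copying it. This is only an analogy for `Chunk.fromNoCopy`, whose implementation is not shown here; `wrapThenMutate` is a hypothetical name.

```scala
import scala.collection.immutable.ArraySeq

// Returns the first element seen through a no-copy wrapper and through a
// wrapper over a defensive copy, after the source array has been mutated.
def wrapThenMutate(): (Int, Int) =
  val results = Array(1, 2, 3)
  val shared  = ArraySeq.unsafeWrapArray(results)         // no copy: aliases `results`
  val copied  = ArraySeq.unsafeWrapArray(results.clone()) // wraps a private copy
  results(0) = 99                                         // later write to the source array
  (shared(0), copied(0))
```

The "immutable" view that shares the array observes the write, while the copied one does not. Skipping the copy is therefore only sound when the array is never written to after wrapping, which is a reason to keep such a constructor out of the public API.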
val indices = new Array[Int](max)
Arrays.fill(indices, Int.MaxValue)
Is there a reason to avoid `Array.fill(max)(Int.MaxValue)`?
It's definitely more heavyweight since it allocates thunks and functions and has to dispatch them. `Arrays.fill` is a simple for loop amenable to auto-vectorization. I imagine JIT compilers might even replace the method with intrinsics.
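For reference, the two alternatives discussed above look like this side by side. Both produce the same contents; the helper names are hypothetical and the sketch makes no performance measurement.

```scala
import java.util.Arrays

// Allocates a primitive array, then fills it with a plain loop from the JDK.
def filledWithArraysFill(max: Int): Array[Int] =
  val indices = new Array[Int](max)
  Arrays.fill(indices, Int.MaxValue)
  indices

// Scala's Array.fill takes the element as a by-name parameter,
// which is evaluated once per slot.
def filledWithArrayFill(max: Int): Array[Int] =
  Array.fill(max)(Int.MaxValue)
```

The difference is purely in how the work is expressed: `Array.fill` re-evaluates its by-name argument for every element, while `Arrays.fill` receives an already computed `Int`.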
// Packed representation to avoid allocations and ensure atomicity
// - lower 32 bits => successful results count (ok)
// - higher 32 bits => failed results count (nok)
val packed = AtomicLong.Unsafe.init(0)
It should be possible to use an opaque type here, right? Not necessary for this PR.
I thought it wasn't worth the extra code since the implementation is relatively long already. I can revisit when refactoring these methods.
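The packed layout described in the code comment (low 32 bits for `ok`, high 32 bits for `nok`) can be sketched as follows. This uses `java.util.concurrent.atomic.AtomicLong` as an assumption for a self-contained example, whereas the PR uses Kyo's own `AtomicLong.Unsafe`; `PackedCounts` and its methods are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicLong

// Two 32-bit counters stored in a single 64-bit atomic value, so both
// are updated and read together in one atomic operation.
final class PackedCounts:
  private val packed = new AtomicLong(0L)

  // Adding 1 bumps the low 32 bits (successful results).
  def recordOk(): Long = packed.addAndGet(1L)

  // Adding 1 << 32 bumps the high 32 bits (failed results).
  def recordNok(): Long = packed.addAndGet(1L << 32)
end PackedCounts

object PackedCounts:
  // Decoders for a snapshot returned by recordOk / recordNok.
  def ok(state: Long): Int  = (state & 0xffffffffL).toInt
  def nok(state: Long): Int = (state >>> 32).toInt
```

Each update returns a consistent snapshot of both counters, so a caller can decide whether the whole operation is finished without a second read. The sketch assumes the `ok` count never exceeds 32 bits, since an overflow would carry into the `nok` half.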
quickSort(indices, results, size)

// Limit final result to max successful results
completeDiscard(Result.success(Chunk.from(results).take(size)))
We should be able to avoid copying the buffer here as well.
New `gather` methods in `Fiber` and `Async` to collect successful computations out of a sequence. They also allow specifying a max number of expected results. See scaladocs for more information.