[core] async gather #860

Merged: 7 commits from the gather branch into main on Nov 26, 2024

Conversation

fwbrasil (Collaborator):

This PR adds gather methods to Fiber and Async that collect the successful computations out of a sequence. They also allow specifying a maximum number of expected results. See the scaladocs for more information.
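
A rough usage sketch (not from the PR itself; the task definitions and the shape of the max-bounded overload are assumptions, the latter based on the _gather(max)(seq) call shown further down):

import kyo.*

// Hypothetical tasks; one of them fails.
val tasks: Seq[Int < (Abort[Exception] & Async)] =
    Seq(IO(1), Abort.fail(new Exception("boom")), IO(3))

// Collect the results of the computations that complete successfully.
val successes: Chunk[Int] < (Abort[Exception] & Async) =
    Async.gather(tasks)

// Collect at most two successful results (assumed overload shape).
val firstTwo: Chunk[Int] < (Abort[Exception] & Async) =
    Async.gather(2)(tasks)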

@@ -153,6 +153,8 @@ object Async:
/** Races multiple computations and returns the result of the first to complete. When one computation completes, all other computations
* are interrupted.
*
* WARNING: Executes all computations in parallel without bounds. Use with caution on large sequences to avoid resource exhaustion.
fwbrasil (author):
I've added this warning to the race and gather methods since they don't have a parallelism limit.

class State extends IOPromise[E, A] with Function1[Result[E, A], Unit]:
-    val pending = new AtomicInteger(seq.size)
+    val pending = AtomicInt.Unsafe.init(seq.size)
fwbrasil (author) on Nov 23, 2024:
Unrelated cleanup: avoid the Java API.

@@ -283,7 +285,7 @@ object Fiber extends FiberPlatformSpecific:
import state.*
boundary { (trace, context) =>
IO {
-    val interruptPanic = Result.Panic(Fiber.Interrupted(frame))
+    inline def interruptPanic = Result.Panic(Fiber.Interrupted(frame))
fwbrasil (author):
Unrelated cleanup: avoid allocations in the happy path.
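
The general idea in a self-contained sketch (the names are illustrative, not the PR's internals): a val allocates its right-hand side eagerly even when the value is never used, whereas an inline def expands at each use site, so the allocation only happens on the path that actually needs it, here the interruption path.

case class Panic(reason: String)

def complete(interrupted: Boolean): Unit =
    // With `val interruptPanic = Panic("interrupted")` the Panic would be
    // allocated on every call, even when `interrupted` is false.
    inline def interruptPanic = Panic("interrupted")
    // The allocation now only happens on this branch.
    if interrupted then println(interruptPanic)
end complete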

end State
val state = new State
import state.*
boundary { (trace, context) =>
fwbrasil (author):
Other methods have very similar code. I'll follow up to reduce the duplication.

@@ -42,7 +42,8 @@ sealed abstract class Chunk[A] extends Seq[A] derives CanEqual:
* a new Chunk containing the first n elements
*/
override def take(n: Int): Chunk[A] =
-    dropLeftAndRight(0, length - Math.min(Math.max(0, n), length))
+    if n == length then this
+    else dropLeftAndRight(0, length - Math.min(Math.max(0, n), length))
fwbrasil (author):
Optimization to avoid an unnecessary allocation in Fiber._gather.

}

"can handle larger arrays" - {
val size = 1000
fwbrasil (author):
quickSort isn't stack-safe, but the implementation can handle larger arrays with a relatively low stack depth. Also, I think gathering a very large number of computations can be considered a misuse.

hearnadam (Collaborator) left a review comment:
LGTM overall

inline def gather[E, A: Flat, Ctx](seq: Seq[A < (Abort[E] & Async & Ctx)])(
    using frame: Frame
): Chunk[A] < (Abort[E] & Async & Ctx) =
    _gather(seq.size)(seq)
hearnadam:
We should aim to avoid Seq#size when possible; it is not always known. Perhaps we should build a Chunk?

fwbrasil (author):
I think building a Chunk would have the same issue since it needs to know the size.
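
For context, a plain Scala illustration of why Seq#size can be costly (standard library only, not Kyo code): on linear sequences it is O(n), and on lazy ones it forces evaluation, so converting to an indexed collection up front pays the same traversal once.

val linear: Seq[Int]   = List.tabulate(1000)(identity)
val deferred: Seq[Int] = LazyList.from(0).take(1000)

val n1 = linear.size                // walks all 1000 cons cells
val n2 = deferred.size              // forces the whole lazy prefix
val n3 = deferred.toIndexedSeq.size // building the indexed copy traverses everything as well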

def apply(idx: Int, result: Result[E, A]): Unit =
    result.fold(this.interruptDiscard) { value =>
        results(idx) = value
        if pending.decrementAndGet() == 0 then
            this.completeDiscard(Result.success(ArraySeq.unsafeWrapArray(results)))
hearnadam:
This will now copy the Array, no? We should have Chunk.unsafeFromArray?

fwbrasil (author):
Good catch! I've added Chunk.fromNoCopy but made it private[kyo] since it's unsafe.
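
For illustration only (Chunk.fromNoCopy itself is private[kyo] and its exact signature isn't shown in this thread), the standard library exposes the same copy vs. no-copy trade-off, which is also why such wrappers are kept unsafe and encapsulated:

import scala.collection.immutable.ArraySeq

@main def noCopyDemo(): Unit =
    val results = Array(1, 2, 3)

    val copied  = results.toIndexedSeq               // defensive copy of the array
    val wrapped = ArraySeq.unsafeWrapArray(results)  // no copy: shares the backing array

    results(0) = 42
    println(wrapped(0)) // 42: later mutations of the array leak through the wrapper
    println(copied(0))  // 1: the copy is unaffected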

Comment on lines +357 to +358
val indices = new Array[Int](max)
Arrays.fill(indices, Int.MaxValue)
hearnadam:
Is there a reason to avoid Array.fill(max)(Int.MaxValue)?

fwbrasil (author):
It's definitely more heavyweight since it allocates thunks and functions and has to dispatch them. Arrays.fill is a simple for loop amenable to auto-vectorization. I imagine JIT compilers might even replace the method with an intrinsic.
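
The two variants side by side, for illustration (standard library only; the wrapper method is just a demo):

import java.util.Arrays

def fillDemo(max: Int): (Array[Int], Array[Int]) =
    // Scala's Array.fill takes the element by name, so it allocates a thunk
    // and invokes it once per slot.
    val viaScala = Array.fill(max)(Int.MaxValue)

    // java.util.Arrays.fill is a plain loop over the primitive array,
    // amenable to auto-vectorization and JIT intrinsics.
    val viaJava = new Array[Int](max)
    Arrays.fill(viaJava, Int.MaxValue)

    (viaScala, viaJava)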

Comment on lines +360 to +363
// Packed representation to avoid allocations and ensure atomicity
// - lower 32 bits => successful results count (ok)
// - higher 32 bits => failed results count (nok)
val packed = AtomicLong.Unsafe.init(0)
hearnadam:
It should be possible to use an opaque type here, right? Not necessary for this PR.

fwbrasil (author):
I thought it wasn't worth the extra code since the implementation is relatively long already. I can revisit when refactoring these methods.
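
A sketch of what that opaque type could look like (illustrative, not the PR's code; the bit layout mirrors the comment in the snippet above):

object GatherCounts:
    // Packs the two counters into a single Long:
    // lower 32 bits = successful results (ok), upper 32 bits = failed results (nok).
    opaque type Packed = Long

    val zero: Packed = 0L

    extension (c: Packed)
        def ok: Int              = (c & 0xffffffffL).toInt
        def nok: Int             = (c >>> 32).toInt
        def incrementOk: Packed  = c + 1L
        def incrementNok: Packed = c + (1L << 32)
end GatherCounts

The packed value can still be stored in the AtomicLong and updated with a compare-and-set loop, while the bit twiddling stays behind named operations.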

quickSort(indices, results, size)

// Limit final result to max successful results
completeDiscard(Result.success(Chunk.from(results).take(size)))
hearnadam:
We should be able to avoid copying the buffer here as well.

fwbrasil merged commit 70f91b4 into main on Nov 26, 2024
3 checks passed
fwbrasil deleted the gather branch on Nov 26, 2024 at 07:52