Skip to content

Commit

Permalink
Fix intermediate operations with ActorCompletableFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
andrebrait committed Dec 3, 2021
1 parent ed44bf4 commit daf8b2a
Showing 1 changed file with 89 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* A specialization of {@link CompletableFuture} that doesn't allow synchronous operations while
Expand All @@ -34,11 +35,34 @@ public ActorCompletableFuture() {
this(new CompletableFuture<>());
}

public ActorCompletableFuture(CompletableFuture<T> delegate) {
/**
* Constructs a new ActorCompletableFuture.
*
* Made private since 6.0.3. It was never meant to be public, thus this was considered a bugfix.
* Use {@link ActorCompletableFuture#wrap(CompletableFuture)} instead.
*/
private ActorCompletableFuture(CompletableFuture<T> delegate) {
this.delegate = delegate;
}

private static <T> ActorCompletableFuture<T> wrap(CompletableFuture<T> completableFuture) {
/**
* Wraps an existing {@link CompletableFuture} into an ActorCompletableFuture.
* Please consider using one of the ActorCompletableFuture variations of the
* static {@link CompletableFuture} methods such as:
*
* <ul>
* <li>{@link ActorCompletableFuture#supplyAsync(Supplier)}</li>
* <li>{@link ActorCompletableFuture#supplyAsync(Supplier, Executor)}</li>
* <li>{@link ActorCompletableFuture#runAsync(Runnable)}</li>
* <li>{@link ActorCompletableFuture#runAsync(Runnable, Executor)}</li>
* <li>{@link ActorCompletableFuture#completedFuture(Object)}</li>
* </ul>
*
* @param completableFuture the {@link CompletableFuture} to wrap
* @return a {@link CompletableFuture} wrapped in an ActorCompletableFuture,
* or itself if it was already one.
*/
public static <T> ActorCompletableFuture<T> wrap(CompletableFuture<T> completableFuture) {
if (completableFuture instanceof ActorCompletableFuture) {
return (ActorCompletableFuture<T>) completableFuture;
}
Expand All @@ -55,6 +79,41 @@ private static void checkNoActorContext() {
}
}

/**
* The ActorCompletableFuture version of {@link CompletableFuture#supplyAsync(Supplier)}
*/
public static <U> ActorCompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return wrap(CompletableFuture.supplyAsync(supplier));
}

/**
* The ActorCompletableFuture version of {@link CompletableFuture#supplyAsync(Supplier, Executor)}
*/
public static <U> ActorCompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
return wrap(CompletableFuture.supplyAsync(supplier, executor));
}

/**
* The ActorCompletableFuture version of {@link CompletableFuture#runAsync(Runnable)}
*/
public static ActorCompletableFuture<Void> runAsync(Runnable runnable) {
return wrap(CompletableFuture.runAsync(runnable));
}

/**
* The ActorCompletableFuture version of {@link CompletableFuture#runAsync(Runnable, Executor)}
*/
public static ActorCompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
return wrap(CompletableFuture.runAsync(runnable, executor));
}

/**
* The ActorCompletableFuture version of {@link CompletableFuture#completedFuture(Object)}
*/
public static <U> ActorCompletableFuture<U> completedFuture(U value) {
return wrap(CompletableFuture.completedFuture(value));
}

@Override
public boolean isDone() {
return delegate.isDone();
Expand Down Expand Up @@ -169,14 +228,14 @@ public ActorCompletableFuture<Void> thenRunAsync(Runnable action, Executor execu
public <U, V> ActorCompletableFuture<V> thenCombine(
CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
{
return wrap(delegate.thenCombine(other, fn));
return wrap(delegate.thenCombine(unwrap(other), fn));
}

@Override
public <U, V> ActorCompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
{
return wrap(delegate.thenCombineAsync(other, fn));
return wrap(delegate.thenCombineAsync(unwrap(other), fn));
}

@Override
Expand All @@ -185,21 +244,21 @@ public <U, V> ActorCompletableFuture<V> thenCombineAsync(
BiFunction<? super T, ? super U, ? extends V> fn,
Executor executor)
{
return wrap(delegate.thenCombineAsync(other, fn, executor));
return wrap(delegate.thenCombineAsync(unwrap(other), fn, executor));
}

@Override
public <U> ActorCompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
{
return wrap(delegate.thenAcceptBoth(other, action));
return wrap(delegate.thenAcceptBoth(unwrap(other), action));
}

@Override
public <U> ActorCompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
{
return wrap(delegate.thenAcceptBothAsync(other, action));
return wrap(delegate.thenAcceptBothAsync(unwrap(other), action));
}

@Override
Expand All @@ -208,109 +267,109 @@ public <U> ActorCompletableFuture<Void> thenAcceptBothAsync(
BiConsumer<? super T, ? super U> action,
Executor executor)
{
return wrap(delegate.thenAcceptBothAsync(other, action, executor));
return wrap(delegate.thenAcceptBothAsync(unwrap(other), action, executor));
}

@Override
public ActorCompletableFuture<Void> runAfterBoth(
CompletionStage<?> other, Runnable action)
{
return wrap(delegate.runAfterBoth(other, action));
return wrap(delegate.runAfterBoth(unwrap(other), action));
}

@Override
public ActorCompletableFuture<Void> runAfterBothAsync(
CompletionStage<?> other, Runnable action)
{
return wrap(delegate.runAfterBothAsync(other, action));
return wrap(delegate.runAfterBothAsync(unwrap(other), action));
}

@Override
public ActorCompletableFuture<Void> runAfterBothAsync(
CompletionStage<?> other, Runnable action, Executor executor)
{
return wrap(delegate.runAfterBothAsync(other, action, executor));
return wrap(delegate.runAfterBothAsync(unwrap(other), action, executor));
}

@Override
public <U> ActorCompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
{
return wrap(delegate.applyToEither(other, fn));
return wrap(delegate.applyToEither(unwrap(other), fn));
}

@Override
public <U> ActorCompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn)
{
return wrap(delegate.applyToEitherAsync(other, fn));
return wrap(delegate.applyToEitherAsync(unwrap(other), fn));
}

@Override
public <U> ActorCompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)
{
return wrap(delegate.applyToEitherAsync(other, fn, executor));
return wrap(delegate.applyToEitherAsync(unwrap(other), fn, executor));
}

@Override
public ActorCompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)
{
return wrap(delegate.acceptEither(other, action));
return wrap(delegate.acceptEither(unwrap(other), action));
}

@Override
public ActorCompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action)
{
return wrap(delegate.acceptEitherAsync(other, action));
return wrap(delegate.acceptEitherAsync(unwrap(other), action));
}

@Override
public ActorCompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
{
return wrap(delegate.acceptEitherAsync(other, action, executor));
return wrap(delegate.acceptEitherAsync(unwrap(other), action, executor));
}

@Override
public ActorCompletableFuture<Void> runAfterEither(
CompletionStage<?> other, Runnable action)
{
return wrap(delegate.runAfterEither(other, action));
return wrap(delegate.runAfterEither(unwrap(other), action));
}

@Override
public ActorCompletableFuture<Void> runAfterEitherAsync(
CompletionStage<?> other, Runnable action)
{
return wrap(delegate.runAfterEitherAsync(other, action));
return wrap(delegate.runAfterEitherAsync(unwrap(other), action));
}

@Override
public ActorCompletableFuture<Void> runAfterEitherAsync(
CompletionStage<?> other, Runnable action, Executor executor)
{
return wrap(delegate.runAfterEitherAsync(other, action, executor));
return wrap(delegate.runAfterEitherAsync(unwrap(other), action, executor));
}

@Override
public <U> ActorCompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
return wrap(delegate.thenCompose(fn));
return wrap(delegate.thenCompose(fn.andThen(ActorCompletableFuture::unwrap)));
}

@Override
public <U> ActorCompletableFuture<U> thenComposeAsync(Function<? super T, ?
extends CompletionStage<U>> fn) {
return wrap(delegate.thenComposeAsync(fn));
return wrap(delegate.thenComposeAsync(fn.andThen(ActorCompletableFuture::unwrap)));
}

@Override
public <U> ActorCompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
{
return wrap(delegate.thenComposeAsync(fn, executor));
return wrap(delegate.thenComposeAsync(fn.andThen(ActorCompletableFuture::unwrap), executor));
}

@Override
Expand Down Expand Up @@ -391,4 +450,11 @@ public int getNumberOfDependents() {
public String toString() {
return delegate.toString();
}

private static <T> CompletionStage<T> unwrap(CompletionStage<T> stage) {
while (stage instanceof ActorCompletableFuture) {
stage = ((ActorCompletableFuture<T>) stage).delegate;
}
return stage;
}
}

0 comments on commit daf8b2a

Please sign in to comment.