Skip to content

Commit

Permalink
Detach with last (#88)
Browse files Browse the repository at this point in the history
* make sure the last action is invoked when interpreting the Eval effect

* use MonadError for Eff.detach to make sure that last actions are triggered
  • Loading branch information
etorreborre authored Apr 4, 2017
1 parent defa054 commit 2cca365
Show file tree
Hide file tree
Showing 14 changed files with 138 additions and 48 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ lazy val catsVersion = "0.9.0"
lazy val monixVersion = "2.1.0"
lazy val scalazVersion = "7.2.7"
lazy val fs2Version = "0.9.2"
lazy val specs2Version = "3.8.7"
lazy val specs2Version = "3.8.9"
lazy val twitterUtilVersion = "6.42.0"
lazy val catbirdVersion = "0.13.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object TimedTask {
)
}

implicit final def TimedTaskMonad: Monad[TimedTask] = new Monad[TimedTask] {
implicit final val TimedTaskMonad: MonadError[TimedTask, Throwable] = new MonadError[TimedTask, Throwable] {
def pure[A](x: A): TimedTask[A] =
TimedTask((_, _) => Task.now(x))

Expand All @@ -54,6 +54,13 @@ object TimedTask {
def loop(na: A): Task[B] = f(na).runNow(strategy, scheduler).flatMap(_.fold(loop, Task.now))
loop(a)
})

def raiseError[A](e: Throwable): TimedTask[A] =
TimedTask((strategy, scheduler) => Task.fail(e))

def handleErrorWith[A](fa: TimedTask[A])(f: Throwable => TimedTask[A]): TimedTask[A] =
TimedTask((strategy, scheduler) => fa.runNow(strategy, scheduler).handleWith[A] { case t => f(t).runNow(strategy, scheduler) })

}

final def now[A](value: A): TimedTask[A] = TimedTask((_, _) => Task.now(value))
Expand Down Expand Up @@ -148,7 +155,7 @@ trait TaskInterpretation extends TaskTypes {
Eff.detachA(e)(TimedTask.TimedTaskMonad, TimedTask.TimedTaskApplicative, m).runNow(strat, sched)

def runSequential[R, A](e: Eff[R, A])(implicit strat: Strategy, sched: Scheduler, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
Eff.detach(e)(Monad[TimedTask], m).runNow(strat, sched)
Eff.detach(e)(TimedTask.TimedTaskMonad, m).runNow(strat, sched)

def attempt[A](task: TimedTask[A]): TimedTask[Throwable Either A] =
TimedTask[Throwable Either A](task.runNow(_, _).attempt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import org.scalacheck.Arbitrary._
import org.scalacheck._
import org.specs2.{ScalaCheck, Specification}

class ArrsSpec extends Specification with ScalaCheck { def is = s2"""
class ContinuationSpec extends Specification with ScalaCheck { def is = s2"""

A function can run at the end of a Kleisli arrow into the Eff monad $mapLast

"""
"""

def mapLast = prop { xs: List[Int] =>
type R = Fx.fx1[Eval]
val plusOne = Continuation.unit[R, Int].mapLast(_.map(_ + 1))
xs.traverseA(plusOne).detach.value ==== xs.map(_ + 1)
xs.traverseA(plusOne).runEval.run ==== xs.map(_ + 1)
}.setGen(Gen.listOf(Gen.oneOf(1, 2, 3)))

}
}
2 changes: 1 addition & 1 deletion jvm/src/test/scala/org/atnos/eff/EffSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class EffSpec extends Specification with ScalaCheck { def is = s2"""
type S = Fx.append[Fx.fx2[Writer[Int, ?], Either[String, ?]], Fx.fx1[Option]]
val e: Eff[S, Int] = OptionEffect.some[S, Int](1)

e.runWriter.runEither.detachA(Applicative[Option]) must beSome(Right((1, Nil)))
e.runWriter.runEither.detachA(Applicative[Option]) must beSome(Right[String, (Int, List[Int])]((1, Nil)))
}

def traverseEff = {
Expand Down
2 changes: 1 addition & 1 deletion jvm/src/test/scala/org/atnos/eff/EvalEffectSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class EvalEffectSpec extends Specification { def is = s2"""
Eval.now(eval.defer(loop(i - 1)).map(_ + 1))
}

loop(100000).value.detach.value ==== 100001
loop(100000).value.runEval.run ==== 100001
}
}

6 changes: 3 additions & 3 deletions jvm/src/test/scala/org/atnos/eff/FutureEffectSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class FutureEffectSpec(implicit ee: ExecutionEnv) extends Specification with Sca

def e14 = {
type S = Fx2[TimedFuture, Option]
var lastActionDone = false
var lastActionDone = 0

val action: Eff[S, Int] =
for {
Expand All @@ -173,11 +173,11 @@ class FutureEffectSpec(implicit ee: ExecutionEnv) extends Specification with Sca

val execute: Eff[S, Throwable Either Int] =
action.
addLast(futureDelay[S, Unit](lastActionDone = true)).
addLast(futureDelay[S, Unit](lastActionDone += 1)).
futureAttempt

execute.runOption.runSequential must beSome(beLeft[Throwable]).awaitFor(20.seconds)
lastActionDone must beTrue
lastActionDone must beEqualTo(1)
}

def e15 = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ object TaskCreation extends TaskCreation

trait TaskInterpretation extends TaskTypes {

private val monixTaskMonad: Monad[Task] =
Monad[Task]
private val monixTaskMonad: MonadError[Task, Throwable] =
monix.cats.monixToCatsMonadError(Task.typeClassInstances.monadError)

private val monixTaskApplicative : Applicative[Task] =
monixToCatsApplicative(Task.nondeterminism.applicative)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object TimedTask {
override def toString = "Applicative[Task]"
}

implicit def TimedTaskMonad: Monad[TimedTask] = new Monad[TimedTask] {
implicit final val TimedTaskMonad: MonadError[TimedTask, Throwable] = new MonadError[TimedTask, Throwable] {
def pure[A](x: A): TimedTask[A] =
TimedTask(_ => Task.now(x))

Expand All @@ -57,7 +57,13 @@ object TimedTask {
loop(a)
}

override def toString = "Monad[Task]"
def raiseError[A](e: Throwable): TimedTask[A] =
TimedTask(ess => Task.fail(e))

def handleErrorWith[A](fa: TimedTask[A])(f: Throwable => TimedTask[A]): TimedTask[A] =
TimedTask(ess => fa.runNow(ess).handleWith[A] { case t => f(t).runNow(ess) })

override def toString = "MonadError[Task, Throwable]"

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import org.atnos.eff.syntax.addon.scalaz.task._
import scala.concurrent._
import duration._
import org.specs2.matcher.TaskMatchers._
import org.specs2.matcher.ThrownExpectations

import scalaz.concurrent.Task

class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with ScalaCheck { def is = "scalaz task".title ^ sequential ^ s2"""
class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with ScalaCheck with ThrownExpectations { def is = "scalaz task".title ^ sequential ^ s2"""

Task effects can be used as normal values $e1
Task effects can be attempted $e2
Expand All @@ -29,6 +31,8 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala
Simple Task calls with timeout can be memoized $e9
Attempted Task calls with timeout can be memoized $e10

Last actions must be triggered in case of a failure $e11

"""

type S = Fx.fx2[TimedTask, Option]
Expand Down Expand Up @@ -130,6 +134,25 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala
invocationsNumber must be_==(1)
}

def e11 = {
type S = Fx2[TimedTask, Option]
var lastActionDone = 0

val action: Eff[S, Int] =
for {
i <- taskDelay[S, Int] { sleepFor(10.millis); 1 }
_ <- taskFailed[S, Int](new Exception("boom"))
j = i + 1
} yield j

val execute: Eff[S, Int] =
action.
addLast(taskDelay[S, Unit](lastActionDone += 1))

execute.runOption.runAsync must failWith[Exception]
lastActionDone must beEqualTo(1)
}


/**
* HELPERS
Expand All @@ -139,6 +162,5 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala

def sleepFor(duration: FiniteDuration) =
try Thread.sleep(duration.toMillis) catch { case t: Throwable => () }

}

28 changes: 20 additions & 8 deletions shared/src/main/scala/org/atnos/eff/Eff.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,25 +337,25 @@ trait EffInterpretation {
/**
* peel-off the only present effect
*/
def detach[M[_] : Monad, R, A](eff: Eff[R, A])(implicit m: Member.Aux[M, R, NoFx]): M[A] =
def detach[M[_], R, A, E](eff: Eff[R, A])(implicit monad: MonadError[M, E], m: Member.Aux[M, R, NoFx]): M[A] =
detachA(Eff.effInto[R, Fx1[M], A](eff))

/**
* peel-off the only present effect
*/
def detach[M[_] : Monad, A](eff: Eff[Fx1[M], A]): M[A] =
def detach[M[_], A, E](eff: Eff[Fx1[M], A])(implicit monad: MonadError[M, E]): M[A] =
detachA(eff)

/**
* peel-off the only present effect, using an Applicative instance where possible
*/
def detachA[M[_], R, A](eff: Eff[R, A])(implicit monad: Monad[M], applicative: Applicative[M], member: Member.Aux[M, R, NoFx]): M[A] =
def detachA[M[_], R, A, E](eff: Eff[R, A])(implicit monad: MonadError[M, E], applicative: Applicative[M], member: Member.Aux[M, R, NoFx]): M[A] =
detachA(Eff.effInto[R, Fx1[M], A](eff))(monad, applicative)

/**
* peel-off the only present effect, using an Applicative instance where possible
*/
def detachA[M[_], A](eff: Eff[Fx1[M], A])(implicit monad: Monad[M], applicative: Applicative[M]): M[A] =
def detachA[M[_], A, E](eff: Eff[Fx1[M], A])(implicit monad: MonadError[M, E], applicative: Applicative[M]): M[A] =
Monad[M].tailRecM[Eff[Fx1[M], A], A](eff) {
case Pure(a, Last(Some(l))) => monad.pure(Left(l.value.as(a)))
case Pure(a, Last(None)) => monad.pure(Right(a))
Expand All @@ -365,18 +365,30 @@ trait EffInterpretation {

case Impure(u: Union[_, _], continuation, last) =>
val ta = u.tagged.valueUnsafe.asInstanceOf[M[A]]
val result: M[Either[Eff[Fx1[M], A], A]] =
ta.map(x => Left(Impure(NoEffect(x.asInstanceOf[Any]), continuation, last)))

last match {
case Last(Some(l)) => ta.map(x => Left(Impure(NoEffect(x.asInstanceOf[Any]), continuation, last)))
case Last(None) => ta.map(x => Left(Impure(NoEffect(x.asInstanceOf[Any]), continuation)))
case Last(Some(l)) =>
monad.handleErrorWith(result)(t => detachA(l.value) >> monad.raiseError(t))

case Last(None) =>
result
}

case ap @ ImpureAp(unions, continuation, last) =>
val effects = unions.unions.map(_.tagged.valueUnsafe.asInstanceOf[M[Any]])
val sequenced = applicative.sequence(effects)

val result: M[Either[Eff[Fx1[M], A], A]] =
sequenced.map(xs => Left(Impure(NoEffect(xs), continuation, last)))

last match {
case Last(Some(_)) => sequenced.map(xs => Left(Impure(NoEffect(xs), continuation, last)))
case Last(None) => sequenced.map(xs => Left(Impure(NoEffect(xs), continuation)))
case Last(Some(l)) =>
monad.handleErrorWith(result)(t => detachA(l.value) >> monad.raiseError(t))

case Last(None) =>
result
}
}

Expand Down
61 changes: 46 additions & 15 deletions shared/src/main/scala/org/atnos/eff/EvalEffect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,60 @@ trait EvalCreation extends EvalTypes {
trait EvalInterpretation extends EvalTypes {

def runEval[R, U, A](effect: Eff[R, A])(implicit m: Member.Aux[Eval, R, U]): Eff[U, A] =
recurse(effect)(new Recurser[Eval, U, A, A] {
def onPure(a: A): A =
a
interpret.runInterpreter(effect)(new Interpreter[Eval, U, A, A] {
def onPure(a: A): Eff[U, A] =
pure(a)

def onEffect[X](e: Eval[X]): X Either Eff[U, A] =
Left(e.value)
def onEffect[X](x: Eval[X], continuation: Continuation[U, X, A]): Eff[U, A] =
Eff.impure(x.value, continuation)

def onApplicative[X, T[_]: Traverse](ms: T[Eval[X]]): T[X] Either Eval[T[X]] =
Right(Eval.later(ms.map(_.value)))
def onLastEffect[X](x: Eval[X], continuation: Continuation[U, X, Unit]): Eff[U, Unit] =
Eff.impure(x.value, continuation)

def onApplicativeEffect[X, T[_] : Traverse](xs: T[Eval[X]], continuation: Continuation[U, T[X], A]): Eff[U, A] =
Eff.impure(xs.map(_.value), continuation)
})

def attemptEval[R, U, A](effect: Eff[R, A])(implicit m: Member.Aux[Eval, R, U]): Eff[U, Throwable Either A] =
recurse(effect)(new Recurser[Eval, U, A, Throwable Either A] {
def onPure(a: A): Throwable Either A =
Right(a)
interpret.runInterpreter(effect)(new Interpreter[Eval, U, A, Throwable Either A] {
def onPure(a: A): Eff[U, Throwable Either A] =
pure(Right(a))

def onEffect[X](x: Eval[X], continuation: Continuation[U, X, Throwable Either A]): Eff[U, Throwable Either A] =
try { Eff.impure(x.value, continuation) }
catch { case NonFatal(t) => Eff.pure(Left(t)) }

def onEffect[X](e: Eval[X]): X Either Eff[U, Throwable Either A] =
try { Left(e.value) }
catch { case NonFatal(t) => Right(Eff.pure(Left(t))) }
def onLastEffect[X](x: Eval[X], continuation: Continuation[U, X, Unit]): Eff[U, Unit] =
Eff.impure(x.value, continuation)

def onApplicative[X, T[_]: Traverse](ms: T[Eval[X]]): T[X] Either Eval[T[X]] =
Right(ms.sequence)
def onApplicativeEffect[X, T[_] : Traverse](xs: T[Eval[X]], continuation: Continuation[U, T[X], Throwable Either A]): Eff[U, Throwable Either A] =
Eff.impure(xs.map(_.value), continuation)
})

/** the monad error instance for Eval is useful for using detach on Eff[Fx1[Eval], A] */
implicit final val monadErrorEval: MonadError[Eval, Throwable] = new MonadError[Eval, Throwable] {
private val m: Monad[Eval] = Eval.catsBimonadForEval

def pure[A](x: A): Eval[A] =
m.pure(x)

def flatMap[A, B](fa: Eval[A])(f: (A) => Eval[B]) =
m.flatMap(fa)(f)

def tailRecM[A, B](a: A)(f: (A) => Eval[Either[A, B]]): Eval[B] =
m.tailRecM(a)(f)

def raiseError[A](e: Throwable): Eval[A] =
Eval.later(throw e)

def handleErrorWith[A](fa: Eval[A])(f: Throwable => Eval[A]): Eval[A] =
Eval.later {
try Eval.now(fa.value)
catch { case t: Throwable => f(t)}
}.flatten

}

}

object EvalInterpretation extends EvalInterpretation
Expand Down
10 changes: 8 additions & 2 deletions shared/src/main/scala/org/atnos/eff/FutureEffect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object TimedFuture {
override def toString = "Applicative[TimedFuture]"
}

implicit final def MonadTimedFuture: Monad[TimedFuture] = new Monad[TimedFuture] {
implicit final def MonadTimedFuture: MonadError[TimedFuture, Throwable] = new MonadError[TimedFuture, Throwable] {
def pure[A](x: A): TimedFuture[A] =
TimedFuture((_, _) => Future.successful(x))

Expand All @@ -56,7 +56,13 @@ object TimedFuture {
loop(a)
})

override def toString = "Monad[TimedFuture]"
def raiseError[A](e: Throwable): TimedFuture[A] =
TimedFuture((s, ec) => Future.failed(e))

def handleErrorWith[A](fa: TimedFuture[A])(f: Throwable => TimedFuture[A]): TimedFuture[A] =
TimedFuture((s, ec) => fa.runNow(s, ec).recoverWith[A] { case t => f(t).runNow(s, ec) }(ec))

override def toString = "MonadError[TimedFuture, Throwable]"
}
}

Expand Down
4 changes: 2 additions & 2 deletions shared/src/main/scala/org/atnos/eff/syntax/eff.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ final class EffPureOps[A](val a: A) extends AnyVal {
}

final class EffOneEffectOps[M[_], A](val e: Eff[Fx1[M], A]) extends AnyVal {
def detach(implicit M: Monad[M]): M[A] =
def detach[E](implicit M: MonadError[M, E]): M[A] =
Eff.detach(e)

def detachA(applicative: Applicative[M])(implicit monad: Monad[M]): M[A] =
def detachA[E](applicative: Applicative[M])(implicit monad: MonadError[M, E]): M[A] =
Eff.detachA(e)(monad, applicative)
}

Expand Down
Loading

0 comments on commit 2cca365

Please sign in to comment.