Skip to content

Commit

Permalink
fixed the forking of scalaz Task effects
Browse files Browse the repository at this point in the history
 - it should not use an implicit global ExecutorService.
 - use eff.ExecutorServices to run TimedTasks
 - remove the use of an implicit Strategy for fs2 task effect
  • Loading branch information
etorreborre committed Mar 24, 2017
1 parent 67a316e commit 3099353
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 56 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ You can also check out [this presentation](http://bit.ly/eff_flatmap_2016) at fl

You add `eff` as an sbt dependency:
```scala
libraryDependencies += "org.atnos" %% "eff" % "4.0.2"
libraryDependencies += "org.atnos" %% "eff" % "4.1.0"

// to write types like Reader[String, ?]
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.3")
Expand Down
24 changes: 18 additions & 6 deletions fs2/shared/src/main/scala/org/atnos/eff/addon/fs2/TaskEffect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,29 @@ trait TaskCreation extends TaskTypes {
final def taskFailed[R :_task, A](t: Throwable): Eff[R, A] =
fromTask[R, A](Task.fail(t))

final def taskSuspend[R :_task, A](tisk: => TimedTask[Eff[R, A]]): Eff[R, A] =
TimedTask((strategy, scheduler) => Task.suspend(tisk.runNow(strategy, scheduler))).send.flatten
final def taskSuspend[R :_task, A](task: =>TimedTask[Eff[R, A]]): Eff[R, A] =
TimedTask((strategy, scheduler) => Task.suspend(task.runNow(strategy, scheduler))).send.flatten

final def taskDelay[R :_task, A](call: => A, timeout: Option[FiniteDuration] = None): Eff[R, A] =
final def taskDelay[R :_task, A](call: =>A, timeout: Option[FiniteDuration] = None): Eff[R, A] =
fromTask[R, A](Task.delay(call), timeout)

final def taskForkStrategy[R :_task, A](call: TimedTask[A])(implicit strategy: Strategy): Eff[R, A] =
TimedTask[A]((_, scheduler) => Task.start(call.runNow(strategy, scheduler)).flatMap(identity)).send
final def taskFork[R :_task, A](call: TimedTask[A], strategy: Strategy): Eff[R, A] =
taskForkWithStrategyAndTimeout(call, strategy, None)

final def taskFork[R :_task, A](call: TimedTask[A], strategy: Strategy, timeout: FiniteDuration): Eff[R, A] =
taskForkWithStrategyAndTimeout(call, strategy, Some(timeout))

final def taskFork[R :_task, A](call: TimedTask[A]): Eff[R, A] =
TimedTask[A]((strategy, scheduler) => Task.start(call.runNow(strategy, scheduler))(strategy).flatMap(identity)).send[R]
taskForkWithTimeout(call, None)

final def taskFork[R :_task, A](call: TimedTask[A], timeout: FiniteDuration): Eff[R, A] =
taskForkWithTimeout(call, Some(timeout))

final def taskForkWithTimeout[R :_task, A](call: TimedTask[A], timeout: Option[FiniteDuration]): Eff[R, A] =
TimedTask[A]((strategy, scheduler) => Task.start(call.runNow(strategy, scheduler))(strategy).flatMap(identity), timeout).send[R]

final def taskForkWithStrategyAndTimeout[R :_task, A](call: TimedTask[A], strategy: Strategy, timeout: Option[FiniteDuration]): Eff[R, A] =
TimedTask[A]((_, scheduler) => Task.start(call.runNow(strategy, scheduler))(strategy).flatMap(identity), timeout).send[R]

final def taskAsync[R :_task, A](callbackConsumer: ((Throwable Either A) => Unit) => Unit,
timeout: Option[FiniteDuration] = None): Eff[R, A] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ final class TaskOps[R, A](val e: Eff[R, A]) extends AnyVal {
def taskMemo(key: AnyRef, cache: Cache)(implicit task: TimedTask /= R): Eff[R, A] =
TaskInterpretation.taskMemo(key, cache, e)

def runAsync(implicit strat: Strategy, sched: Scheduler, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
def runAsync(implicit strategy: Strategy, scheduler: Scheduler, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
TaskInterpretation.runAsync(e)

def runSequential(implicit strat: Strategy, sched: Scheduler, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
def runSequential(implicit strategy: Strategy, scheduler: Scheduler, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
TaskInterpretation.runSequential(e)
}
11 changes: 11 additions & 0 deletions notes/4.1.0.markdown
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Tasks fixes (and slight API changes).

* fixed the forking of scalaz `Task` effects, they should not use an implicit global `ExecutorService`

* removed the `taskForkStrategy` method to create scalaz `Task` effect with a specific `ExecutorService` (and not a `Strategy` as the name was implying)
now all methods for forking are `taskFork` but with different possible arguments, with or without `ExecutorServices`, with or
without `Timeout`

* the `run` methods for scalaz `TimedTasks` now take an implicit `eff.ExecutorServices`

* remove the use of an implicit `Strategy` to create forked `fs2` task effects
Original file line number Diff line number Diff line change
Expand Up @@ -14,64 +14,67 @@ import scala.concurrent.{ExecutionContext, Promise, TimeoutException}
import scala.concurrent.duration.FiniteDuration
import scala.util.{Either, Failure, Success}

case class TimedTask[A](task: (Scheduler, ExecutionContext) => Task[A], timeout: Option[FiniteDuration] = None) {
@inline def runNow(scheduler: Scheduler, ec: ExecutionContext): Task[A] =
timeout.fold(task(scheduler, ec)) { t =>
case class TimedTask[A](run: ExecutorServices => Task[A], timeout: Option[FiniteDuration] = None) {

@inline def runNow(es: ExecutorServices): Task[A] =
timeout.fold(run(es)) { t =>
Task.async[A] { register =>
val promise = Promise[A]
val cancelTimeout = scheduler.schedule({ val _ = promise.tryFailure(new TimeoutException) }, t)
val cancelTimeout = es.scheduler.schedule({ val _ = promise.tryFailure(new TimeoutException) }, t)

task(scheduler, ec).unsafePerformAsync { tea =>
run(es).unsafePerformAsync { tea =>
promise.tryComplete(tea.fold(Failure(_), Success(_)))
cancelTimeout()
}
promise.future.onComplete(t => register(if (t.isSuccess) \/-(t.get) else -\/(t.failed.get)))(ec)
promise.future.onComplete(t => register(if (t.isSuccess) \/-(t.get) else -\/(t.failed.get)))(es.executionContext)
}
}

}

object TimedTask {

def TimedTaskApplicative: Applicative[TimedTask] = new Applicative[TimedTask] {
def pure[A](x: A): TimedTask[A] =
TimedTask((_, _) => Task.now(x))
TimedTask(_ => Task.now(x))

def ap[A, B](ff: TimedTask[A => B])(fa: TimedTask[A]): TimedTask[B] =
TimedTask[B]((scheduler, ec) => Nondeterminism[Task].mapBoth(ff.runNow(scheduler, ec), fa.runNow(scheduler, ec))(_(_)))
TimedTask[B](es => Nondeterminism[Task].mapBoth(ff.runNow(es), fa.runNow(es))(_(_)))

override def toString = "Applicative[Task]"
}

implicit def TimedTaskMonad: Monad[TimedTask] = new Monad[TimedTask] {
def pure[A](x: A): TimedTask[A] =
TimedTask((_, _) => Task.now(x))
TimedTask(_ => Task.now(x))

def flatMap[A, B](fa: TimedTask[A])(f: A => TimedTask[B]): TimedTask[B] =
TimedTask((scheduler, ec) => fa.runNow(scheduler, ec).flatMap(f(_).runNow(scheduler, ec)))
TimedTask(es => fa.runNow(es).flatMap(f(_).runNow(es)))

def tailRecM[A, B](a: A)(f: A => TimedTask[Either[A, B]]): TimedTask[B] =
TimedTask({ (scheduler, ec) =>
def loop(na: A): Task[B] = { f(na).runNow(scheduler, ec).flatMap(_.fold(loop, Task.now)) }
TimedTask { es =>
def loop(na: A): Task[B] = { f(na).runNow(es).flatMap(_.fold(loop, Task.now)) }
loop(a)
})
}

override def toString = "Monad[Task]"

}

final def now[A](value: A): TimedTask[A] = TimedTask((_, _) => Task.now(value))
final def now[A](value: A): TimedTask[A] =
TimedTask(_ => Task.now(value))

implicit final def fromTask[A](task: Task[A]): TimedTask[A] =
TimedTask((_, _) => task)
TimedTask(_ => task)

final def fromTask[A](task: Task[A], timeout: Option[FiniteDuration] = None): TimedTask[A] =
TimedTask((_, _) => task, timeout)
TimedTask(_ => task, timeout)

implicit val timedTaskSequenceCached: SequenceCached[TimedTask] = new SequenceCached[TimedTask] {
def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>TimedTask[X]): TimedTask[X] = TimedTask { (scheduler, ec) =>
implicit val executionContext = ec
def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>TimedTask[X]): TimedTask[X] = TimedTask { es =>
implicit val executionContext = es.executionContext
// there is no built-in memoization for Scalaz tasks so we need to memoize future instead
lazy val cached = cache.memo((key, sequenceKey), taskToFuture(tx.runNow(scheduler, ec)))
lazy val cached = cache.memo((key, sequenceKey), taskToFuture(tx.runNow(es)))

Task async { cb =>
cached.onComplete {
Expand Down Expand Up @@ -111,31 +114,46 @@ trait TaskTypes {

trait TaskCreation extends TaskTypes {

final def taskWithExecutors[R :_task, A](c: (Scheduler, ExecutionContext) => Task[A],
final def taskWithExecutors[R :_task, A](run: ExecutorServices => Task[A],
timeout: Option[FiniteDuration] = None): Eff[R, A] =
Eff.send[TimedTask, R, A](TimedTask(c, timeout))
Eff.send[TimedTask, R, A](TimedTask(run, timeout))

final def fromTask[R :_task, A](task: Task[A], timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask((_, _) => task, timeout).send[R]
TimedTask(_ => task, timeout).send[R]

final def taskFailed[R :_task, A](t: Throwable): Eff[R, A] =
TimedTask((_, _) => Task.fromDisjunction[Throwable, A](-\/(t))).send[R]
TimedTask(_ => Task.fromDisjunction[Throwable, A](-\/(t))).send[R]

final def taskSuspend[R :_task, A](task: =>Task[Eff[R, A]], timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => Task.suspend(task), timeout).send[R].flatten

final def taskDelay[R :_task, A](call: =>A, timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => Task.delay(call), timeout).send[R]

/** use a specific executor service to run the task */
final def taskFork[R :_task, A](call: Task[A], executorService: ExecutorService, timeout: FiniteDuration): Eff[R, A] =
taskForkWithExecutorServiceAndTimeout(call, executorService, Some(timeout))

/** use a specific executor service to run the task */
final def taskFork[R :_task, A](call: Task[A], executorService: ExecutorService): Eff[R, A] =
taskForkWithExecutorServiceAndTimeout(call, executorService, None)

final def taskForkWithExecutorServiceAndTimeout[R :_task, A](call: Task[A], executorService: ExecutorService, timeout: Option[FiniteDuration]): Eff[R, A] =
TimedTask(_ => Task.fork(call)(executorService), timeout).send

final def taskSuspend[R :_task, A](task: => Task[Eff[R, A]], timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask((_, _) => Task.suspend(task), timeout).send[R].flatten
final def taskFork[R :_task, A](call: Task[A], timeout: FiniteDuration): Eff[R, A] =
taskForkWithTimeout(call, Some(timeout))

final def taskDelay[R :_task, A](call: => A, timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask((_, _) => Task.delay(call), timeout).send[R]
final def taskFork[R :_task, A](call: Task[A]): Eff[R, A] =
taskForkWithTimeout(call, None)

final def taskForkStrategy[R :_task, A](call: Task[A], executorService: ExecutorService, timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask((_, _) => Task.fork(call)(executorService), timeout).send
final def taskForkWithTimeout[R :_task, A](call: Task[A], timeout: Option[FiniteDuration]): Eff[R, A] =
TimedTask(es => Task.fork(call)(es.executorService), timeout).send

final def taskFork[R :_task, A](call: Task[A], timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask((_, _) => Task.fork(call), timeout).send

final def async[R :_task, A](callbackConsumer: ((Throwable Either A) => Unit) => Unit,
timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask((_, _) => Task.async[A] { cb =>
TimedTask(_ => Task.async[A] { cb =>
callbackConsumer(tea => cb(\/.fromEither(tea)))
}, timeout).send[R]

Expand All @@ -145,14 +163,14 @@ object TaskCreation extends TaskTypes

trait TaskInterpretation extends TaskTypes {

def runAsync[R, A](e: Eff[R, A])(implicit scheduler: Scheduler, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
Eff.detachA(e)(TimedTask.TimedTaskMonad, TimedTask.TimedTaskApplicative, m).runNow(scheduler, ec)
def runAsync[R, A](e: Eff[R, A])(implicit es: ExecutorServices, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
Eff.detachA(e)(TimedTask.TimedTaskMonad, TimedTask.TimedTaskApplicative, m).runNow(es)

def runSequential[R, A](e: Eff[R, A])(implicit scheduler: Scheduler, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
Eff.detach(e)(TimedTask.TimedTaskMonad, m).runNow(scheduler, ec)
def runSequential[R, A](e: Eff[R, A])(implicit es: ExecutorServices, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
Eff.detach(e)(TimedTask.TimedTaskMonad, m).runNow(es)

def attempt[A](task: TimedTask[A]): TimedTask[Throwable Either A] = {
TimedTask(task = (scheduler, ec) => task.runNow(scheduler, ec).attempt.map(_.toEither))
TimedTask(es => task.runNow(es).attempt.map(_.toEither))
}

import interpret.of
Expand All @@ -165,8 +183,8 @@ trait TaskInterpretation extends TaskTypes {

/** memoize the task result */
def memoize[A](key: AnyRef, cache: Cache, task: TimedTask[A]): TimedTask[A] =
TimedTask((scheduler, ec) => Task.suspend {
cache.get(key).fold(task.runNow(scheduler, ec).map { r => cache.put(key, r); r })(Task.now)
TimedTask(es => Task.suspend {
cache.get(key).fold(task.runNow(es).map { r => cache.put(key, r); r })(Task.now)
})


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ package object scalaz {
}

object EffScalaz {

def traverseA[R, F[_] : Traverse, A, B](fs: F[A])(f: A => Eff[R, B]): Eff[R, F[B]] =
Traverse[F].traverse(fs)(f)(EffScalazApplicative[R])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ final class TaskOps[R, A](val e: Eff[R, A]) extends AnyVal {
def taskMemo(key: AnyRef, cache: Cache)(implicit async: TimedTask /= R): Eff[R, A] =
TaskInterpretation.taskMemo(key, cache, e)

def runAsync(implicit scheduler: Scheduler, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
TaskInterpretation.runAsync(e)
def runAsync(implicit es: ExecutorServices, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
TaskInterpretation.runAsync(e)(es, m)

def runSequential(implicit scheduler: Scheduler, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
TaskInterpretation.runSequential(e)
def runSequential(implicit es: ExecutorServices, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
TaskInterpretation.runSequential(e)(es, m)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala

type S = Fx.fx2[TimedTask, Option]

implicit val scheduler = ExecutorServices.schedulerFromScheduledExecutorService(ee.ses)
implicit val ec = ee.ec
implicit val executorService = ExecutorServices.fromExecutorService(ee.es)

def e1 = {
def action[R :_task :_option]: Eff[R, Int] = for {
Expand Down
2 changes: 1 addition & 1 deletion try-eff.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
test -e ~/.coursier/cr || (mkdir -p ~/.coursier && wget -q -O ~/.coursier/cr https://git.io/vgvpD && chmod +x ~/.coursier/cr)
CLASSPATH="$(~/.coursier/cr fetch -q -p \
\
org.atnos:eff_2.12:4.0.2 \
org.atnos:eff_2.12:4.1.0 \
com.lihaoyi:ammonite_2.12.1:0.8.1 \
\
)" java ammonite.Main --predef 'import org.atnos.eff._, all._, syntax.all._'
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "4.0.2"
version in ThisBuild := "4.1.0"

0 comments on commit 3099353

Please sign in to comment.