Skip to content

Commit

Permalink
use a scheduler instead of an scheduled executor service for future/t…
Browse files Browse the repository at this point in the history
…ask effects (#74)

except for monix task which can be directly embedded into Eff and support timeout
functionality
  • Loading branch information
etorreborre committed Mar 8, 2017
1 parent 7294e3b commit de60e57
Show file tree
Hide file tree
Showing 21 changed files with 249 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package org.atnos.eff.addon.fs2
import cats._
import cats.implicits._
import fs2._
import org.atnos.eff._
import org.atnos.eff.{ Scheduler => _, _}
import org.atnos.eff.syntax.all._

import scala.concurrent.duration.FiniteDuration
Expand Down
23 changes: 23 additions & 0 deletions js/src/main/scala/org/atnos/eff/Schedulers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.atnos.eff

import scala.concurrent.duration.FiniteDuration
import scala.scalajs.js.timers._

trait Schedulers {

/**
* Default Scheduler for JavaScript
*/
def default: Scheduler = new Scheduler {
def schedule(timedout: =>Unit, duration: FiniteDuration): () => Unit = {
val handle = setTimeout(duration)(timedout)
() => clearTimeout(handle)
}

override def toString = "Scheduler"
}

}

object Schedulers extends Schedulers

9 changes: 9 additions & 0 deletions jvm/src/main/scala/org/atnos/eff/Schedulers.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.atnos.eff

import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import scala.concurrent.duration._

trait Schedulers {

}
7 changes: 5 additions & 2 deletions jvm/src/test/scala/org/atnos/eff/FutureEffectSpec.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package org.atnos.eff

import java.util.concurrent.{Callable, TimeUnit}
import java.util.Date

import cats.Eval
import cats.implicits._
import org.atnos.eff.all._
import org.atnos.eff.future._
Expand All @@ -11,7 +15,6 @@ import org.specs2.concurrent.ExecutionEnv
import scala.collection.mutable.ListBuffer
import scala.concurrent._
import duration._

import org.specs2.matcher.ThrownExpectations

import scala.util.control._
Expand All @@ -37,7 +40,7 @@ class FutureEffectSpec(implicit ee: ExecutionEnv) extends Specification with Sca

type S = Fx.fx2[TimedFuture, Option]

implicit val ses = ee.ses
implicit val scheduler = ExecutorServices.schedulerFromScheduledExecutorService(ee.ses)
implicit val ec = ee.ec

def e1 = {
Expand Down
9 changes: 3 additions & 6 deletions jvm/src/test/scala/org/atnos/site/ApplicativeEvaluation.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.atnos.site

import java.util.concurrent.ScheduledThreadPoolExecutor

import scala.annotation.tailrec


Expand All @@ -24,7 +22,7 @@ type _writerString[R] = WriterString |= R

type S = Fx.fx3[Eval, TimedFuture, WriterString]

implicit val ses = new ScheduledThreadPoolExecutor(Runtime.getRuntime.availableProcessors())
implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext

def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] =
for {
Expand Down Expand Up @@ -53,7 +51,7 @@ type WriterString[A] = Writer[String, A]
type _writerString[R] = WriterString |= R

type S = Fx.fx3[Eval, TimedFuture, WriterString]
val ses = new ScheduledThreadPoolExecutor(Runtime.getRuntime.availableProcessors())
implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext

def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] =
for {
Expand All @@ -66,8 +64,7 @@ def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] =
val action: Eff[S, List[Int]] =
List(1000, 500, 50).traverseA(execute[S])

Await.result(Eff.detachA(action.runEval.runWriterLog[String])(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(ses, global
), 2.seconds)
Await.result(Eff.detachA(action.runEval.runWriterLog[String])(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(scheduler, global), 2.seconds)
}.eval}

This uses now `traverseA` (instead of `traverse`) to do an applicative traversal and execute futures concurrently and
Expand Down
6 changes: 3 additions & 3 deletions jvm/src/test/scala/org/atnos/site/OutOfTheBox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -392,10 +392,10 @@ val action: Eff[R, Int] =
} yield b

/*p
Then we need to pass a `ScheduledExecutorService` and `ExecutionContext` in to begin the computation.
Then we need to pass a `Scheduler` and an `ExecutionContext` in to begin the computation.
*/

implicit val ses = new ScheduledThreadPoolExecutor(Runtime.getRuntime.availableProcessors())
implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext
import org.atnos.eff.syntax.future._

Await.result(action.runOption.runSequential, 1 second)
Expand Down Expand Up @@ -436,7 +436,7 @@ def expensive[R :_Future :_memo]: Eff[R, Int] =

type S = Fx.fx2[Memoized, TimedFuture]

implicit val ses = new ScheduledThreadPoolExecutor(Runtime.getRuntime.availableProcessors())
implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext

val futureMemo: Future[Int] =
(expensive[S] >> expensive[S]).runFutureMemo(ConcurrentHashMapCache()).runSequential
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala

"""

type S = Fx.fx2[TimedTask, Option]

implicit val ses = ee.ses
type S = Fx.fx2[Task, Option]

def e1 = {
def action[R :_task :_option]: Eff[R, Int] = for {
Expand Down Expand Up @@ -77,7 +75,7 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala
}

def e5 = {
type R = Fx.fx1[TimedTask]
type R = Fx.fx1[Task]

def loop(i: Int): Task[Eff[R, Int]] =
if (i == 0) Task.now(Eff.pure(1))
Expand Down
140 changes: 37 additions & 103 deletions monix/shared/src/main/scala/org/atnos/eff/addon/monix/TaskEffect.scala
Original file line number Diff line number Diff line change
@@ -1,139 +1,73 @@
package org.atnos.eff.addon.monix

import java.util.concurrent.ScheduledExecutorService

import cats._
import cats.implicits._
import monix.eval._
import monix.cats._
import monix.execution._
import org.atnos.eff._
import org.atnos.eff.{Scheduler =>_, _}
import org.atnos.eff.syntax.all._

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Promise, TimeoutException}
import scala.util._

case class TimedTask[A](task: ScheduledExecutorService => Task[A], timeout: Option[FiniteDuration] = None) {
def runNow(sexs: ScheduledExecutorService): Task[A] = timeout.fold(task(sexs)) { t =>
Task.unsafeCreate[A] { (context, callback) =>
val promise = Promise[A]
val onTimeout = new Runnable {
override def run(): Unit = {
val _ = promise.tryFailure(new TimeoutException)
}
}
sexs.schedule(onTimeout, t.length, t.unit)
Task.unsafeStartAsync(task(sexs), context, new Callback[A] {
def onSuccess(value: A): Unit = {
val _ = promise.trySuccess(value)
}

def onError(ex: Throwable): Unit = {
val _ = promise.tryFailure(ex)
}
})
promise.future.onComplete(callback)(context.scheduler)
}
}
}

object TimedTask {
final def TimedTaskApplicative: Applicative[TimedTask] = new Applicative[TimedTask] {
def pure[A](x: A) = TimedTask(_ => Task.now(x))

def ap[A, B](ff: TimedTask[(A) => B])(fa: TimedTask[A]) =
TimedTask(sexs => Task.mapBoth(ff.runNow(sexs), fa.runNow(sexs))(_ (_)))
}

implicit final def TimedTaskMonad: Monad[TimedTask] = new Monad[TimedTask] {
def pure[A](x: A) = TimedTask(_ => Task.now(x))

def flatMap[A, B](fa: TimedTask[A])(f: (A) => TimedTask[B]) =
TimedTask(sexs => fa.runNow(sexs).flatMap(f(_).runNow(sexs)))

def tailRecM[A, B](a: A)(f: (A) => TimedTask[Either[A, B]]): TimedTask[B] =
TimedTask[B]({ sexs =>
def loop(na: A): Task[B] = f(na).runNow(sexs).flatMap(_.fold(loop, Task.now))
loop(a)
})
}

final def now[A](value: A): TimedTask[A] =
TimedTask(_ => Task.now(value))

implicit final def fromTask[A](task: Task[A]): TimedTask[A] =
TimedTask(_ => task)

final def fromTask[A](task: Task[A], timeout: Option[FiniteDuration] = None): TimedTask[A] =
TimedTask(_ => task, timeout)

}

trait TaskTypes {
type _task[R] = |=[TimedTask, R]
type _Task[R] = <=[TimedTask, R]
type _task[R] = |=[Task, R]
type _Task[R] = <=[Task, R]
}

trait TaskCreation extends TaskTypes {

final def taskWithContext[R :_task, A](c: ScheduledExecutorService => Task[A],
timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(c, timeout).send

final def fromTask[R :_task, A](task: Task[A], timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => task, timeout).send[R]
timeout.fold(task)(t => task.timeout(t)).send[R]

final def taskFailed[R :_task, A](t: Throwable): Eff[R, A] =
TimedTask(_ => Task.fromTry[A](Failure(t))).send
fromTask(Task.fromTry[A](Failure(t)))

final def taskSuspend[R :_task, A](task: => Task[Eff[R, A]], timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => Task.suspend(task), timeout).send.flatten
final def taskSuspend[R :_task, A](task: =>Task[Eff[R, A]], timeout: Option[FiniteDuration] = None): Eff[R, A] =
fromTask(Task.suspend(task), timeout).flatten

final def taskDelay[R :_task, A](call: => A, timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => Task.delay(call), timeout).send
fromTask(Task.delay(call), timeout)

final def taskForkScheduler[R :_task, A](call: Task[A], scheduler: Scheduler, timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => Task.fork(call, scheduler), timeout).send
fromTask(Task.fork(call, scheduler), timeout)

final def taskFork[R :_task, A](call: Task[A], timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => Task.fork(call), timeout).send
fromTask(Task.fork(call), timeout)

final def taskAsync[R :_task, A](callbackConsumer: ((Throwable Either A) => Unit) => Unit,
timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => Task.async[A] { (_, cb) =>
callbackConsumer(tea => cb(tea.fold(Failure(_), Success(_)))); Cancelable.empty
}, timeout).send

final def taskAsyncScheduler[R :_task, A](callbackConsumer: ((Throwable Either A) => Unit) => Cancelable,
scheduler: Scheduler,
timeout: Option[FiniteDuration] = None): Eff[R, A] =
TimedTask(_ => Task.async[A] { (_, cb) =>
timeout: Option[FiniteDuration] = None): Eff[R, A] = {
val async = Task.async[A] { (_, cb) =>
callbackConsumer(tea => cb(tea.fold(Failure(_), Success(_))))
}, timeout).send

Cancelable.empty
}
fromTask(async, timeout)
}
}

object TaskCreation extends TaskCreation

trait TaskInterpretation extends TaskTypes {

def runAsync[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
Eff.detachA(e)(TimedTask.TimedTaskMonad, TimedTask.TimedTaskApplicative, m).runNow(sexs)
private val monixTaskMonad: Monad[Task] =
Monad[Task]

def runSequential[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
Eff.detach(e)(Monad[TimedTask], m).runNow(sexs)
private val monixTaskApplicative : Applicative[Task] =
monixToCatsApplicative(Task.nondeterminism.applicative)

def attempt[A](task: TimedTask[A]): TimedTask[Throwable Either A] = {
TimedTask[Throwable Either A](sexs => task.runNow(sexs).materialize.map(t => Either.cond(t.isSuccess, t.get, t.failed.get)))
}
def runAsync[R, A](e: Eff[R, A])(implicit m: Member.Aux[Task, R, NoFx]): Task[A] =
Eff.detachA(e)(monixTaskMonad, monixTaskApplicative, m)

def runSequential[R, A](e: Eff[R, A])(implicit m: Member.Aux[Task, R, NoFx]): Task[A] =
Eff.detach(e)(monixTaskMonad, m)

import interpret.of

def taskAttempt[R, A](e: Eff[R, A])(implicit task: TimedTask /= R): Eff[R, Throwable Either A] =
interpret.interceptNatM[R, TimedTask, Throwable Either ?, A](e,
new (TimedTask ~> (TimedTask of (Throwable Either ?))#l) {
override def apply[X](fa: TimedTask[X]): TimedTask[Throwable Either X] =
attempt(fa)
def taskAttempt[R, A](e: Eff[R, A])(implicit task: Task /= R): Eff[R, Throwable Either A] =
interpret.interceptNatM[R, Task, Throwable Either ?, A](e,
new (Task ~> (Task of (Throwable Either ?))#l) {
def apply[X](fa: Task[X]): Task[Throwable Either X] =
fa.attempt
})

/** memoize the task result using a cache */
Expand All @@ -147,18 +81,18 @@ trait TaskInterpretation extends TaskTypes {
*
* if this method is called with the same key the previous value will be returned
*/
def taskMemo[R, A](key: AnyRef, cache: Cache, e: Eff[R, A])(implicit task: TimedTask /= R): Eff[R, A] =
def taskMemo[R, A](key: AnyRef, cache: Cache, e: Eff[R, A])(implicit task: Task /= R): Eff[R, A] =
Eff.memoizeEffect(e, cache, key)

/**
* Memoize task values using a memoization effect
*
* if this method is called with the same key the previous value will be returned
*/
def taskMemoized[R, A](key: AnyRef, e: Eff[R, A])(implicit task: TimedTask /= R, m: Memoized |= R): Eff[R, A] =
def taskMemoized[R, A](key: AnyRef, e: Eff[R, A])(implicit task: Task /= R, m: Memoized |= R): Eff[R, A] =
MemoEffect.getCache[R].flatMap(cache => taskMemo(key, cache, e))

def runTaskMemo[R, U, A](cache: Cache)(effect: Eff[R, A])(implicit m: Member.Aux[Memoized, R, U], task: TimedTask |= U): Eff[U, A] = {
def runTaskMemo[R, U, A](cache: Cache)(effect: Eff[R, A])(implicit m: Member.Aux[Memoized, R, U], task: Task |= U): Eff[U, A] = {
interpret.translate(effect)(new Translate[Memoized, U] {
def apply[X](mx: Memoized[X]): Eff[U, X] =
mx match {
Expand All @@ -168,9 +102,9 @@ trait TaskInterpretation extends TaskTypes {
})
}

implicit val timedTaskSequenceCached: SequenceCached[TimedTask] = new SequenceCached[TimedTask] {
def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>TimedTask[X]): TimedTask[X] =
TimedTask(sexs => cache.memo((key, sequenceKey), tx.runNow(sexs).memoize))
implicit val taskSequenceCached: SequenceCached[Task] = new SequenceCached[Task] {
def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>Task[X]): Task[X] =
cache.memo((key, sequenceKey), tx.memoize)
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package org.atnos.eff.syntax.addon.monix

import java.util.concurrent.ScheduledExecutorService

import org.atnos.eff.{Fx, _}
import org.atnos.eff._
import org.atnos.eff.addon.monix._
import _root_.monix.eval.Task
import scala.util.Either
Expand All @@ -15,19 +13,19 @@ trait task {

final class TaskOps[R, A](val e: Eff[R, A]) extends AnyVal {

def runTaskMemo[U](cache: Cache)(implicit m: Member.Aux[Memoized, R, U], task: TimedTask |= U): Eff[U, A] =
def runTaskMemo[U](cache: Cache)(implicit m: Member.Aux[Memoized, R, U], task: Task |= U): Eff[U, A] =
TaskEffect.runTaskMemo(cache)(e)

def taskAttempt(implicit task: TimedTask /= R): Eff[R, Throwable Either A] =
def taskAttempt(implicit task: Task /= R): Eff[R, Throwable Either A] =
TaskInterpretation.taskAttempt(e)

def taskMemo(key: AnyRef, cache: Cache)(implicit task: TimedTask /= R): Eff[R, A] =
def taskMemo(key: AnyRef, cache: Cache)(implicit task: Task /= R): Eff[R, A] =
TaskInterpretation.taskMemo(key, cache, e)

def runAsync(implicit sexs: ScheduledExecutorService, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
def runAsync(implicit m: Member.Aux[Task, R, NoFx]): Task[A] =
TaskInterpretation.runAsync(e)

def runSequential(implicit sexs: ScheduledExecutorService, m: Member.Aux[TimedTask, R, NoFx]): Task[A] =
def runSequential(implicit m: Member.Aux[Task, R, NoFx]): Task[A] =
TaskInterpretation.runSequential(e)
}

Expand Down
Loading

0 comments on commit de60e57

Please sign in to comment.