From de60e576519888c1a9b2a5cac1ff23b90c333240 Mon Sep 17 00:00:00 2001 From: Eric Torreborre Date: Wed, 8 Mar 2017 07:50:03 +0100 Subject: [PATCH] use a scheduler instead of an scheduled executor service for future/task effects (#74) except for monix task which can be directly embedded into Eff and support timeout functionality --- .../org/atnos/eff/addon/fs2/TaskEffect.scala | 2 +- .../main/scala/org/atnos/eff/Schedulers.scala | 23 +++ .../main/scala/org/atnos/eff/Schedulers.scala | 9 ++ .../org/atnos/eff/FutureEffectSpec.scala | 7 +- .../atnos/site/ApplicativeEvaluation.scala | 9 +- .../scala/org/atnos/site/OutOfTheBox.scala | 6 +- .../eff/addon/monix/TaskEffectSpec.scala | 6 +- .../atnos/eff/addon/monix/TaskEffect.scala | 140 +++++------------- .../atnos/eff/syntax/addon/monix/task.scala | 14 +- notes/3.1.0.markdown | 14 ++ .../addon/scalaz/concurrent/TaskEffect.scala | 55 ++++--- .../atnos/eff/syntax/addon/scalaz/task.scala | 8 +- .../scalaz/concurrent/TaskEffectSpec.scala | 2 +- .../org/atnos/eff/ExecutorServices.scala | 22 ++- .../scala/org/atnos/eff/FutureEffect.scala | 105 +++++++------ .../main/scala/org/atnos/eff/Scheduler.scala | 13 ++ .../scala/org/atnos/eff/syntax/future.scala | 6 +- .../addon/twitter/TwitterFutureEffect.scala | 55 +++---- .../eff/syntax/addon/twitter/future.scala | 6 +- .../twitter/TwitterFutureEffectSpec.scala | 2 +- version.sbt | 2 +- 21 files changed, 249 insertions(+), 257 deletions(-) create mode 100644 js/src/main/scala/org/atnos/eff/Schedulers.scala create mode 100644 jvm/src/main/scala/org/atnos/eff/Schedulers.scala create mode 100644 notes/3.1.0.markdown create mode 100644 shared/src/main/scala/org/atnos/eff/Scheduler.scala diff --git a/fs2/shared/src/main/scala/org/atnos/eff/addon/fs2/TaskEffect.scala b/fs2/shared/src/main/scala/org/atnos/eff/addon/fs2/TaskEffect.scala index 8d41cf6c..761d8ef4 100644 --- a/fs2/shared/src/main/scala/org/atnos/eff/addon/fs2/TaskEffect.scala +++ b/fs2/shared/src/main/scala/org/atnos/eff/addon/fs2/TaskEffect.scala @@ -3,7 +3,7 @@ package org.atnos.eff.addon.fs2 import cats._ import cats.implicits._ import fs2._ -import org.atnos.eff._ +import org.atnos.eff.{ Scheduler => _, _} import org.atnos.eff.syntax.all._ import scala.concurrent.duration.FiniteDuration diff --git a/js/src/main/scala/org/atnos/eff/Schedulers.scala b/js/src/main/scala/org/atnos/eff/Schedulers.scala new file mode 100644 index 00000000..b0a0f68f --- /dev/null +++ b/js/src/main/scala/org/atnos/eff/Schedulers.scala @@ -0,0 +1,23 @@ +package org.atnos.eff + +import scala.concurrent.duration.FiniteDuration +import scala.scalajs.js.timers._ + +trait Schedulers { + + /** + * Default Scheduler for JavaScript + */ + def default: Scheduler = new Scheduler { + def schedule(timedout: =>Unit, duration: FiniteDuration): () => Unit = { + val handle = setTimeout(duration)(timedout) + () => clearTimeout(handle) + } + + override def toString = "Scheduler" + } + +} + +object Schedulers extends Schedulers + diff --git a/jvm/src/main/scala/org/atnos/eff/Schedulers.scala b/jvm/src/main/scala/org/atnos/eff/Schedulers.scala new file mode 100644 index 00000000..f23f1f23 --- /dev/null +++ b/jvm/src/main/scala/org/atnos/eff/Schedulers.scala @@ -0,0 +1,9 @@ +package org.atnos.eff + +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import scala.concurrent.duration._ + +trait Schedulers { + +} diff --git a/jvm/src/test/scala/org/atnos/eff/FutureEffectSpec.scala b/jvm/src/test/scala/org/atnos/eff/FutureEffectSpec.scala index 62e1e2da..e92cf80b 100644 --- a/jvm/src/test/scala/org/atnos/eff/FutureEffectSpec.scala +++ b/jvm/src/test/scala/org/atnos/eff/FutureEffectSpec.scala @@ -1,5 +1,9 @@ package org.atnos.eff +import java.util.concurrent.{Callable, TimeUnit} +import java.util.Date + +import cats.Eval import cats.implicits._ import org.atnos.eff.all._ import org.atnos.eff.future._ @@ -11,7 +15,6 @@ import org.specs2.concurrent.ExecutionEnv import scala.collection.mutable.ListBuffer import scala.concurrent._ import duration._ - import org.specs2.matcher.ThrownExpectations import scala.util.control._ @@ -37,7 +40,7 @@ class FutureEffectSpec(implicit ee: ExecutionEnv) extends Specification with Sca type S = Fx.fx2[TimedFuture, Option] - implicit val ses = ee.ses + implicit val scheduler = ExecutorServices.schedulerFromScheduledExecutorService(ee.ses) implicit val ec = ee.ec def e1 = { diff --git a/jvm/src/test/scala/org/atnos/site/ApplicativeEvaluation.scala b/jvm/src/test/scala/org/atnos/site/ApplicativeEvaluation.scala index 4b51f118..1b759a79 100644 --- a/jvm/src/test/scala/org/atnos/site/ApplicativeEvaluation.scala +++ b/jvm/src/test/scala/org/atnos/site/ApplicativeEvaluation.scala @@ -1,7 +1,5 @@ package org.atnos.site -import java.util.concurrent.ScheduledThreadPoolExecutor - import scala.annotation.tailrec @@ -24,7 +22,7 @@ type _writerString[R] = WriterString |= R type S = Fx.fx3[Eval, TimedFuture, WriterString] -implicit val ses = new ScheduledThreadPoolExecutor(Runtime.getRuntime.availableProcessors()) +implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] = for { @@ -53,7 +51,7 @@ type WriterString[A] = Writer[String, A] type _writerString[R] = WriterString |= R type S = Fx.fx3[Eval, TimedFuture, WriterString] -val ses = new ScheduledThreadPoolExecutor(Runtime.getRuntime.availableProcessors()) +implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] = for { @@ -66,8 +64,7 @@ def execute[E :_eval :_writerString :_future](i: Int): Eff[E, Int] = val action: Eff[S, List[Int]] = List(1000, 500, 50).traverseA(execute[S]) -Await.result(Eff.detachA(action.runEval.runWriterLog[String])(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(ses, global -), 2.seconds) +Await.result(Eff.detachA(action.runEval.runWriterLog[String])(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(scheduler, global), 2.seconds) }.eval} This uses now `traverseA` (instead of `traverse`) to do an applicative traversal and execute futures concurrently and diff --git a/jvm/src/test/scala/org/atnos/site/OutOfTheBox.scala b/jvm/src/test/scala/org/atnos/site/OutOfTheBox.scala index 41de6e69..54345173 100644 --- a/jvm/src/test/scala/org/atnos/site/OutOfTheBox.scala +++ b/jvm/src/test/scala/org/atnos/site/OutOfTheBox.scala @@ -392,10 +392,10 @@ val action: Eff[R, Int] = } yield b /*p -Then we need to pass a `ScheduledExecutorService` and `ExecutionContext` in to begin the computation. +Then we need to pass a `Scheduler` and an `ExecutionContext` in to begin the computation. */ -implicit val ses = new ScheduledThreadPoolExecutor(Runtime.getRuntime.availableProcessors()) +implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext import org.atnos.eff.syntax.future._ Await.result(action.runOption.runSequential, 1 second) @@ -436,7 +436,7 @@ def expensive[R :_Future :_memo]: Eff[R, Int] = type S = Fx.fx2[Memoized, TimedFuture] -implicit val ses = new ScheduledThreadPoolExecutor(Runtime.getRuntime.availableProcessors()) +implicit val scheduler = ExecutorServices.schedulerFromGlobalExecutionContext val futureMemo: Future[Int] = (expensive[S] >> expensive[S]).runFutureMemo(ConcurrentHashMapCache()).runSequential diff --git a/monix/jvm/src/test/scala/org/atnos/eff/addon/monix/TaskEffectSpec.scala b/monix/jvm/src/test/scala/org/atnos/eff/addon/monix/TaskEffectSpec.scala index 09007d89..d2631d78 100644 --- a/monix/jvm/src/test/scala/org/atnos/eff/addon/monix/TaskEffectSpec.scala +++ b/monix/jvm/src/test/scala/org/atnos/eff/addon/monix/TaskEffectSpec.scala @@ -31,9 +31,7 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala """ - type S = Fx.fx2[TimedTask, Option] - - implicit val ses = ee.ses + type S = Fx.fx2[Task, Option] def e1 = { def action[R :_task :_option]: Eff[R, Int] = for { @@ -77,7 +75,7 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala } def e5 = { - type R = Fx.fx1[TimedTask] + type R = Fx.fx1[Task] def loop(i: Int): Task[Eff[R, Int]] = if (i == 0) Task.now(Eff.pure(1)) diff --git a/monix/shared/src/main/scala/org/atnos/eff/addon/monix/TaskEffect.scala b/monix/shared/src/main/scala/org/atnos/eff/addon/monix/TaskEffect.scala index f4b8ba44..2a226c4b 100644 --- a/monix/shared/src/main/scala/org/atnos/eff/addon/monix/TaskEffect.scala +++ b/monix/shared/src/main/scala/org/atnos/eff/addon/monix/TaskEffect.scala @@ -1,139 +1,73 @@ package org.atnos.eff.addon.monix -import java.util.concurrent.ScheduledExecutorService - import cats._ import cats.implicits._ import monix.eval._ +import monix.cats._ import monix.execution._ -import org.atnos.eff._ +import org.atnos.eff.{Scheduler =>_, _} import org.atnos.eff.syntax.all._ - import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{Promise, TimeoutException} import scala.util._ -case class TimedTask[A](task: ScheduledExecutorService => Task[A], timeout: Option[FiniteDuration] = None) { - def runNow(sexs: ScheduledExecutorService): Task[A] = timeout.fold(task(sexs)) { t => - Task.unsafeCreate[A] { (context, callback) => - val promise = Promise[A] - val onTimeout = new Runnable { - override def run(): Unit = { - val _ = promise.tryFailure(new TimeoutException) - } - } - sexs.schedule(onTimeout, t.length, t.unit) - Task.unsafeStartAsync(task(sexs), context, new Callback[A] { - def onSuccess(value: A): Unit = { - val _ = promise.trySuccess(value) - } - - def onError(ex: Throwable): Unit = { - val _ = promise.tryFailure(ex) - } - }) - promise.future.onComplete(callback)(context.scheduler) - } - } -} - -object TimedTask { - final def TimedTaskApplicative: Applicative[TimedTask] = new Applicative[TimedTask] { - def pure[A](x: A) = TimedTask(_ => Task.now(x)) - - def ap[A, B](ff: TimedTask[(A) => B])(fa: TimedTask[A]) = - TimedTask(sexs => Task.mapBoth(ff.runNow(sexs), fa.runNow(sexs))(_ (_))) - } - - implicit final def TimedTaskMonad: Monad[TimedTask] = new Monad[TimedTask] { - def pure[A](x: A) = TimedTask(_ => Task.now(x)) - - def flatMap[A, B](fa: TimedTask[A])(f: (A) => TimedTask[B]) = - TimedTask(sexs => fa.runNow(sexs).flatMap(f(_).runNow(sexs))) - - def tailRecM[A, B](a: A)(f: (A) => TimedTask[Either[A, B]]): TimedTask[B] = - TimedTask[B]({ sexs => - def loop(na: A): Task[B] = f(na).runNow(sexs).flatMap(_.fold(loop, Task.now)) - loop(a) - }) - } - - final def now[A](value: A): TimedTask[A] = - TimedTask(_ => Task.now(value)) - - implicit final def fromTask[A](task: Task[A]): TimedTask[A] = - TimedTask(_ => task) - - final def fromTask[A](task: Task[A], timeout: Option[FiniteDuration] = None): TimedTask[A] = - TimedTask(_ => task, timeout) - -} - trait TaskTypes { - type _task[R] = |=[TimedTask, R] - type _Task[R] = <=[TimedTask, R] + type _task[R] = |=[Task, R] + type _Task[R] = <=[Task, R] } trait TaskCreation extends TaskTypes { - final def taskWithContext[R :_task, A](c: ScheduledExecutorService => Task[A], - timeout: Option[FiniteDuration] = None): Eff[R, A] = - TimedTask(c, timeout).send - final def fromTask[R :_task, A](task: Task[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = - TimedTask(_ => task, timeout).send[R] + timeout.fold(task)(t => task.timeout(t)).send[R] final def taskFailed[R :_task, A](t: Throwable): Eff[R, A] = - TimedTask(_ => Task.fromTry[A](Failure(t))).send + fromTask(Task.fromTry[A](Failure(t))) - final def taskSuspend[R :_task, A](task: => Task[Eff[R, A]], timeout: Option[FiniteDuration] = None): Eff[R, A] = - TimedTask(_ => Task.suspend(task), timeout).send.flatten + final def taskSuspend[R :_task, A](task: =>Task[Eff[R, A]], timeout: Option[FiniteDuration] = None): Eff[R, A] = + fromTask(Task.suspend(task), timeout).flatten final def taskDelay[R :_task, A](call: => A, timeout: Option[FiniteDuration] = None): Eff[R, A] = - TimedTask(_ => Task.delay(call), timeout).send + fromTask(Task.delay(call), timeout) final def taskForkScheduler[R :_task, A](call: Task[A], scheduler: Scheduler, timeout: Option[FiniteDuration] = None): Eff[R, A] = - TimedTask(_ => Task.fork(call, scheduler), timeout).send + fromTask(Task.fork(call, scheduler), timeout) final def taskFork[R :_task, A](call: Task[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = - TimedTask(_ => Task.fork(call), timeout).send + fromTask(Task.fork(call), timeout) final def taskAsync[R :_task, A](callbackConsumer: ((Throwable Either A) => Unit) => Unit, - timeout: Option[FiniteDuration] = None): Eff[R, A] = - TimedTask(_ => Task.async[A] { (_, cb) => - callbackConsumer(tea => cb(tea.fold(Failure(_), Success(_)))); Cancelable.empty - }, timeout).send - - final def taskAsyncScheduler[R :_task, A](callbackConsumer: ((Throwable Either A) => Unit) => Cancelable, - scheduler: Scheduler, - timeout: Option[FiniteDuration] = None): Eff[R, A] = - TimedTask(_ => Task.async[A] { (_, cb) => + timeout: Option[FiniteDuration] = None): Eff[R, A] = { + val async = Task.async[A] { (_, cb) => callbackConsumer(tea => cb(tea.fold(Failure(_), Success(_)))) - }, timeout).send - + Cancelable.empty + } + fromTask(async, timeout) + } } object TaskCreation extends TaskCreation trait TaskInterpretation extends TaskTypes { - def runAsync[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = - Eff.detachA(e)(TimedTask.TimedTaskMonad, TimedTask.TimedTaskApplicative, m).runNow(sexs) + private val monixTaskMonad: Monad[Task] = + Monad[Task] - def runSequential[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = - Eff.detach(e)(Monad[TimedTask], m).runNow(sexs) + private val monixTaskApplicative : Applicative[Task] = + monixToCatsApplicative(Task.nondeterminism.applicative) - def attempt[A](task: TimedTask[A]): TimedTask[Throwable Either A] = { - TimedTask[Throwable Either A](sexs => task.runNow(sexs).materialize.map(t => Either.cond(t.isSuccess, t.get, t.failed.get))) - } + def runAsync[R, A](e: Eff[R, A])(implicit m: Member.Aux[Task, R, NoFx]): Task[A] = + Eff.detachA(e)(monixTaskMonad, monixTaskApplicative, m) + + def runSequential[R, A](e: Eff[R, A])(implicit m: Member.Aux[Task, R, NoFx]): Task[A] = + Eff.detach(e)(monixTaskMonad, m) import interpret.of - def taskAttempt[R, A](e: Eff[R, A])(implicit task: TimedTask /= R): Eff[R, Throwable Either A] = - interpret.interceptNatM[R, TimedTask, Throwable Either ?, A](e, - new (TimedTask ~> (TimedTask of (Throwable Either ?))#l) { - override def apply[X](fa: TimedTask[X]): TimedTask[Throwable Either X] = - attempt(fa) + def taskAttempt[R, A](e: Eff[R, A])(implicit task: Task /= R): Eff[R, Throwable Either A] = + interpret.interceptNatM[R, Task, Throwable Either ?, A](e, + new (Task ~> (Task of (Throwable Either ?))#l) { + def apply[X](fa: Task[X]): Task[Throwable Either X] = + fa.attempt }) /** memoize the task result using a cache */ @@ -147,7 +81,7 @@ trait TaskInterpretation extends TaskTypes { * * if this method is called with the same key the previous value will be returned */ - def taskMemo[R, A](key: AnyRef, cache: Cache, e: Eff[R, A])(implicit task: TimedTask /= R): Eff[R, A] = + def taskMemo[R, A](key: AnyRef, cache: Cache, e: Eff[R, A])(implicit task: Task /= R): Eff[R, A] = Eff.memoizeEffect(e, cache, key) /** @@ -155,10 +89,10 @@ trait TaskInterpretation extends TaskTypes { * * if this method is called with the same key the previous value will be returned */ - def taskMemoized[R, A](key: AnyRef, e: Eff[R, A])(implicit task: TimedTask /= R, m: Memoized |= R): Eff[R, A] = + def taskMemoized[R, A](key: AnyRef, e: Eff[R, A])(implicit task: Task /= R, m: Memoized |= R): Eff[R, A] = MemoEffect.getCache[R].flatMap(cache => taskMemo(key, cache, e)) - def runTaskMemo[R, U, A](cache: Cache)(effect: Eff[R, A])(implicit m: Member.Aux[Memoized, R, U], task: TimedTask |= U): Eff[U, A] = { + def runTaskMemo[R, U, A](cache: Cache)(effect: Eff[R, A])(implicit m: Member.Aux[Memoized, R, U], task: Task |= U): Eff[U, A] = { interpret.translate(effect)(new Translate[Memoized, U] { def apply[X](mx: Memoized[X]): Eff[U, X] = mx match { @@ -168,9 +102,9 @@ trait TaskInterpretation extends TaskTypes { }) } - implicit val timedTaskSequenceCached: SequenceCached[TimedTask] = new SequenceCached[TimedTask] { - def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>TimedTask[X]): TimedTask[X] = - TimedTask(sexs => cache.memo((key, sequenceKey), tx.runNow(sexs).memoize)) + implicit val taskSequenceCached: SequenceCached[Task] = new SequenceCached[Task] { + def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>Task[X]): Task[X] = + cache.memo((key, sequenceKey), tx.memoize) } } diff --git a/monix/shared/src/main/scala/org/atnos/eff/syntax/addon/monix/task.scala b/monix/shared/src/main/scala/org/atnos/eff/syntax/addon/monix/task.scala index 904a848e..28d9a206 100644 --- a/monix/shared/src/main/scala/org/atnos/eff/syntax/addon/monix/task.scala +++ b/monix/shared/src/main/scala/org/atnos/eff/syntax/addon/monix/task.scala @@ -1,8 +1,6 @@ package org.atnos.eff.syntax.addon.monix -import java.util.concurrent.ScheduledExecutorService - -import org.atnos.eff.{Fx, _} +import org.atnos.eff._ import org.atnos.eff.addon.monix._ import _root_.monix.eval.Task import scala.util.Either @@ -15,19 +13,19 @@ trait task { final class TaskOps[R, A](val e: Eff[R, A]) extends AnyVal { - def runTaskMemo[U](cache: Cache)(implicit m: Member.Aux[Memoized, R, U], task: TimedTask |= U): Eff[U, A] = + def runTaskMemo[U](cache: Cache)(implicit m: Member.Aux[Memoized, R, U], task: Task |= U): Eff[U, A] = TaskEffect.runTaskMemo(cache)(e) - def taskAttempt(implicit task: TimedTask /= R): Eff[R, Throwable Either A] = + def taskAttempt(implicit task: Task /= R): Eff[R, Throwable Either A] = TaskInterpretation.taskAttempt(e) - def taskMemo(key: AnyRef, cache: Cache)(implicit task: TimedTask /= R): Eff[R, A] = + def taskMemo(key: AnyRef, cache: Cache)(implicit task: Task /= R): Eff[R, A] = TaskInterpretation.taskMemo(key, cache, e) - def runAsync(implicit sexs: ScheduledExecutorService, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = + def runAsync(implicit m: Member.Aux[Task, R, NoFx]): Task[A] = TaskInterpretation.runAsync(e) - def runSequential(implicit sexs: ScheduledExecutorService, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = + def runSequential(implicit m: Member.Aux[Task, R, NoFx]): Task[A] = TaskInterpretation.runSequential(e) } diff --git a/notes/3.1.0.markdown b/notes/3.1.0.markdown new file mode 100644 index 00000000..d08a1d29 --- /dev/null +++ b/notes/3.1.0.markdown @@ -0,0 +1,14 @@ +This version brings some breaking API changes for async effects in order to make them compatible with ScalaJS. + +## Changes + + * `TimedFuture`, `TwitterTimedFuture`, scalaz `TimedTask`, fs2 `TimedTask` now use a `org.atnos.eff.Scheduler` instead of a `ScheduledExecutorService` to + timeout computations + + * a `Scheduler` can be created from a `ScheduledExecutorService` with the `ExecutorServices.schedulerFromScheduledExecutorService` method for the JVM + + * a `Scheduler` can be created with the `org.atnos.eff.Schedulers.default` method for ScalaJS + + * the monix `Task` effect is now directly using the `monix.eval.Task` type instead of using a `TimedTask` type before. + The API stays the same except for stacks declarations which are now `Fx.fx1[Task]` instead of `Fx.fx1[TimedTask]` + diff --git a/scalaz/src/main/scala/org/atnos/eff/addon/scalaz/concurrent/TaskEffect.scala b/scalaz/src/main/scala/org/atnos/eff/addon/scalaz/concurrent/TaskEffect.scala index e8addcf0..9dad4ba5 100644 --- a/scalaz/src/main/scala/org/atnos/eff/addon/scalaz/concurrent/TaskEffect.scala +++ b/scalaz/src/main/scala/org/atnos/eff/addon/scalaz/concurrent/TaskEffect.scala @@ -1,6 +1,6 @@ package org.atnos.eff.addon.scalaz.concurrent -import java.util.concurrent.{ExecutorService, ScheduledExecutorService} +import java.util.concurrent._ import org.atnos.eff.syntax.all._ @@ -14,21 +14,19 @@ import scala.concurrent.{ExecutionContext, Promise, TimeoutException} import scala.concurrent.duration.FiniteDuration import scala.util.{Either, Failure, Success} -case class TimedTask[A](task: (ScheduledExecutorService, ExecutionContext) => Task[A], timeout: Option[FiniteDuration] = None) { - @inline def runNow(sexs: ScheduledExecutorService, ec: ExecutionContext): Task[A] = timeout.fold(task(sexs, ec)) { t => - Task.async[A] { register => - val promise = Promise[A] - val onTimeout = new Runnable { - override def run(): Unit = { - val _ = promise.tryFailure(new TimeoutException) +case class TimedTask[A](task: (Scheduler, ExecutionContext) => Task[A], timeout: Option[FiniteDuration] = None) { + @inline def runNow(scheduler: Scheduler, ec: ExecutionContext): Task[A] = + timeout.fold(task(scheduler, ec)) { t => + Task.async[A] { register => + val promise = Promise[A] + val cancelTimeout = scheduler.schedule({ val _ = promise.tryFailure(new TimeoutException) }, t) + + task(scheduler, ec).unsafePerformAsync { tea => + promise.tryComplete(tea.fold(Failure(_), Success(_))) + cancelTimeout() } + promise.future.onComplete(t => register(if (t.isSuccess) \/-(t.get) else -\/(t.failed.get)))(ec) } - val _ = sexs.schedule(onTimeout, t.length, t.unit) - task(sexs, ec).unsafePerformAsync { tea => - val _ = promise.tryComplete(tea.fold(Failure(_), Success(_))) - } - promise.future.onComplete(t => register(if (t.isSuccess) \/-(t.get) else -\/(t.failed.get)))(ec) - } } } @@ -39,7 +37,7 @@ object TimedTask { TimedTask((_, _) => Task.now(x)) def ap[A, B](ff: TimedTask[A => B])(fa: TimedTask[A]): TimedTask[B] = - TimedTask[B]((sexs, ec) => Nondeterminism[Task].mapBoth(ff.runNow(sexs, ec), fa.runNow(sexs, ec))(_(_))) + TimedTask[B]((scheduler, ec) => Nondeterminism[Task].mapBoth(ff.runNow(scheduler, ec), fa.runNow(scheduler, ec))(_(_))) override def toString = "Applicative[Task]" } @@ -49,11 +47,11 @@ object TimedTask { TimedTask((_, _) => Task.now(x)) def flatMap[A, B](fa: TimedTask[A])(f: A => TimedTask[B]): TimedTask[B] = - TimedTask((sexs, ec) => fa.runNow(sexs, ec).flatMap(f(_).runNow(sexs, ec))) + TimedTask((scheduler, ec) => fa.runNow(scheduler, ec).flatMap(f(_).runNow(scheduler, ec))) def tailRecM[A, B](a: A)(f: A => TimedTask[Either[A, B]]): TimedTask[B] = - TimedTask({ (sexs, ec) => - def loop(na: A): Task[B] = { f(na).runNow(sexs, ec).flatMap(_.fold(loop, Task.now)) } + TimedTask({ (scheduler, ec) => + def loop(na: A): Task[B] = { f(na).runNow(scheduler, ec).flatMap(_.fold(loop, Task.now)) } loop(a) }) @@ -62,6 +60,7 @@ object TimedTask { } final def now[A](value: A): TimedTask[A] = TimedTask((_, _) => Task.now(value)) + implicit final def fromTask[A](task: Task[A]): TimedTask[A] = TimedTask((_, _) => task) @@ -69,10 +68,10 @@ object TimedTask { TimedTask((_, _) => task, timeout) implicit val timedTaskSequenceCached: SequenceCached[TimedTask] = new SequenceCached[TimedTask] { - def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>TimedTask[X]): TimedTask[X] = TimedTask { (sexs, ec) => + def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>TimedTask[X]): TimedTask[X] = TimedTask { (scheduler, ec) => implicit val executionContext = ec // there is no built-in memoization for Scalaz tasks so we need to memoize future instead - lazy val cached = cache.memo((key, sequenceKey), taskToFuture(tx.runNow(sexs, ec))) + lazy val cached = cache.memo((key, sequenceKey), taskToFuture(tx.runNow(scheduler, ec))) Task async { cb => cached.onComplete { @@ -112,7 +111,7 @@ trait TaskTypes { trait TaskCreation extends TaskTypes { - final def taskWithExecutors[R :_task, A](c: (ScheduledExecutorService, ExecutionContext) => Task[A], + final def taskWithExecutors[R :_task, A](c: (Scheduler, ExecutionContext) => Task[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = Eff.send[TimedTask, R, A](TimedTask(c, timeout)) @@ -146,14 +145,14 @@ object TaskCreation extends TaskTypes trait TaskInterpretation extends TaskTypes { - def runAsync[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = - Eff.detachA(e)(TimedTask.TimedTaskMonad, TimedTask.TimedTaskApplicative, m).runNow(sexs, ec) + def runAsync[R, A](e: Eff[R, A])(implicit scheduler: Scheduler, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = + Eff.detachA(e)(TimedTask.TimedTaskMonad, TimedTask.TimedTaskApplicative, m).runNow(scheduler, ec) - def runSequential[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = - Eff.detach(e)(Monad[TimedTask], m).runNow(sexs, ec) + def runSequential[R, A](e: Eff[R, A])(implicit scheduler: Scheduler, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = + Eff.detach(e)(TimedTask.TimedTaskMonad, m).runNow(scheduler, ec) def attempt[A](task: TimedTask[A]): TimedTask[Throwable Either A] = { - TimedTask(task = (sexs, ec) => task.runNow(sexs, ec).attempt.map(_.toEither)) + TimedTask(task = (scheduler, ec) => task.runNow(scheduler, ec).attempt.map(_.toEither)) } import interpret.of @@ -166,8 +165,8 @@ trait TaskInterpretation extends TaskTypes { /** memoize the task result */ def memoize[A](key: AnyRef, cache: Cache, task: TimedTask[A]): TimedTask[A] = - TimedTask((sexs, ec) => Task.suspend { - cache.get(key).fold(task.runNow(sexs, ec).map { r => cache.put(key, r); r })(Task.now) + TimedTask((scheduler, ec) => Task.suspend { + cache.get(key).fold(task.runNow(scheduler, ec).map { r => cache.put(key, r); r })(Task.now) }) diff --git a/scalaz/src/main/scala/org/atnos/eff/syntax/addon/scalaz/task.scala b/scalaz/src/main/scala/org/atnos/eff/syntax/addon/scalaz/task.scala index b91a340e..9ff5ae95 100644 --- a/scalaz/src/main/scala/org/atnos/eff/syntax/addon/scalaz/task.scala +++ b/scalaz/src/main/scala/org/atnos/eff/syntax/addon/scalaz/task.scala @@ -1,9 +1,7 @@ package org.atnos.eff.syntax.addon.scalaz -import java.util.concurrent.ScheduledExecutorService - import org.atnos.eff.addon.scalaz.concurrent.{TaskEffect, TaskInterpretation, TimedTask} -import org.atnos.eff.{Fx, _} +import org.atnos.eff._ import scala.concurrent.ExecutionContext import scala.util.Either @@ -29,9 +27,9 @@ final class TaskOps[R, A](val e: Eff[R, A]) extends AnyVal { def taskMemo(key: AnyRef, cache: Cache)(implicit async: TimedTask /= R): Eff[R, A] = TaskInterpretation.taskMemo(key, cache, e) - def runAsync(implicit sexs: ScheduledExecutorService, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = + def runAsync(implicit scheduler: Scheduler, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = TaskInterpretation.runAsync(e) - def runSequential(implicit sexs: ScheduledExecutorService, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = + def runSequential(implicit scheduler: Scheduler, ec: ExecutionContext, m: Member.Aux[TimedTask, R, NoFx]): Task[A] = TaskInterpretation.runSequential(e) } diff --git a/scalaz/src/test/scala/org/atnos/eff/addon/scalaz/concurrent/TaskEffectSpec.scala b/scalaz/src/test/scala/org/atnos/eff/addon/scalaz/concurrent/TaskEffectSpec.scala index 4ad337cd..e23ed55e 100644 --- a/scalaz/src/test/scala/org/atnos/eff/addon/scalaz/concurrent/TaskEffectSpec.scala +++ b/scalaz/src/test/scala/org/atnos/eff/addon/scalaz/concurrent/TaskEffectSpec.scala @@ -33,7 +33,7 @@ class TaskEffectSpec(implicit ee: ExecutionEnv) extends Specification with Scala type S = Fx.fx2[TimedTask, Option] - implicit val ses = ee.ses + implicit val scheduler = ExecutorServices.schedulerFromScheduledExecutorService(ee.ses) implicit val ec = ee.ec def e1 = { diff --git a/shared/src/main/scala/org/atnos/eff/ExecutorServices.scala b/shared/src/main/scala/org/atnos/eff/ExecutorServices.scala index 0bf23d38..a2245af9 100644 --- a/shared/src/main/scala/org/atnos/eff/ExecutorServices.scala +++ b/shared/src/main/scala/org/atnos/eff/ExecutorServices.scala @@ -5,6 +5,7 @@ import java.util.concurrent._ import cats.Eval +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService} case class ExecutorServices(executorServiceEval: Eval[ExecutorService], @@ -27,6 +28,9 @@ case class ExecutorServices(executorServiceEval: Eval[ExecutorService], implicit lazy val executionContext: ExecutionContext = executionContextEval.value + implicit lazy val scheduler: Scheduler = + ExecutorServices.schedulerFromScheduledExecutorService(scheduledExecutorService) + /** convenience method to shutdown the services when the final future has completed */ def shutdownOnComplete[A](future: scala.concurrent.Future[A]): ExecutorServices = { future.onComplete(_ => shutdown.value) @@ -35,7 +39,7 @@ case class ExecutorServices(executorServiceEval: Eval[ExecutorService], } -object ExecutorServices { +object ExecutorServices extends Schedulers { lazy val threadsNb = Runtime.getRuntime.availableProcessors @@ -91,9 +95,23 @@ object ExecutorServices { } } - /** create an ExecutionEnv from Scala global execution context */ + /** create an ExecutorServices from Scala global execution context */ def fromGlobalExecutionContext: ExecutorServices = fromExecutionContext(scala.concurrent.ExecutionContext.global) + /** create a Scheduler from Scala global execution context */ + def schedulerFromGlobalExecutionContext: Scheduler = + schedulerFromScheduledExecutorService(fromGlobalExecutionContext.scheduledExecutorService) + + def schedulerFromScheduledExecutorService(s: ScheduledExecutorService): Scheduler = + new Scheduler { + def schedule(timedout: =>Unit, duration: FiniteDuration): () => Unit = { + val scheduled = s.schedule(new Runnable { def run(): Unit = timedout }, duration.toNanos, TimeUnit.NANOSECONDS) + () => { scheduled.cancel(false); () } + } + + override def toString = "Scheduler" + } + } diff --git a/shared/src/main/scala/org/atnos/eff/FutureEffect.scala b/shared/src/main/scala/org/atnos/eff/FutureEffect.scala index 7482ae39..51e2377a 100644 --- a/shared/src/main/scala/org/atnos/eff/FutureEffect.scala +++ b/shared/src/main/scala/org/atnos/eff/FutureEffect.scala @@ -1,66 +1,65 @@ package org.atnos.eff -import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeoutException import cats._ import cats.implicits._ import org.atnos.eff.all._ import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} object FutureCreation extends FutureCreation - final case class TimedFuture[A](callback: (ScheduledExecutorService, ExecutionContext) => Future[A], timeout: Option[FiniteDuration] = None) { - @inline def runNow(sexs: ScheduledExecutorService, exc: ExecutionContext): Future[A] = { - timeout.fold { - callback(sexs, exc) - } { t => - val promise = Promise[A] - val timeout = new Runnable { - override def run(): Unit = { - val _ = promise.tryFailure(new TimeoutException) - } - } - sexs.schedule(timeout, t.length, t.unit) - promise.tryCompleteWith(callback(sexs, exc)) - promise.future - } +final case class TimedFuture[A](callback: (Scheduler, ExecutionContext) => Future[A], timeout: Option[FiniteDuration] = None) { + @inline def runNow(scheduler: Scheduler, ec: ExecutionContext): Future[A] = + timeout.fold(callback(scheduler, ec)) { t => + val promise = Promise[A] + val cancelTimeout = scheduler.schedule({ promise.tryFailure(new TimeoutException); () }, t) + promise.tryCompleteWith(callback(scheduler, ec).map(a => { cancelTimeout(); a })(ec)) + promise.future } - } +} + +object TimedFuture { - object TimedFuture { - - final def ApplicativeTimedFuture: Applicative[TimedFuture] = new Applicative[TimedFuture] { - override def pure[A](x: A) = TimedFuture((_, _) => Future.successful(x)) - override def ap[A, B](ff: TimedFuture[(A) => B])(fa: TimedFuture[A]): TimedFuture[B] = { - val newCallback = { (sexs: ScheduledExecutorService, ec: ExecutionContext) => - val ffRan = ff.runNow(sexs, ec) - val faRan = fa.runNow(sexs, ec) - faRan.flatMap(a => ffRan.map(f => f(a))(ec))(ec) - } - TimedFuture(newCallback) + final def ApplicativeTimedFuture: Applicative[TimedFuture] = new Applicative[TimedFuture] { + def pure[A](x: A): TimedFuture[A] = + TimedFuture((_, _) => Future.successful(x)) + + def ap[A, B](ff: TimedFuture[(A) => B])(fa: TimedFuture[A]): TimedFuture[B] = { + val newCallback = { (scheduler: Scheduler, ec: ExecutionContext) => + val ffRan = ff.runNow(scheduler, ec) + val faRan = fa.runNow(scheduler, ec) + faRan.flatMap(a => ffRan.map(f => f(a))(ec))(ec) } - override def toString = "Applicative[TimedFuture]" + TimedFuture(newCallback) } - implicit final def MonadTimedFuture: Monad[TimedFuture] = new Monad[TimedFuture] { - override def pure[A](x: A) = TimedFuture((_, _) => Future.successful(x)) - override def flatMap[A, B](fa: TimedFuture[A])(f: (A) => TimedFuture[B]): TimedFuture[B] = - TimedFuture[B]((sexs, ec) => fa.runNow(sexs, ec).flatMap(f(_).runNow(sexs, ec))(ec)) - override def tailRecM[A, B](a: A)(f: (A) => TimedFuture[Either[A, B]]): TimedFuture[B] = - TimedFuture[B]({ (sexs, ec) => - def loop(va: A): Future[B] = f(va).runNow(sexs, ec).flatMap { - case Left(na) => loop(na) - case Right(nb) => Future.successful(nb) - }(ec) - loop(a) - }) - override def toString = "Monad[TimedFuture]" - } + override def toString = "Applicative[TimedFuture]" } + implicit final def MonadTimedFuture: Monad[TimedFuture] = new Monad[TimedFuture] { + def pure[A](x: A): TimedFuture[A] = + TimedFuture((_, _) => Future.successful(x)) + + def flatMap[A, B](fa: TimedFuture[A])(f: (A) => TimedFuture[B]): TimedFuture[B] = + TimedFuture[B]((scheduler, ec) => fa.runNow(scheduler, ec).flatMap(f(_).runNow(scheduler, ec))(ec)) + + def tailRecM[A, B](a: A)(f: (A) => TimedFuture[Either[A, B]]): TimedFuture[B] = + TimedFuture[B]({ (scheduler, ec) => + def loop(va: A): Future[B] = f(va).runNow(scheduler, ec).flatMap { + case Left(na) => loop(na) + case Right(nb) => Future.successful(nb) + }(ec) + loop(a) + }) + + override def toString = "Monad[TimedFuture]" + } +} + trait FutureTypes { type _future[R] = TimedFuture |= R type _Future[R] = TimedFuture <= R @@ -68,7 +67,7 @@ trait FutureTypes { trait FutureCreation extends FutureTypes { - final def fromFutureWithExecutors[R :_future, A](c: (ScheduledExecutorService, ExecutionContext) => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = + final def fromFutureWithExecutors[R :_future, A](c: (Scheduler, ExecutionContext) => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = send[TimedFuture, R, A](TimedFuture(c, timeout)) final def fromFuture[R :_future, A](c: => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = @@ -93,11 +92,11 @@ trait FutureCreation extends FutureTypes { trait FutureInterpretation extends FutureTypes { - def runAsync[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = - Eff.detachA(Eff.effInto[R, Fx1[TimedFuture], A](e))(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(sexs, exc) + def runAsync[R, A](e: Eff[R, A])(implicit scheduler: Scheduler, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = + Eff.detachA(Eff.effInto[R, Fx1[TimedFuture], A](e))(TimedFuture.MonadTimedFuture, TimedFuture.ApplicativeTimedFuture).runNow(scheduler, exc) - def runSequential[R, A](e: Eff[R, A])(implicit sexs: ScheduledExecutorService, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = - Eff.detach(Eff.effInto[R, Fx1[TimedFuture], A](e)).runNow(sexs, exc) + def runSequential[R, A](e: Eff[R, A])(implicit scheduler: Scheduler, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = + Eff.detach(Eff.effInto[R, Fx1[TimedFuture], A](e)).runNow(scheduler, exc) import interpret.of @@ -108,9 +107,9 @@ trait FutureInterpretation extends FutureTypes { }) final def attempt[A](a: TimedFuture[A]): TimedFuture[Throwable Either A] = { - TimedFuture[Throwable Either A](callback = (sexs, ec) => { + TimedFuture[Throwable Either A](callback = (scheduler, ec) => { val prom = Promise[Throwable Either A]() - a.runNow(sexs, ec).onComplete { t => + a.runNow(scheduler, ec).onComplete { t => prom.success(t match { case Failure(ex) => Either.left(ex) case Success(v) => Either.right(v) @@ -121,10 +120,10 @@ trait FutureInterpretation extends FutureTypes { } final def memoize[A](key: AnyRef, cache: Cache, future: TimedFuture[A]): TimedFuture[A] = - TimedFuture { (sexs, ec) => + TimedFuture { (scheduler, ec) => val prom = Promise[A]() cache.get[A](key).fold { - prom.completeWith(future.runNow(sexs, ec).map { v => val _ = cache.put(key, v); v }(ec)) + prom.completeWith(future.runNow(scheduler, ec).map { v => val _ = cache.put(key, v); v }(ec)) } { v => prom.success(v) } prom.future } diff --git a/shared/src/main/scala/org/atnos/eff/Scheduler.scala b/shared/src/main/scala/org/atnos/eff/Scheduler.scala new file mode 100644 index 00000000..b2e90f93 --- /dev/null +++ b/shared/src/main/scala/org/atnos/eff/Scheduler.scala @@ -0,0 +1,13 @@ +package org.atnos.eff + +import scala.concurrent.duration._ + +/** + * The design of the Scheduler is taken from: + * https://github.com/functional-streams-for-scala/fs2/blob/series/1.0/core/jvm/src/main/scala/fs2/Scheduler.scala + */ +trait Scheduler { + + def schedule(timedout: =>Unit, duration: FiniteDuration): () => Unit + +} diff --git a/shared/src/main/scala/org/atnos/eff/syntax/future.scala b/shared/src/main/scala/org/atnos/eff/syntax/future.scala index f9a928d4..b556019a 100644 --- a/shared/src/main/scala/org/atnos/eff/syntax/future.scala +++ b/shared/src/main/scala/org/atnos/eff/syntax/future.scala @@ -1,7 +1,5 @@ package org.atnos.eff.syntax -import java.util.concurrent.ScheduledExecutorService - import org.atnos.eff._ import scala.concurrent.{ExecutionContext, Future} @@ -22,10 +20,10 @@ final class FutureOps[R, A](val e: Eff[R, A]) extends AnyVal { def futureMemo(key: AnyRef, cache: Cache)(implicit future: TimedFuture /= R): Eff[R, A] = FutureInterpretation.futureMemo(key, cache, e) - def runAsync(implicit sexs: ScheduledExecutorService, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = + def runAsync(implicit scheduler: Scheduler, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = FutureInterpretation.runAsync(e) - def runSequential(implicit sexs: ScheduledExecutorService, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = + def runSequential(implicit scheduler: Scheduler, exc: ExecutionContext, m: Member.Aux[TimedFuture, R, NoFx]): Future[A] = FutureInterpretation.runSequential(e) } diff --git a/twitter/src/main/scala/org/atnos/eff/addon/twitter/TwitterFutureEffect.scala b/twitter/src/main/scala/org/atnos/eff/addon/twitter/TwitterFutureEffect.scala index a062b286..cca279ff 100644 --- a/twitter/src/main/scala/org/atnos/eff/addon/twitter/TwitterFutureEffect.scala +++ b/twitter/src/main/scala/org/atnos/eff/addon/twitter/TwitterFutureEffect.scala @@ -1,7 +1,5 @@ package org.atnos.eff.addon.twitter -import java.util.concurrent.ScheduledExecutorService - import cats._ import cats.implicits._ import org.atnos.eff.all._ @@ -13,24 +11,19 @@ import com.twitter.util._ object TwitterFutureCreation extends TwitterFutureCreation -final case class TwitterTimedFuture[A](callback: (FuturePool, ScheduledExecutorService) => Future[A], timeout: Option[FiniteDuration] = None) { - @inline def runNow(pool: FuturePool, sexs: ScheduledExecutorService): Future[A] = { - timeout.fold { - callback(pool, sexs) - } { t => +final case class TwitterTimedFuture[A](callback: (FuturePool, Scheduler) => Future[A], timeout: Option[FiniteDuration] = None) { + @inline + def runNow(pool: FuturePool, scheduler: Scheduler): Future[A] = + timeout.fold(callback(pool, scheduler)) { t => val promise = Promise[A] - val timeout = new Runnable { - override def run(): Unit = { - val _ = promise.updateIfEmpty(Throw(new TimeoutException)) - } - } - sexs.schedule(timeout, t.length, t.unit) - callback(pool, sexs) - .onFailure { e => val _ = promise.updateIfEmpty(Throw(e)) } - .onSuccess { a => val _ = promise.updateIfEmpty(Return(a)) } + val cancelTimeout = scheduler.schedule({ promise.updateIfEmpty(Throw(new TimeoutException)); () }, t) + callback(pool, scheduler) + .onFailure { e => cancelTimeout(); val _ = promise.updateIfEmpty(Throw(e)) } + .onSuccess { a => cancelTimeout(); val _ = promise.updateIfEmpty(Return(a)) } promise + } - } + } object TwitterTimedFuture { @@ -40,9 +33,9 @@ object TwitterTimedFuture { TwitterTimedFuture((_, _) => Future.value(x)) def ap[A, B](ff: TwitterTimedFuture[(A) => B])(fa: TwitterTimedFuture[A]): TwitterTimedFuture[B] = { - val newCallback = { (pool: FuturePool, sexs: ScheduledExecutorService) => - val ffRan = ff.runNow(pool, sexs) - val faRan = fa.runNow(pool, sexs) + val newCallback = { (pool: FuturePool, scheduler: Scheduler) => + val ffRan = ff.runNow(pool, scheduler) + val faRan = fa.runNow(pool, scheduler) ffRan.joinWith(faRan)(_(_)) } TwitterTimedFuture(newCallback) @@ -56,11 +49,11 @@ object TwitterTimedFuture { TwitterTimedFuture((_, _) => Future.value(x)) def flatMap[A, B](fa: TwitterTimedFuture[A])(f: (A) => TwitterTimedFuture[B]): TwitterTimedFuture[B] = - TwitterTimedFuture[B]((pool, sexs) => fa.runNow(pool, sexs).flatMap(f(_).runNow(pool, sexs))) + TwitterTimedFuture[B]((pool, scheduler) => fa.runNow(pool, scheduler).flatMap(f(_).runNow(pool, scheduler))) def tailRecM[A, B](a: A)(f: (A) => TwitterTimedFuture[Either[A, B]]): TwitterTimedFuture[B] = - TwitterTimedFuture[B]({ (pool, sexs) => - def loop(va: A): Future[B] = f(va).runNow(pool, sexs).flatMap { + TwitterTimedFuture[B]({ (pool, scheduler) => + def loop(va: A): Future[B] = f(va).runNow(pool, scheduler).flatMap { case Left(na) => loop(na) case Right(nb) => Future.value(nb) } @@ -73,7 +66,7 @@ object TwitterTimedFuture { implicit val twitterFutureSequenceCached: SequenceCached[TwitterTimedFuture] = new SequenceCached[TwitterTimedFuture] { def apply[X](cache: Cache, key: AnyRef, sequenceKey: Int, tx: =>TwitterTimedFuture[X]): TwitterTimedFuture[X] = - TwitterTimedFuture((pool, sexs) => cache.memo((key, sequenceKey), tx.runNow(pool, sexs))) + TwitterTimedFuture((pool, scheduler) => cache.memo((key, sequenceKey), tx.runNow(pool, scheduler))) } } @@ -85,7 +78,7 @@ trait TwitterFutureTypes { trait TwitterFutureCreation extends TwitterFutureTypes { - final def fromFutureWithExecutors[R :_future, A](c: (FuturePool, ScheduledExecutorService) => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = + final def fromFutureWithExecutors[R :_future, A](c: (FuturePool, Scheduler) => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = send[TwitterTimedFuture, R, A](TwitterTimedFuture(c, timeout)) final def fromFuture[R :_future, A](c: => Future[A], timeout: Option[FiniteDuration] = None): Eff[R, A] = @@ -107,11 +100,11 @@ trait TwitterFutureCreation extends TwitterFutureTypes { trait TwitterFutureInterpretation extends TwitterFutureTypes { - def runAsync[R, A](e: Eff[R, A])(implicit pool: FuturePool, sexs: ScheduledExecutorService, m: Member.Aux[TwitterTimedFuture, R, NoFx]): Future[A] = - Eff.detachA(e)(TwitterTimedFuture.MonadTwitterTimedFuture, TwitterTimedFuture.ApplicativeTwitterTimedFuture, m).runNow(pool, sexs) + def runAsync[R, A](e: Eff[R, A])(implicit pool: FuturePool, scheduler: Scheduler, m: Member.Aux[TwitterTimedFuture, R, NoFx]): Future[A] = + Eff.detachA(e)(TwitterTimedFuture.MonadTwitterTimedFuture, TwitterTimedFuture.ApplicativeTwitterTimedFuture, m).runNow(pool, scheduler) - def runSequential[R, A](e: Eff[R, A])(implicit pool: FuturePool, sexs: ScheduledExecutorService, m: Member.Aux[TwitterTimedFuture, R, NoFx]): Future[A] = - Eff.detach(e)(Monad[TwitterTimedFuture], m).runNow(pool, sexs) + def runSequential[R, A](e: Eff[R, A])(implicit pool: FuturePool, scheduler: Scheduler, m: Member.Aux[TwitterTimedFuture, R, NoFx]): Future[A] = + Eff.detach(e)(Monad[TwitterTimedFuture], m).runNow(pool, scheduler) import interpret.of @@ -132,11 +125,11 @@ trait TwitterFutureInterpretation extends TwitterFutureTypes { /** memoize future result using a cache */ final def memoize[A](key: AnyRef, cache: Cache, future: TwitterTimedFuture[A]): TwitterTimedFuture[A] = - TwitterTimedFuture { (pool, sexs) => + TwitterTimedFuture { (pool, scheduler) => val promise = Promise[A]() cache.get[A](key).fold { - future.runNow(pool, sexs).map { v => val _ = cache.put(key, v); v }.proxyTo(promise) + future.runNow(pool, scheduler).map { v => val _ = cache.put(key, v); v }.proxyTo(promise) } { v => promise.setValue(v) } promise diff --git a/twitter/src/main/scala/org/atnos/eff/syntax/addon/twitter/future.scala b/twitter/src/main/scala/org/atnos/eff/syntax/addon/twitter/future.scala index abf6cb95..d372f354 100644 --- a/twitter/src/main/scala/org/atnos/eff/syntax/addon/twitter/future.scala +++ b/twitter/src/main/scala/org/atnos/eff/syntax/addon/twitter/future.scala @@ -1,7 +1,5 @@ package org.atnos.eff.syntax.addon.twitter -import java.util.concurrent.ScheduledExecutorService - import com.twitter.util.{Future, FuturePool} import org.atnos.eff.addon.twitter._ import org.atnos.eff.{Fx, _} @@ -26,9 +24,9 @@ final class TwitterFutureOps[R, A](val e: Eff[R, A]) extends AnyVal { def twitterFutureMemo(key: AnyRef, cache: Cache)(implicit future: TwitterTimedFuture /= R): Eff[R, A] = TwitterFutureInterpretation.futureMemo(key, cache, e) - def runAsync(implicit pool: FuturePool, sexs: ScheduledExecutorService, m: Member.Aux[TwitterTimedFuture, R, NoFx]): Future[A] = + def runAsync(implicit pool: FuturePool, scheduler: Scheduler, m: Member.Aux[TwitterTimedFuture, R, NoFx]): Future[A] = TwitterFutureInterpretation.runAsync(e) - def runSequential(implicit pool: FuturePool, sexs: ScheduledExecutorService, m: Member.Aux[TwitterTimedFuture, R, NoFx]): Future[A] = + def runSequential(implicit pool: FuturePool, scheduler: Scheduler, m: Member.Aux[TwitterTimedFuture, R, NoFx]): Future[A] = TwitterFutureInterpretation.runSequential(e) } diff --git a/twitter/src/test/scala/org/atnos/eff/addon/twitter/TwitterFutureEffectSpec.scala b/twitter/src/test/scala/org/atnos/eff/addon/twitter/TwitterFutureEffectSpec.scala index 0b2a3260..894a3a9c 100644 --- a/twitter/src/test/scala/org/atnos/eff/addon/twitter/TwitterFutureEffectSpec.scala +++ b/twitter/src/test/scala/org/atnos/eff/addon/twitter/TwitterFutureEffectSpec.scala @@ -40,7 +40,7 @@ class TwitterFutureEffectSpec(implicit ee: ExecutionEnv) extends Specification w type S = Fx.fx2[TwitterTimedFuture, Option] implicit val pool = FuturePool(ee.es) - implicit val ses = ee.ses + implicit val scheduler = ExecutorServices.schedulerFromScheduledExecutorService(ee.ses) def e1 = { def action[R :_future :_option]: Eff[R, Int] = for { diff --git a/version.sbt b/version.sbt index 3909461b..3271bb8f 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "3.0.4" +version in ThisBuild := "3.1.0"