diff options
author | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-16 19:03:18 +0100 |
---|---|---|
committer | aleksandar <aleksandar@lampmac14.epfl.ch> | 2012-01-16 19:03:18 +0100 |
commit | 51a930f8595049babf5cf625e5f010c60bedc53b (patch) | |
tree | 97648a3795dee3ac657515e6087244179a4d0d42 | |
parent | 031eea9cb2b7ff00f70f9adb8d8da371bd013bfe (diff) | |
download | scala-51a930f8595049babf5cf625e5f010c60bedc53b.tar.gz scala-51a930f8595049babf5cf625e5f010c60bedc53b.tar.bz2 scala-51a930f8595049babf5cf625e5f010c60bedc53b.zip |
Refactor concurrent package and execution contexts.
-rw-r--r-- | src/library/scala/concurrent/ExecutionContext.scala | 37 | ||||
-rw-r--r-- | src/library/scala/concurrent/Future.scala | 45 | ||||
-rw-r--r-- | src/library/scala/concurrent/akka/ExecutionContextImpl.scala | 24 | ||||
-rw-r--r-- | src/library/scala/concurrent/akka/Future.scala | 2 | ||||
-rw-r--r-- | src/library/scala/concurrent/akka/Promise.scala | 2 | ||||
-rw-r--r-- | src/library/scala/concurrent/default/TaskImpl.scala | 23 | ||||
-rw-r--r-- | src/library/scala/concurrent/package.scala | 23 |
7 files changed, 101 insertions, 55 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 2650022e1e..303489297f 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -13,6 +13,7 @@ package scala.concurrent import java.util.concurrent.{ Executors, Future => JFuture, Callable } import scala.util.{ Duration, Timeout } import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread } +import scala.collection.generic.CanBuildFrom @@ -32,7 +33,9 @@ trait ExecutionContext { def blocking[T](atMost: Duration)(body: =>T): T - def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T + + def futureUtilities: FutureUtilities = FutureUtilitiesImpl } @@ -40,3 +43,35 @@ trait ExecutionContext { sealed trait CanAwait +trait FutureUtilities { + + def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { + val builder = cbf(futures) + val p: Promise[Coll[T]] = promise[Coll[T]] + + if (futures.size == 1) futures.head onComplete { + case Left(t) => p failure t + case Right(v) => builder += v + p success builder.result + } else { + val restFutures = all(futures.tail) + futures.head onComplete { + case Left(t) => p failure t + case Right(v) => builder += v + restFutures onComplete { + case Left(t) => p failure t + case Right(vs) => for (v <- vs) builder += v + p success builder.result + } + } + } + + p.future + } + +} + + +object FutureUtilitiesImpl extends FutureUtilities { +} + diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index e6edaea87a..6b358e1e09 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -365,11 +365,11 @@ self => * {{{ * val f = future { sys.error("failed") } * val g = future { 5 } - * val h = f or g + * val h = f any g * await(0) h // evaluates to either 5 or throws a runtime exception * }}} */ - def or[U >: T](that: Future[U]): Future[U] = { + def any[U >: T](that: Future[U]): Future[U] = { val p = newPromise[U] val completePromise: PartialFunction[Either[Throwable, T], _] = { @@ -385,35 +385,24 @@ self => } +/** TODO some docs + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. + */ object Future { - /* - // TODO make more modular by encoding this within the execution context - def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { - val builder = cbf(futures) - val p: Promise[Coll[T]] = executor.promise[Coll[T]] - - if (futures.size == 1) futures.head onComplete { - case Left(t) => p failure t - case Right(v) => builder += v - p success builder.result - } else { - val restFutures = all(futures.tail) - futures.head onComplete { - case Left(t) => p failure t - case Right(v) => builder += v - restFutures onComplete { - case Left(t) => p failure t - case Right(vs) => for (v <- vs) builder += v - p success builder.result - } - } - } - - p.future - } - */ + // TODO make more modular by encoding all other helper methods within the execution context + /** + */ + def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] = + ec.futureUtilities.all[T, Coll](futures) + // move this to future companion object @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body) } + + + + diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala index 1cff7211f3..922d77189c 100644 --- a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala @@ -11,15 +11,22 @@ package scala.concurrent.akka import java.util.concurrent.{Callable, ExecutorService} -import scala.concurrent.{ExecutionContext, resolver, Awaitable} +import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} import scala.util.Duration import scala.collection.mutable.Stack class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext { + import ExecutionContextImpl._ - def execute(runnable: Runnable): Unit = executorService execute runnable + def execute(runnable: Runnable): Unit = executorService match { + // case fj: ForkJoinPool => + // // TODO fork if more applicable + // executorService execute runnable + case _ => + executorService execute runnable + } def execute[U](body: () => U): Unit = execute(new Runnable { def run() = body() @@ -46,7 +53,7 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) - def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T = { + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { currentExecutionContext.get match { case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case case x => x.blockingCall(awaitable) // inside an execution context thread @@ -109,3 +116,14 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo } } + + +object ExecutionContextImpl { + + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContextImpl] = new ThreadLocal[ExecutionContextImpl] { + override protected def initialValue = null + } + +} + + diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala index 5036768d36..be1a9ec2ae 100644 --- a/src/library/scala/concurrent/akka/Future.scala +++ b/src/library/scala/concurrent/akka/Future.scala @@ -17,7 +17,7 @@ import scala.concurrent.{Awaitable, ExecutionContext} trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { - implicit def executor: ExecutionContext + implicit def executor: ExecutionContextImpl /** For use only within a Future.flow block or another compatible Delimited Continuations reset block. * diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala index e7b6b50aeb..d688d3d850 100644 --- a/src/library/scala/concurrent/akka/Promise.scala +++ b/src/library/scala/concurrent/akka/Promise.scala @@ -118,7 +118,7 @@ object Promise { value.isDefined } - executor.blockingCall(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost)))) + executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0)) } private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index 4bddb740ef..59037cc48b 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -215,6 +215,8 @@ case class Failure[T](throwable: Throwable) extends State[T] private[concurrent] final class ExecutionContextImpl extends ExecutionContext { + import ExecutionContextImpl._ + val pool = { val p = new ForkJoinPool p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler { @@ -259,10 +261,11 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) - def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T = { + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { currentExecutionContext.get match { case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(awaitable) // inside an execution context thread + case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor + case x => x.blocking(awaitable, atMost) } } @@ -286,3 +289,19 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { } } + + +object ExecutionContextImpl { + + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { + override protected def initialValue = null + } + +} + + + + + + + diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index d40eb4e2a1..23f26dd3b5 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -27,17 +27,13 @@ package object concurrent { /** A global execution environment for executing lightweight tasks. */ lazy val executionContext = - new default.ExecutionContextImpl + new akka.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool()) /** A global service for scheduling tasks for execution. */ lazy val scheduler = new default.SchedulerImpl - private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { - override protected def initialValue = null - } - val handledFutureException: PartialFunction[Throwable, Throwable] = { case t: Throwable if isFutureThrowable(t) => t } @@ -90,7 +86,7 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext = executionContext): T = + def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext): T = executionContext.blocking(atMost)(body) /** Blocks on an awaitable object. @@ -102,19 +98,8 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = - executionContext.blocking(atMost)(awaitable) - - /* - def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) - - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { - currentExecutionContext.get match { - case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(awaitable) // inside an execution context thread - } - } - */ + def blocking[T](awaitable: Awaitable[T], atMost: Duration)(implicit execCtx: ExecutionContext = executionContext): T = + executionContext.blocking(awaitable, atMost) object await { def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = { |