summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-16 19:03:18 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2012-01-16 19:03:18 +0100
commit51a930f8595049babf5cf625e5f010c60bedc53b (patch)
tree97648a3795dee3ac657515e6087244179a4d0d42
parent031eea9cb2b7ff00f70f9adb8d8da371bd013bfe (diff)
downloadscala-51a930f8595049babf5cf625e5f010c60bedc53b.tar.gz
scala-51a930f8595049babf5cf625e5f010c60bedc53b.tar.bz2
scala-51a930f8595049babf5cf625e5f010c60bedc53b.zip
Refactor concurrent package and execution contexts.
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala37
-rw-r--r--src/library/scala/concurrent/Future.scala45
-rw-r--r--src/library/scala/concurrent/akka/ExecutionContextImpl.scala24
-rw-r--r--src/library/scala/concurrent/akka/Future.scala2
-rw-r--r--src/library/scala/concurrent/akka/Promise.scala2
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala23
-rw-r--r--src/library/scala/concurrent/package.scala23
7 files changed, 101 insertions, 55 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 2650022e1e..303489297f 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -13,6 +13,7 @@ package scala.concurrent
import java.util.concurrent.{ Executors, Future => JFuture, Callable }
import scala.util.{ Duration, Timeout }
import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
+import scala.collection.generic.CanBuildFrom
@@ -32,7 +33,9 @@ trait ExecutionContext {
def blocking[T](atMost: Duration)(body: =>T): T
- def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
+
+ def futureUtilities: FutureUtilities = FutureUtilitiesImpl
}
@@ -40,3 +43,35 @@ trait ExecutionContext {
sealed trait CanAwait
+trait FutureUtilities {
+
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
+ val builder = cbf(futures)
+ val p: Promise[Coll[T]] = promise[Coll[T]]
+
+ if (futures.size == 1) futures.head onComplete {
+ case Left(t) => p failure t
+ case Right(v) => builder += v
+ p success builder.result
+ } else {
+ val restFutures = all(futures.tail)
+ futures.head onComplete {
+ case Left(t) => p failure t
+ case Right(v) => builder += v
+ restFutures onComplete {
+ case Left(t) => p failure t
+ case Right(vs) => for (v <- vs) builder += v
+ p success builder.result
+ }
+ }
+ }
+
+ p.future
+ }
+
+}
+
+
+object FutureUtilitiesImpl extends FutureUtilities {
+}
+
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index e6edaea87a..6b358e1e09 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -365,11 +365,11 @@ self =>
* {{{
* val f = future { sys.error("failed") }
* val g = future { 5 }
- * val h = f or g
+ * val h = f any g
* await(0) h // evaluates to either 5 or throws a runtime exception
* }}}
*/
- def or[U >: T](that: Future[U]): Future[U] = {
+ def any[U >: T](that: Future[U]): Future[U] = {
val p = newPromise[U]
val completePromise: PartialFunction[Either[Throwable, T], _] = {
@@ -385,35 +385,24 @@ self =>
}
+/** TODO some docs
+ *
+ * @define nonDeterministic
+ * Note: using this method yields nondeterministic dataflow programs.
+ */
object Future {
- /*
- // TODO make more modular by encoding this within the execution context
- def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
- val builder = cbf(futures)
- val p: Promise[Coll[T]] = executor.promise[Coll[T]]
-
- if (futures.size == 1) futures.head onComplete {
- case Left(t) => p failure t
- case Right(v) => builder += v
- p success builder.result
- } else {
- val restFutures = all(futures.tail)
- futures.head onComplete {
- case Left(t) => p failure t
- case Right(v) => builder += v
- restFutures onComplete {
- case Left(t) => p failure t
- case Right(vs) => for (v <- vs) builder += v
- p success builder.result
- }
- }
- }
-
- p.future
- }
- */
+ // TODO make more modular by encoding all other helper methods within the execution context
+ /**
+ */
+ def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] =
+ ec.futureUtilities.all[T, Coll](futures)
+ // move this to future companion object
@inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body)
}
+
+
+
+
diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
index 1cff7211f3..922d77189c 100644
--- a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
@@ -11,15 +11,22 @@ package scala.concurrent.akka
import java.util.concurrent.{Callable, ExecutorService}
-import scala.concurrent.{ExecutionContext, resolver, Awaitable}
+import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable}
import scala.util.Duration
import scala.collection.mutable.Stack
class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext {
+ import ExecutionContextImpl._
- def execute(runnable: Runnable): Unit = executorService execute runnable
+ def execute(runnable: Runnable): Unit = executorService match {
+ // case fj: ForkJoinPool =>
+ // // TODO fork if more applicable
+ // executorService execute runnable
+ case _ =>
+ executorService execute runnable
+ }
def execute[U](body: () => U): Unit = execute(new Runnable {
def run() = body()
@@ -46,7 +53,7 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo
def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
- def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T = {
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
currentExecutionContext.get match {
case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
case x => x.blockingCall(awaitable) // inside an execution context thread
@@ -109,3 +116,14 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo
}
}
+
+
+object ExecutionContextImpl {
+
+ private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContextImpl] = new ThreadLocal[ExecutionContextImpl] {
+ override protected def initialValue = null
+ }
+
+}
+
+
diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala
index 5036768d36..be1a9ec2ae 100644
--- a/src/library/scala/concurrent/akka/Future.scala
+++ b/src/library/scala/concurrent/akka/Future.scala
@@ -17,7 +17,7 @@ import scala.concurrent.{Awaitable, ExecutionContext}
trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
- implicit def executor: ExecutionContext
+ implicit def executor: ExecutionContextImpl
/** For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala
index e7b6b50aeb..d688d3d850 100644
--- a/src/library/scala/concurrent/akka/Promise.scala
+++ b/src/library/scala/concurrent/akka/Promise.scala
@@ -118,7 +118,7 @@ object Promise {
value.isDefined
}
- executor.blockingCall(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))))
+ executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0))
}
private def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
index 4bddb740ef..59037cc48b 100644
--- a/src/library/scala/concurrent/default/TaskImpl.scala
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -215,6 +215,8 @@ case class Failure[T](throwable: Throwable) extends State[T]
private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
+ import ExecutionContextImpl._
+
val pool = {
val p = new ForkJoinPool
p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
@@ -259,10 +261,11 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
- def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T = {
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
currentExecutionContext.get match {
case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(awaitable) // inside an execution context thread
+ case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor
+ case x => x.blocking(awaitable, atMost)
}
}
@@ -286,3 +289,19 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
}
}
+
+
+object ExecutionContextImpl {
+
+ private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
+ override protected def initialValue = null
+ }
+
+}
+
+
+
+
+
+
+
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index d40eb4e2a1..23f26dd3b5 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -27,17 +27,13 @@ package object concurrent {
/** A global execution environment for executing lightweight tasks.
*/
lazy val executionContext =
- new default.ExecutionContextImpl
+ new akka.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool())
/** A global service for scheduling tasks for execution.
*/
lazy val scheduler =
new default.SchedulerImpl
- private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
- override protected def initialValue = null
- }
-
val handledFutureException: PartialFunction[Throwable, Throwable] = {
case t: Throwable if isFutureThrowable(t) => t
}
@@ -90,7 +86,7 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext = executionContext): T =
+ def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext): T =
executionContext.blocking(atMost)(body)
/** Blocks on an awaitable object.
@@ -102,19 +98,8 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T =
- executionContext.blocking(atMost)(awaitable)
-
- /*
- def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
-
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(awaitable) // inside an execution context thread
- }
- }
- */
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration)(implicit execCtx: ExecutionContext = executionContext): T =
+ executionContext.blocking(awaitable, atMost)
object await {
def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = {