summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <axel22@gmail.com>2012-03-30 14:52:10 +0200
committerAleksandar Prokopec <axel22@gmail.com>2012-03-30 14:52:10 +0200
commit44551b2e9fb3ba76278dc6c4b56195cc65a1dff8 (patch)
treec2b7c40c83f2bac1a61b59bcfeea5b5d4e368142
parent7c84f1fc32e99a309f9b47b7359764aecbfc21e6 (diff)
downloadscala-44551b2e9fb3ba76278dc6c4b56195cc65a1dff8.tar.gz
scala-44551b2e9fb3ba76278dc6c4b56195cc65a1dff8.tar.bz2
scala-44551b2e9fb3ba76278dc6c4b56195cc65a1dff8.zip
Remove blocking from execution contexts.
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala30
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala6
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala42
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala2
4 files changed, 54 insertions, 26 deletions
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index ba98757906..f4744a8757 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -10,8 +10,8 @@ package scala.concurrent
-import java.util.concurrent.{ Executors, ExecutorService }
-import scala.concurrent.forkjoin.ForkJoinPool
+import java.util.concurrent.{ Executors, ExecutorService, ThreadFactory }
+import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinWorkerThread }
import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.Duration
import ConcurrentPackageObject._
@@ -24,14 +24,9 @@ abstract class ConcurrentPackageObject {
/** A global execution environment for executing lightweight tasks.
*/
lazy val executionContext =
- new impl.ExecutionContextImpl(getExecutorService)
-
- private[concurrent] def getExecutorService: AnyRef =
- if (scala.util.Properties.isJavaAtLeast("1.6")) {
- val vendor = scala.util.Properties.javaVmVendor
- if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinPool
- else Executors.newCachedThreadPool()
- } else Executors.newCachedThreadPool()
+ new impl.ExecutionContextImpl()
+
+ private val currentExecutionContext = new ThreadLocal[ExecutionContext]
val handledFutureException: PartialFunction[Throwable, Throwable] = {
case t: Throwable if isFutureThrowable(t) => t
@@ -58,10 +53,10 @@ abstract class ConcurrentPackageObject {
/* concurrency constructs */
- def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] =
+ def future[T](body: =>T)(implicit execctx: ExecutionContext = executionContext): Future[T] =
Future[T](body)
- def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] =
+ def promise[T]()(implicit execctx: ExecutionContext = executionContext): Promise[T] =
Promise[T]()
/** Wraps a block of code into an awaitable object. */
@@ -82,8 +77,8 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](body: =>T)(implicit execCtx: ExecutionContext): T =
- executionContext.blocking(body)
+ def blocking[T](body: =>T): T =
+ blocking(body2awaitable(body), Duration.fromNanos(0))
/** Blocks on an awaitable object.
*
@@ -94,8 +89,11 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](awaitable: Awaitable[T], atMost: Duration)(implicit execCtx: ExecutionContext = executionContext): T =
- executionContext.blocking(awaitable, atMost)
+ def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
+ currentExecutionContext.get match {
+ case null => Await.result(awaitable, atMost)
+ case ec => ec.internalBlockingCall(awaitable, atMost)
+ }
@inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x)
}
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index a206a2d4ea..f639f76dc9 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -26,10 +26,8 @@ trait ExecutionContext {
def execute[U](body: () => U): Unit
- def blocking[T](body: =>T): T
-
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
-
+ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
+
def reportFailure(t: Throwable): Unit
/* implementations follow */
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 5dc440f42b..2cfd6f22cd 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -10,7 +10,7 @@ package scala.concurrent.impl
-import java.util.concurrent.{Callable, ExecutorService}
+import java.util.concurrent.{Callable, ExecutorService, Executors, ThreadFactory}
import scala.concurrent.forkjoin._
import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable}
import scala.util.{ Try, Success, Failure }
@@ -18,8 +18,42 @@ import scala.concurrent.util.{ Duration }
-private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends ExecutionContext {
+private[scala] class ExecutionContextImpl() extends ExecutionContext {
import ExecutionContextImpl._
+
+ val executorService: AnyRef = getExecutorService
+
+ // to ensure that the current execution context thread local is properly set
+ private def executorsThreadFactory = new ThreadFactory {
+ def newThread(r: Runnable) = new Thread(new Runnable {
+ override def run() {
+ currentExecutionContext.set(ExecutionContextImpl.this)
+ r.run()
+ }
+ })
+ }
+
+ // to ensure that the current execution context thread local is properly set
+ private def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory {
+ def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) {
+ override def onStart() {
+ currentExecutionContext.set(ExecutionContextImpl.this)
+ }
+ }
+ }
+
+ private def getExecutorService: AnyRef =
+ if (scala.util.Properties.isJavaAtLeast("1.6")) {
+ val vendor = scala.util.Properties.javaVmVendor
+ if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple"))
+ new ForkJoinPool(
+ Runtime.getRuntime.availableProcessors(),
+ forkJoinPoolThreadFactory,
+ null,
+ false)
+ else
+ Executors.newCachedThreadPool(executorsThreadFactory)
+ } else Executors.newCachedThreadPool(executorsThreadFactory)
def execute(runnable: Runnable): Unit = executorService match {
case fj: ForkJoinPool =>
@@ -37,9 +71,7 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E
def run() = body()
})
- def blocking[T](body: =>T): T = blocking(body2awaitable(body), Duration.fromNanos(0))
-
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
Future.releaseStack(this)
awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index c79b0d02cc..f05e306088 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -127,7 +127,7 @@ object Promise {
value.isDefined
}
- executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost)
+ blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =