diff options
author | Aleksandar Prokopec <axel22@gmail.com> | 2012-03-30 14:52:10 +0200 |
---|---|---|
committer | Aleksandar Prokopec <axel22@gmail.com> | 2012-03-30 14:52:10 +0200 |
commit | 44551b2e9fb3ba76278dc6c4b56195cc65a1dff8 (patch) | |
tree | c2b7c40c83f2bac1a61b59bcfeea5b5d4e368142 /src | |
parent | 7c84f1fc32e99a309f9b47b7359764aecbfc21e6 (diff) | |
download | scala-44551b2e9fb3ba76278dc6c4b56195cc65a1dff8.tar.gz scala-44551b2e9fb3ba76278dc6c4b56195cc65a1dff8.tar.bz2 scala-44551b2e9fb3ba76278dc6c4b56195cc65a1dff8.zip |
Remove blocking from execution contexts.
Diffstat (limited to 'src')
4 files changed, 54 insertions, 26 deletions
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala index ba98757906..f4744a8757 100644 --- a/src/library/scala/concurrent/ConcurrentPackageObject.scala +++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala @@ -10,8 +10,8 @@ package scala.concurrent -import java.util.concurrent.{ Executors, ExecutorService } -import scala.concurrent.forkjoin.ForkJoinPool +import java.util.concurrent.{ Executors, ExecutorService, ThreadFactory } +import scala.concurrent.forkjoin.{ ForkJoinPool, ForkJoinWorkerThread } import scala.util.{ Try, Success, Failure } import scala.concurrent.util.Duration import ConcurrentPackageObject._ @@ -24,14 +24,9 @@ abstract class ConcurrentPackageObject { /** A global execution environment for executing lightweight tasks. */ lazy val executionContext = - new impl.ExecutionContextImpl(getExecutorService) - - private[concurrent] def getExecutorService: AnyRef = - if (scala.util.Properties.isJavaAtLeast("1.6")) { - val vendor = scala.util.Properties.javaVmVendor - if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) new ForkJoinPool - else Executors.newCachedThreadPool() - } else Executors.newCachedThreadPool() + new impl.ExecutionContextImpl() + + private val currentExecutionContext = new ThreadLocal[ExecutionContext] val handledFutureException: PartialFunction[Throwable, Throwable] = { case t: Throwable if isFutureThrowable(t) => t @@ -58,10 +53,10 @@ abstract class ConcurrentPackageObject { /* concurrency constructs */ - def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] = + def future[T](body: =>T)(implicit execctx: ExecutionContext = executionContext): Future[T] = Future[T](body) - def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] = + def promise[T]()(implicit execctx: ExecutionContext = executionContext): Promise[T] = Promise[T]() /** Wraps a block of code into an awaitable object. */ @@ -82,8 +77,8 @@ abstract class ConcurrentPackageObject { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](body: =>T)(implicit execCtx: ExecutionContext): T = - executionContext.blocking(body) + def blocking[T](body: =>T): T = + blocking(body2awaitable(body), Duration.fromNanos(0)) /** Blocks on an awaitable object. * @@ -94,8 +89,11 @@ abstract class ConcurrentPackageObject { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](awaitable: Awaitable[T], atMost: Duration)(implicit execCtx: ExecutionContext = executionContext): T = - executionContext.blocking(awaitable, atMost) + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = + currentExecutionContext.get match { + case null => Await.result(awaitable, atMost) + case ec => ec.internalBlockingCall(awaitable, atMost) + } @inline implicit final def int2durationops(x: Int): DurationOps = new DurationOps(x) } diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index a206a2d4ea..f639f76dc9 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -26,10 +26,8 @@ trait ExecutionContext { def execute[U](body: () => U): Unit - def blocking[T](body: =>T): T - - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T - + def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T + def reportFailure(t: Throwable): Unit /* implementations follow */ diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 5dc440f42b..2cfd6f22cd 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -10,7 +10,7 @@ package scala.concurrent.impl -import java.util.concurrent.{Callable, ExecutorService} +import java.util.concurrent.{Callable, ExecutorService, Executors, ThreadFactory} import scala.concurrent.forkjoin._ import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} import scala.util.{ Try, Success, Failure } @@ -18,8 +18,42 @@ import scala.concurrent.util.{ Duration } -private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends ExecutionContext { +private[scala] class ExecutionContextImpl() extends ExecutionContext { import ExecutionContextImpl._ + + val executorService: AnyRef = getExecutorService + + // to ensure that the current execution context thread local is properly set + private def executorsThreadFactory = new ThreadFactory { + def newThread(r: Runnable) = new Thread(new Runnable { + override def run() { + currentExecutionContext.set(ExecutionContextImpl.this) + r.run() + } + }) + } + + // to ensure that the current execution context thread local is properly set + private def forkJoinPoolThreadFactory = new ForkJoinPool.ForkJoinWorkerThreadFactory { + def newThread(fjp: ForkJoinPool) = new ForkJoinWorkerThread(fjp) { + override def onStart() { + currentExecutionContext.set(ExecutionContextImpl.this) + } + } + } + + private def getExecutorService: AnyRef = + if (scala.util.Properties.isJavaAtLeast("1.6")) { + val vendor = scala.util.Properties.javaVmVendor + if ((vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")) + new ForkJoinPool( + Runtime.getRuntime.availableProcessors(), + forkJoinPoolThreadFactory, + null, + false) + else + Executors.newCachedThreadPool(executorsThreadFactory) + } else Executors.newCachedThreadPool(executorsThreadFactory) def execute(runnable: Runnable): Unit = executorService match { case fj: ForkJoinPool => @@ -37,9 +71,7 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E def run() = body() }) - def blocking[T](body: =>T): T = blocking(body2awaitable(body), Duration.fromNanos(0)) - - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { + def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { Future.releaseStack(this) awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index c79b0d02cc..f05e306088 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -127,7 +127,7 @@ object Promise { value.isDefined } - executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) + blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost) } def ready(atMost: Duration)(implicit permit: CanAwait): this.type = |