diff options
Diffstat (limited to 'src/library/scala/concurrent/impl/ExecutionContextImpl.scala')
-rw-r--r-- | src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 57 |
1 files changed, 30 insertions, 27 deletions
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 0aa6b37ffc..479720287c 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -30,7 +30,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: } // Implement BlockContext on FJP threads - class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { + class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory { def wire[T <: Thread](thread: T): T = { thread.setDaemon(daemonic) thread.setUncaughtExceptionHandler(uncaughtExceptionHandler) @@ -57,22 +57,22 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: def createExecutorService: ExecutorService = { - def getInt(name: String, f: String => Int): Int = - try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors } - def range(floor: Int, desired: Int, ceiling: Int): Int = - if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling) + def getInt(name: String, default: String) = (try System.getProperty(name, default) catch { + case e: SecurityException => default + }) match { + case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt + case other => other.toInt + } + + def range(floor: Int, desired: Int, ceiling: Int) = scala.math.min(scala.math.max(floor, desired), ceiling) val desiredParallelism = range( - getInt("scala.concurrent.context.minThreads", _.toInt), - getInt("scala.concurrent.context.numThreads", { - case null | "" => Runtime.getRuntime.availableProcessors - case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt - case other => other.toInt - }), - getInt("scala.concurrent.context.maxThreads", _.toInt)) + getInt("scala.concurrent.context.minThreads", "1"), + getInt("scala.concurrent.context.numThreads", "x1"), + getInt("scala.concurrent.context.maxThreads", "x1")) val threadFactory = new DefaultThreadFactory(daemonic = true) - + try { new ForkJoinPool( desiredParallelism, @@ -96,12 +96,26 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: } } - def execute(runnable: Runnable): Unit = executor match { case fj: ForkJoinPool => - val fjt = runnable match { + val fjt: ForkJoinTask[_] = runnable match { case t: ForkJoinTask[_] => t - case runnable => new ForkJoinTask[Unit] { + case r => new ExecutionContextImpl.AdaptedForkJoinTask(r) + } + Thread.currentThread match { + case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork() + case _ => fj execute fjt + } + case generic => generic execute runnable + } + + def reportFailure(t: Throwable) = reporter(t) +} + + +private[concurrent] object ExecutionContextImpl { + + final class AdaptedForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] { final override def setRawResult(u: Unit): Unit = () final override def getRawResult(): Unit = () final override def exec(): Boolean = try { runnable.run(); true } catch { @@ -114,18 +128,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: throw anything } } - } - Thread.currentThread match { - case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork() - case _ => fj execute fjt - } - case generic => generic execute runnable - } - - def reportFailure(t: Throwable) = reporter(t) -} -private[concurrent] object ExecutionContextImpl { def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter) def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService = new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService { |