summaryrefslogtreecommitdiff
path: root/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/concurrent/impl/ExecutionContextImpl.scala')
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala57
1 files changed, 30 insertions, 27 deletions
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 0aa6b37ffc..479720287c 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -30,7 +30,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
}
// Implement BlockContext on FJP threads
- class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
+ class DefaultThreadFactory(daemonic: Boolean) extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
def wire[T <: Thread](thread: T): T = {
thread.setDaemon(daemonic)
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler)
@@ -57,22 +57,22 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
def createExecutorService: ExecutorService = {
- def getInt(name: String, f: String => Int): Int =
- try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors }
- def range(floor: Int, desired: Int, ceiling: Int): Int =
- if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling)
+ def getInt(name: String, default: String) = (try System.getProperty(name, default) catch {
+ case e: SecurityException => default
+ }) match {
+ case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt
+ case other => other.toInt
+ }
+
+ def range(floor: Int, desired: Int, ceiling: Int) = scala.math.min(scala.math.max(floor, desired), ceiling)
val desiredParallelism = range(
- getInt("scala.concurrent.context.minThreads", _.toInt),
- getInt("scala.concurrent.context.numThreads", {
- case null | "" => Runtime.getRuntime.availableProcessors
- case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt
- case other => other.toInt
- }),
- getInt("scala.concurrent.context.maxThreads", _.toInt))
+ getInt("scala.concurrent.context.minThreads", "1"),
+ getInt("scala.concurrent.context.numThreads", "x1"),
+ getInt("scala.concurrent.context.maxThreads", "x1"))
val threadFactory = new DefaultThreadFactory(daemonic = true)
-
+
try {
new ForkJoinPool(
desiredParallelism,
@@ -96,12 +96,26 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
}
}
-
def execute(runnable: Runnable): Unit = executor match {
case fj: ForkJoinPool =>
- val fjt = runnable match {
+ val fjt: ForkJoinTask[_] = runnable match {
case t: ForkJoinTask[_] => t
- case runnable => new ForkJoinTask[Unit] {
+ case r => new ExecutionContextImpl.AdaptedForkJoinTask(r)
+ }
+ Thread.currentThread match {
+ case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork()
+ case _ => fj execute fjt
+ }
+ case generic => generic execute runnable
+ }
+
+ def reportFailure(t: Throwable) = reporter(t)
+}
+
+
+private[concurrent] object ExecutionContextImpl {
+
+ final class AdaptedForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
final override def setRawResult(u: Unit): Unit = ()
final override def getRawResult(): Unit = ()
final override def exec(): Boolean = try { runnable.run(); true } catch {
@@ -114,18 +128,7 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
throw anything
}
}
- }
- Thread.currentThread match {
- case fjw: ForkJoinWorkerThread if fjw.getPool eq fj => fjt.fork()
- case _ => fj execute fjt
- }
- case generic => generic execute runnable
- }
-
- def reportFailure(t: Throwable) = reporter(t)
-}
-private[concurrent] object ExecutionContextImpl {
def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl = new ExecutionContextImpl(e, reporter)
def fromExecutorService(es: ExecutorService, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl with ExecutionContextExecutorService =
new ExecutionContextImpl(es, reporter) with ExecutionContextExecutorService {