From e5f42a9b88e0a3ca1ac72e2c09ff3c954c3781ac Mon Sep 17 00:00:00 2001 From: phaller Date: Tue, 5 Jun 2012 12:08:32 +0200 Subject: Add configuration for ExecutionContext --- .../concurrent/impl/ExecutionContextImpl.scala | 28 ++++++++++++++++------ 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 1083a93439..7549bf8314 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -43,17 +43,31 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: } } } - - def createExecutorService: ExecutorService = try { new ForkJoinPool( - Runtime.getRuntime.availableProcessors(), //FIXME from config - forkJoinPoolThreadFactory, - null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does - true) //FIXME I really think this should be async... + + def createExecutorService: ExecutorService = try { + def getInt(name: String, f: String => Int): Int = + try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors } + def range(floor: Int, desired: Int, ceiling: Int): Int = + if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling) + + new ForkJoinPool( + range( + getInt("scala.concurrent.ec.minThreads", _.toInt), + getInt("scala.concurrent.ec.numThreads", { + case null | "" => Runtime.getRuntime.availableProcessors + case s if s.charAt(0) == 'x' => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt + case other => other.toInt + }), + getInt("scala.concurrent.ec.maxThreads", _.toInt) + ), + forkJoinPoolThreadFactory, + null, //FIXME we should have an UncaughtExceptionHandler, see what Akka does + true) //FIXME I really think this should be async... } catch { case NonFatal(t) => System.err.println("Failed to create ForkJoinPool for the default ExecutionContext, falling back to Executors.newCachedThreadPool") t.printStackTrace(System.err) - Executors.newCachedThreadPool(executorsThreadFactory) + Executors.newCachedThreadPool(executorsThreadFactory) //FIXME use the same desired parallelism here too? } def execute(runnable: Runnable): Unit = executor match { -- cgit v1.2.3