path: root/src/library/scala/concurrent/impl
authorJason Zaugg <jzaugg@gmail.com>2014-11-05 11:40:26 +1000
committerJason Zaugg <jzaugg@gmail.com>2014-11-05 20:15:01 +1000
commit9273c333b536e45d561dd9798e88545794459b7c (patch)
tree4a7f34cc8f67ed6e4a86f79cf4e0fbd6682fc317 /src/library/scala/concurrent/impl
parent4059d76dd9240f365712a79ba086f70f2eb8be9b (diff)
SI-8955 Fix hanging fork-join pool via parallel collections
A recent change [1] to Scala's default fork join thread pool caused intermittent deadlocks. This is only visible in the development series of Scala, 2.12.0-SNAPSHOT. We changed our thread factory to place a hard limit on the number of threads created (equal to the desired parallelism).

I have extracted a test case [2] that uses jsr166e directly, rather than using Scala's parallel collections and abstractions on top of FJ. In the comments of the bug, Viktor suggests this was too aggressive and that instead we ought to increase the limit to parallelism + 256 (with a system property override). He explained:

> The number 256 is going to be the default for the max threads for
> FJP in Java 9 (down from 32k), so this change will harmonize the
> settings while making it possible to override from the outside.
>
> The cause of the deadlock is twofold:
>
> 1) The test uses ExecutionContext.global, which is not designed
>    for typical ForkJoin workloads since it has async = true
>    (FIFO instead of LIFO)
> 2) And we capped the default max number of threads to be created
>    when doing managed blocking from 32k to the number of cores
>    (a tad too aggressive, it seems)

Through testing, I found that for this example I could trigger the hang with:

    parallelismLevel | maxThreads
    -----------------+-----------
                   2 | <= 4
                   4 | <= 9
                   8 | <= 11
                  16 | <= 15

I have emailed concurrency-interest [3] to help analyse the problem further, but in the interest of avoiding hangs in the scalacheck/parallel-collections test, I'm implementing Viktor's suggestion in the interim.

[1] https://github.com/scala/scala/pull/4042
[2] https://gist.github.com/retronym/2e14cdab6d5612562d95
[3] http://markmail.org/message/czphdyjxpkixeztv
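As a rough sketch of the sizing arithmetic the commit describes, the snippet below clamps the desired parallelism between the min/max bounds and then adds the extra-thread allowance. `getInt` here is a hypothetical stand-in for the property parser (the "x1" form meaning "available processors times 1" is assumed from the defaults shown in the diff), not the library's actual implementation:

```scala
object PoolSizing {
  // Clamp `desired` into [floor, ceiling], mirroring the `range` helper in the patch.
  def range(floor: Int, desired: Int, ceiling: Int): Int =
    math.min(math.max(floor, desired), ceiling)

  // Hypothetical stand-in for the property parser: a leading 'x' is treated
  // as a multiplier on the number of available processors.
  def getInt(prop: String, default: String): Int = {
    val value = sys.props.getOrElse(prop, default)
    if (value.charAt(0) == 'x')
      math.ceil(Runtime.getRuntime.availableProcessors * value.drop(1).toDouble).toInt
    else value.toInt
  }

  def maxPoolThreads(): Int = {
    val desiredParallelism = range(
      getInt("scala.concurrent.context.minThreads", "1"),
      getInt("scala.concurrent.context.numThreads", "x1"),
      getInt("scala.concurrent.context.maxThreads", "x1"))
    // The fix: allow up to maxExtraThreads beyond parallelism for managed blocking.
    desiredParallelism + getInt("scala.concurrent.context.maxExtraThreads", "256")
  }
}
```

With the defaults, the thread factory's cap becomes the core count plus 256, rather than the core count alone that triggered the hangs above.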
Diffstat (limited to 'src/library/scala/concurrent/impl')
-rw-r--r--  src/library/scala/concurrent/impl/ExecutionContextImpl.scala | 12
1 file changed, 9 insertions(+), 3 deletions(-)
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 32f30b9049..0c7f98ce5a 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -101,20 +101,26 @@ private[concurrent] object ExecutionContextImpl {
}
def range(floor: Int, desired: Int, ceiling: Int) = scala.math.min(scala.math.max(floor, desired), ceiling)
-
+ val numThreads = getInt("scala.concurrent.context.numThreads", "x1")
+ // The hard limit on the number of active threads that the thread factory will produce
+ // SI-8955 Deadlocks can happen if maxNoOfThreads is too low, although we're currently not sure
+ // about what the exact threshold is. numThreads + 256 is conservatively high.
val maxNoOfThreads = getInt("scala.concurrent.context.maxThreads", "x1")
val desiredParallelism = range(
getInt("scala.concurrent.context.minThreads", "1"),
- getInt("scala.concurrent.context.numThreads", "x1"),
+ numThreads,
maxNoOfThreads)
+ // The thread factory must provide additional threads to support managed blocking.
+ val maxExtraThreads = getInt("scala.concurrent.context.maxExtraThreads", "256")
+
val uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = new Thread.UncaughtExceptionHandler {
override def uncaughtException(thread: Thread, cause: Throwable): Unit = reporter(cause)
}
val threadFactory = new ExecutionContextImpl.DefaultThreadFactory(daemonic = true,
- maxThreads = maxNoOfThreads,
+ maxThreads = maxNoOfThreads + maxExtraThreads,
prefix = "scala-execution-context-global",
uncaught = uncaughtExceptionHandler)
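The extra threads matter because managed blocking spawns compensation threads up to the factory's cap. A minimal illustration using the public `scala.concurrent` API (not code from this patch): wrapping a blocking call in `blocking { ... }` lets the pool create a replacement worker, so more simultaneously-blocked tasks than cores can still make progress:

```scala
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Four tasks all block on the same latch. With `blocking`, the global
// pool's managed-blocking machinery may add compensation threads (bounded
// by the thread factory's maxThreads) so the pool does not starve.
val latch = new java.util.concurrent.CountDownLatch(1)
val fs = (1 to 4).map { _ =>
  Future { blocking { latch.await() } }
}
latch.countDown()
Await.result(Future.sequence(fs), 10.seconds)
```

If the cap were exactly the parallelism level, the pool could run out of threads to compensate with, which is the deadlock shape SI-8955 describes.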