aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHolden Karau <holden@us.ibm.com>2016-03-04 10:56:58 +0000
committerSean Owen <sowen@cloudera.com>2016-03-04 10:56:58 +0000
commitc04dc27cedd3d75781fda4c24da16b6ada44d3e4 (patch)
tree70f0f5fa93c9dae9f13a31e24b54a18fee0e5977 /core
parent27e88faa058c1364d0e99fffc0c5cb64ef817bd3 (diff)
downloadspark-c04dc27cedd3d75781fda4c24da16b6ada44d3e4.tar.gz
spark-c04dc27cedd3d75781fda4c24da16b6ada44d3e4.tar.bz2
spark-c04dc27cedd3d75781fda4c24da16b6ada44d3e4.zip
[SPARK-13398][STREAMING] Move away from thread pool task support to forkjoin
## What changes were proposed in this pull request? Remove old deprecated ThreadPoolExecutor and replace with ExecutionContext using a ForkJoinPool. The downside of this is that scala's ForkJoinPool doesn't give us a way to specify the thread pool name (and is a wrapper of Java's in 2.12) except by providing a custom factory. Note that we can't use Java's ForkJoinPool directly in Scala 2.11 since it uses a ExecutionContext which reports system parallelism. One other implicit change that happens is the old ExecutionContext would have reported a different default parallelism since it used system parallelism rather than threadpool parallelism (this was likely not intended but also likely not a huge difference). The previous version of this PR attempted to use an execution context constructed on the ThreadPool (but not the deprecated ThreadPoolExecutor class) so as to keep the ability to have human readable named threads but this reported system parallelism. ## How was this patch tested? unit tests: streaming/testOnly org.apache.spark.streaming.util.* Author: Holden Karau <holden@us.ibm.com> Closes #11423 from holdenk/SPARK-13398-move-away-from-ThreadPoolTaskSupport-java-forkjoin.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/ThreadUtils.scala18
1 files changed, 18 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index f9fbe2ff85..9abbf4a7a3 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import java.util.concurrent._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal
import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
@@ -156,4 +157,21 @@ private[spark] object ThreadUtils {
result
}
}
+
+ /**
+ * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
+ */
+ def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+ // Custom factory to set thread names
+ val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
+ override def newThread(pool: SForkJoinPool) =
+ new SForkJoinWorkerThread(pool) {
+ setName(prefix + "-" + super.getName)
+ }
+ }
+ new SForkJoinPool(maxThreadNumber, factory,
+ null, // handler
+ false // asyncMode
+ )
+ }
}