From c04dc27cedd3d75781fda4c24da16b6ada44d3e4 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 4 Mar 2016 10:56:58 +0000 Subject: [SPARK-13398][STREAMING] Move away from thread pool task support to forkjoin ## What changes were proposed in this pull request? Remove old deprecated ThreadPoolExecutor and replace with ExecutionContext using a ForkJoinPool. The downside of this is that scala's ForkJoinPool doesn't give us a way to specify the thread pool name (and is a wrapper of Java's in 2.12) except by providing a custom factory. Note that we can't use Java's ForkJoinPool directly in Scala 2.11 since it uses a ExecutionContext which reports system parallelism. One other implicit change that happens is the old ExecutionContext would have reported a different default parallelism since it used system parallelism rather than threadpool parallelism (this was likely not intended but also likely not a huge difference). The previous version of this PR attempted to use an execution context constructed on the ThreadPool (but not the deprecated ThreadPoolExecutor class) so as to keep the ability to have human readable named threads but this reported system parallelism. ## How was this patch tested? unit tests: streaming/testOnly org.apache.spark.streaming.util.* Author: Holden Karau Closes #11423 from holdenk/SPARK-13398-move-away-from-ThreadPoolTaskSupport-java-forkjoin. --- .../main/scala/org/apache/spark/util/ThreadUtils.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index f9fbe2ff85..9abbf4a7a3 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.util import java.util.concurrent._ import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} import scala.util.control.NonFatal import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} @@ -156,4 +157,21 @@ private[spark] object ThreadUtils { result } } + + /** + * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix. + */ + def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = { + // Custom factory to set thread names + val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory { + override def newThread(pool: SForkJoinPool) = + new SForkJoinWorkerThread(pool) { + setName(prefix + "-" + super.getName) + } + } + new SForkJoinPool(maxThreadNumber, factory, + null, // handler + false // asyncMode + ) + } } -- cgit v1.2.3