Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ThreadUtils.scala  18
1 file changed, 18 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index f9fbe2ff85..9abbf4a7a3 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import java.util.concurrent._
 
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
+import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
@@ -156,4 +157,21 @@ private[spark] object ThreadUtils {
         result
     }
   }
+
+  /**
+   * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix.
+   */
+  def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = {
+    // Custom factory to set thread names
+    val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory {
+      override def newThread(pool: SForkJoinPool) =
+        new SForkJoinWorkerThread(pool) {
+          setName(prefix + "-" + super.getName)
+        }
+    }
+    new SForkJoinPool(maxThreadNumber, factory,
+      null, // handler
+      false // asyncMode
+    )
+  }
 }
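
A minimal usage sketch (not part of the patch above): on Scala 2.11, scala.collection.parallel.ForkJoinTaskSupport expects a scala.concurrent.forkjoin.ForkJoinPool, so a pool built with the new helper can drive a parallel collection with named worker threads. The pool name "demo-pool", the parallelism of 8, and the ForkJoinPoolDemo object are illustrative values, not taken from the patch.

    import scala.collection.parallel.ForkJoinTaskSupport

    import org.apache.spark.util.ThreadUtils

    object ForkJoinPoolDemo {
      def main(args: Array[String]): Unit = {
        // Hypothetical pool name and size; worker threads are named "demo-pool-<default name>".
        val pool = ThreadUtils.newForkJoinPool("demo-pool", 8)

        // Route the parallel collection's tasks onto the named pool.
        val data = (1 to 100).par
        data.tasksupport = new ForkJoinTaskSupport(pool)

        val doubled = data.map(_ * 2)
        println(doubled.sum)

        pool.shutdown()
      }
    }

The thread-name prefix comes from the custom ForkJoinWorkerThreadFactory in the patch, which makes the pool's threads easy to identify in thread dumps.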