author     Ryan Blue <blue@apache.org>       2016-09-10 10:18:53 +0100
committer  Sean Owen <sowen@cloudera.com>    2016-09-10 10:18:53 +0100
commit     6ea5055fa734d435b5f148cf52d3385a57926b60 (patch)
tree       345ad795d4f44c1bc5362cdfe854dad5ed592982 /core/src
parent     bcdd259c371b1dcdb41baf227867d7e2ecb923c6 (diff)
[SPARK-17396][CORE] Share the task support between UnionRDD instances.
## What changes were proposed in this pull request?

Share the ForkJoinTaskSupport between UnionRDD instances to avoid creating a huge number of threads if lots of RDDs are created at the same time.

## How was this patch tested?

This uses existing UnionRDD tests.

Author: Ryan Blue <blue@apache.org>

Closes #14985 from rdblue/SPARK-17396-use-shared-pool.
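For context, the patch relies on the standard Scala parallel-collections mechanism: a `ForkJoinTaskSupport` wraps a `ForkJoinPool`, and assigning it to a collection's `tasksupport` field makes that collection run on the wrapped pool, so one pool can back many collections. Below is a minimal standalone sketch of the shared-pool idea, using the same Scala 2.11-era imports as the patch; the object and value names are illustrative only, not part of Spark.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object SharedPoolSketch {
  // A single 8-thread pool shared by every parallel collection below,
  // instead of a fresh pool per collection (the pre-patch behaviour).
  private lazy val sharedTaskSupport =
    new ForkJoinTaskSupport(new ForkJoinPool(8))

  def main(args: Array[String]): Unit = {
    // Many parallel collections, but only the shared pool's 8 threads run them.
    val sums = (1 to 100).map { i =>
      val parRange = (1 to 1000).par
      parRange.tasksupport = sharedTaskSupport
      parRange.map(_ * i).sum
    }
    println(sums.take(5))
  }
}
```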
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 12
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 8171dcc046..ad1fddbde7 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.ForkJoinTaskSupport
+import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
@@ -58,6 +58,11 @@ private[spark] class UnionPartition[T: ClassTag](
   }
 }
 
+object UnionRDD {
+  private[spark] lazy val partitionEvalTaskSupport =
+    new ForkJoinTaskSupport(new ForkJoinPool(8))
+}
+
 @DeveloperApi
 class UnionRDD[T: ClassTag](
     sc: SparkContext,
@@ -68,13 +73,10 @@ class UnionRDD[T: ClassTag](
   private[spark] val isPartitionListingParallel: Boolean =
     rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
 
-  @transient private lazy val partitionEvalTaskSupport =
-      new ForkJoinTaskSupport(new ForkJoinPool(8))
-
   override def getPartitions: Array[Partition] = {
     val parRDDs = if (isPartitionListingParallel) {
       val parArray = rdds.par
-      parArray.tasksupport = partitionEvalTaskSupport
+      parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
       parArray
     } else {
       rdds
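Design note on the hunk above: pre-patch, the 8-thread ForkJoinPool lived in a @transient lazy val on each UnionRDD instance, so every UnionRDD that listed its partitions in parallel spun up its own pool. Moving the lazy val into the UnionRDD companion object creates the pool once per JVM on first use and reuses it across all instances, which is what bounds the thread count when many UnionRDDs are constructed at the same time.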