diff options
author | Ryan Blue <blue@apache.org> | 2016-09-10 10:18:53 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-09-10 10:18:53 +0100 |
commit | 6ea5055fa734d435b5f148cf52d3385a57926b60 (patch) | |
tree | 345ad795d4f44c1bc5362cdfe854dad5ed592982 | |
parent | bcdd259c371b1dcdb41baf227867d7e2ecb923c6 (diff) | |
download | spark-6ea5055fa734d435b5f148cf52d3385a57926b60.tar.gz spark-6ea5055fa734d435b5f148cf52d3385a57926b60.tar.bz2 spark-6ea5055fa734d435b5f148cf52d3385a57926b60.zip |
[SPARK-17396][CORE] Share the task support between UnionRDD instances.
## What changes were proposed in this pull request?
Share the ForkJoinTaskSupport between UnionRDD instances to avoid creating a huge number of threads if lots of RDDs are created at the same time.
## How was this patch tested?
This uses existing UnionRDD tests.
Author: Ryan Blue <blue@apache.org>
Closes #14985 from rdblue/SPARK-17396-use-shared-pool.
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 12 |
1 file changed, 7 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 8171dcc046..ad1fddbde7 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -20,7 +20,7 @@ package org.apache.spark.rdd import java.io.{IOException, ObjectOutputStream} import scala.collection.mutable.ArrayBuffer -import scala.collection.parallel.ForkJoinTaskSupport +import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport} import scala.concurrent.forkjoin.ForkJoinPool import scala.reflect.ClassTag @@ -58,6 +58,11 @@ private[spark] class UnionPartition[T: ClassTag]( } } +object UnionRDD { + private[spark] lazy val partitionEvalTaskSupport = + new ForkJoinTaskSupport(new ForkJoinPool(8)) +} + @DeveloperApi class UnionRDD[T: ClassTag]( sc: SparkContext, @@ -68,13 +73,10 @@ class UnionRDD[T: ClassTag]( private[spark] val isPartitionListingParallel: Boolean = rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10) - @transient private lazy val partitionEvalTaskSupport = - new ForkJoinTaskSupport(new ForkJoinPool(8)) - override def getPartitions: Array[Partition] = { val parRDDs = if (isPartitionListingParallel) { val parArray = rdds.par - parArray.tasksupport = partitionEvalTaskSupport + parArray.tasksupport = UnionRDD.partitionEvalTaskSupport parArray } else { rdds |