author    Ryan Blue <blue@apache.org>          2016-05-05 14:40:37 -0700
committer Andrew Or <andrew@databricks.com>    2016-05-05 14:40:37 -0700
commit    08db491265a3b50e31993ac6aa07c3f0dd08cdbb (patch)
tree      6879d3dd877b128592555a2ea86ee2835e4f7199 /core
parent    5c47db06570e65d3f5544d6f26bbdf893e275b94 (diff)
[SPARK-9926] Parallelize partition logic in UnionRDD.
This patch has the new logic from #8512 that uses a parallel collection to compute partitions in UnionRDD.

The rest of #8512 added an alternative code path for calculating splits in S3, but that isn't necessary to get the same speedup. The underlying problem wasn't that bulk listing wasn't used; it was that an extra FileStatus was retrieved for each file. The fix was just committed as [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810). (I think the original commit also used a single prefix to enumerate all paths, but that isn't always helpful and it was removed in later versions, so there is no need for SparkS3Utils.)

I tested this using the same table that piapiaozhexiu was using. Calculating splits for a 10-day period took 25 seconds with this change and HADOOP-12810, which is on par with the results from #8512.

Author: Ryan Blue <blue@apache.org>
Author: Cheolsoo Park <cheolsoop@netflix.com>

Closes #11242 from rdblue/SPARK-9926-parallelize-union-rdd.
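The core trick is plain Scala parallel collections: wrap the sequence in a parallel view, bound its parallelism with a dedicated ForkJoinPool, and drop back to a sequential view before the final fold. A minimal standalone sketch of that pattern, assuming the Scala 2.11-era parallel collections the patch imports (the expensiveLookup helper and the pool size are illustrative, not part of the patch):

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    object ParallelListingSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for a slow per-element call such as rdd.partitions.
        def expensiveLookup(i: Int): Int = { Thread.sleep(100); i }

        val items = (1 to 32).par
        // Bound the parallelism with a dedicated pool, as UnionRDD does,
        // instead of sharing the global default pool.
        items.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
        // .seq returns to a sequential view before summing.
        println(items.map(expensiveLookup).seq.sum)
      }
    }

Run sequentially, the 32 simulated lookups would take roughly 3.2 seconds; with 8-way parallelism they overlap and finish in roughly 0.4 seconds, which is the same shape of win the commit message reports for split calculation.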
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 18
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 17
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 66cf4369da..8171dcc046 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
@@ -62,8 +64,22 @@ class UnionRDD[T: ClassTag](
     var rdds: Seq[RDD[T]])
   extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies
 
+  // visible for testing
+  private[spark] val isPartitionListingParallel: Boolean =
+    rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
+
+  @transient private lazy val partitionEvalTaskSupport =
+      new ForkJoinTaskSupport(new ForkJoinPool(8))
+
   override def getPartitions: Array[Partition] = {
-    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
+    val parRDDs = if (isPartitionListingParallel) {
+      val parArray = rdds.par
+      parArray.tasksupport = partitionEvalTaskSupport
+      parArray
+    } else {
+      rdds
+    }
+    val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
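The parallel path above is gated by spark.rdd.parallelListingThreshold (default 10): a union over more RDDs than the threshold is listed in parallel. A sketch of how a job might opt in by lowering the threshold, assuming a local run (the app name and master are placeholders, not from the patch):

    import org.apache.spark.{SparkConf, SparkContext}

    object UnionThresholdSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("union-threshold-sketch")  // placeholder name
          .setMaster("local[*]")                 // placeholder master
          .set("spark.rdd.parallelListingThreshold", "4")
        val sc = new SparkContext(conf)
        // Eight RDDs exceed the threshold of 4, so getPartitions
        // evaluates their partition arrays on the parallel collection.
        val rdds = (1 to 8).map(i => sc.parallelize(Seq(i), 1))
        println(sc.union(rdds).partitions.length)  // 8
        sc.stop()
      }
    }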
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index a663dab772..979fb426c9 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -116,6 +116,23 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
   }
 
+  test("SparkContext.union parallel partition listing") {
+    val nums1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val nums2 = sc.makeRDD(Array(5, 6, 7, 8), 2)
+    val serialUnion = sc.union(nums1, nums2)
+    val expected = serialUnion.collect().toList
+
+    assert(serialUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === false)
+
+    sc.conf.set("spark.rdd.parallelListingThreshold", "1")
+    val parallelUnion = sc.union(nums1, nums2)
+    val actual = parallelUnion.collect().toList
+    sc.conf.remove("spark.rdd.parallelListingThreshold")
+
+    assert(parallelUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === true)
+    assert(expected === actual)
+  }
+
   test("SparkContext.union creates UnionRDD if at least one RDD has no partitioner") {
     val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
     val rddWithNoPartitioner = sc.parallelize(Seq(2 -> true))