diff options
author | Guillaume Poulin <poulin.guillaume@gmail.com> | 2016-04-05 02:54:38 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-04-05 02:54:38 +0100 |
commit | 7201f033ce520259b6d07ea5ead92272cac92363 (patch) | |
tree | 294fdda0323bef5a42a7f3c3b5be5b1a66bd7eb2 /streaming | |
parent | a172e11cba6f917baf5bd6c4f83dc6689932de9a (diff) | |
download | spark-7201f033ce520259b6d07ea5ead92272cac92363.tar.gz spark-7201f033ce520259b6d07ea5ead92272cac92363.tar.bz2 spark-7201f033ce520259b6d07ea5ead92272cac92363.zip |
[SPARK-12425][STREAMING] DStream union optimisation
Use PartitionerAwareUnionRDD when possbile for optimizing shuffling and
preserving the partitioner.
Author: Guillaume Poulin <poulin.guillaume@gmail.com>
Closes #10382 from gpoulin/dstream_union_optimisation.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala | 4 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 11 |
2 files changed, 4 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index c1846a31f6..d46c0a01e0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark.SparkException -import org.apache.spark.rdd.{RDD, UnionRDD} +import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, Time} private[streaming] @@ -45,7 +45,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) s" time $validTime") } if (rdds.nonEmpty) { - Some(new UnionRDD(ssc.sc, rdds)) + Some(ssc.sc.union(rdds)) } else { None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index ee50a8d024..fe0f875525 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream import scala.reflect.ClassTag -import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD} +import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.Duration @@ -63,13 +63,6 @@ class WindowedDStream[T: ClassTag]( override def compute(validTime: Time): Option[RDD[T]] = { val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) - val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { - logDebug("Using partition aware union for windowing at " + validTime) - new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) - } else { - logDebug("Using normal union for windowing at " + validTime) - new UnionRDD(ssc.sc, rddsInWindow) - } - Some(windowRDD) + Some(ssc.sc.union(rddsInWindow)) } } |