aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author Guillaume Poulin <poulin.guillaume@gmail.com> 2016-04-05 02:54:38 +0100
committer Sean Owen <sowen@cloudera.com> 2016-04-05 02:54:38 +0100
commit 7201f033ce520259b6d07ea5ead92272cac92363 (patch)
tree 294fdda0323bef5a42a7f3c3b5be5b1a66bd7eb2 /streaming
parent a172e11cba6f917baf5bd6c4f83dc6689932de9a (diff)
download spark-7201f033ce520259b6d07ea5ead92272cac92363.tar.gz
spark-7201f033ce520259b6d07ea5ead92272cac92363.tar.bz2
spark-7201f033ce520259b6d07ea5ead92272cac92363.zip
[SPARK-12425][STREAMING] DStream union optimisation
Use PartitionerAwareUnionRDD when possible for optimizing shuffling and preserving the partitioner. Author: Guillaume Poulin <poulin.guillaume@gmail.com> Closes #10382 from gpoulin/dstream_union_optimisation.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala | 4
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala | 11
2 files changed, 4 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index c1846a31f6..d46c0a01e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.SparkException
-import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
private[streaming]
@@ -45,7 +45,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
s" time $validTime")
}
if (rdds.nonEmpty) {
- Some(new UnionRDD(ssc.sc, rdds))
+ Some(ssc.sc.union(rdds))
} else {
None
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ee50a8d024..fe0f875525 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag
-import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
+import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.Duration
@@ -63,13 +63,6 @@ class WindowedDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
val rddsInWindow = parent.slice(currentWindow)
- val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
- logDebug("Using partition aware union for windowing at " + validTime)
- new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
- } else {
- logDebug("Using normal union for windowing at " + validTime)
- new UnionRDD(ssc.sc, rddsInWindow)
- }
- Some(windowRDD)
+ Some(ssc.sc.union(rddsInWindow))
}
}