author    Patrick Wendell <pwendell@gmail.com>    2014-01-02 13:20:54 -0800
committer Patrick Wendell <pwendell@gmail.com>    2014-01-02 13:20:54 -0800
commit    588a1695f4b0b7763ecfa8ea56e371783810dd68 (patch)
tree      86913efaabe18c9ca8b27afa9f35947a392f2468 /streaming
parent    7bafb68d77fa156c0dd7541aeef14626f867726b (diff)
parent    5fde4566ea48e5c6d6c50af032a29eaded2d7c43 (diff)
Merge pull request #297 from tdas/window-improvement
Improvements to DStream window ops and refactoring of Spark's CheckpointSuite

- Added a new RDD - PartitionerAwareUnionRDD. Using this RDD, one can take multiple RDDs partitioned by the same partitioner and unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each will be unified to a single RDD with p partitions and the same partitioner. The preferred location for each partition of the unified RDD will be the most common preferred location of the corresponding partitions of the parent RDDs. For example, the location of partition 0 of the unified RDD will be where most of partition 0 of the parent RDDs are located.
- Improved the performance of DStream's reduceByKeyAndWindow and groupByKeyAndWindow. Both these operations work by doing per-batch reduceByKey/groupByKey and then using PartitionerAwareUnionRDD to union the RDDs across the window. This eliminates a shuffle related to the window operation, which can reduce batch processing time by 30-40% for simple workloads.
- Fixed bugs and simplified Spark's CheckpointSuite. Some of the tests were incorrect and unreliable. Added missing tests for ZippedRDD. I can go into greater detail if necessary.
- Added mapSideCombine option to combineByKeyAndWindow.
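As a hypothetical illustration of the second point, a word-count stream using reduceByKeyAndWindow with an explicit partitioner is the shape of workload that benefits: every per-batch RDD shares the same HashPartitioner, so the windowed union can stay partition-aware instead of shuffling again. The host, port, and durations below are made up for the sketch.

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

    val ssc = new StreamingContext("local[2]", "WindowExample", Seconds(1))
    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      // Per-batch reduceByKey with a fixed partitioner; the window op can then
      // union the batch RDDs with PartitionerAwareUnionRDD (no extra shuffle).
      .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(1), new HashPartitioner(4))
    counts.print()
    ssc.start()
    ssc.awaitTermination()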
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala     | 13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 18
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala  |  9
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala  | 16
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala    |  4
5 files changed, 45 insertions(+), 15 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 80af96c060..56dbcbda23 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -108,8 +108,9 @@ extends Serializable {
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- partitioner: Partitioner) : DStream[(K, C)] = {
- new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true): DStream[(K, C)] = {
+ new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
}
/**
@@ -173,7 +174,13 @@ extends Serializable {
slideDuration: Duration,
partitioner: Partitioner
): DStream[(K, Seq[V])] = {
- self.window(windowDuration, slideDuration).groupByKey(partitioner)
+ val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
+ val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+ val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
+ self.groupByKey(partitioner)
+ .window(windowDuration, slideDuration)
+ .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
+ .asInstanceOf[DStream[(K, Seq[V])]]
}
/**
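To see what the new groupByKeyAndWindow strategy does, here is a small self-contained sketch (illustrative, not from the patch) of the three combiner functions merging per-batch groups for one key; the ArrayBuffer grows across the batches of a window without re-shuffling the values.

    import scala.collection.mutable.ArrayBuffer

    // Per-batch value groups that groupByKey would emit for one key in a window.
    val batches: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3), Seq(4, 5))

    // The same three functions the new implementation passes to combineByKey.
    val createCombiner = (v: Seq[Int]) => new ArrayBuffer[Int] ++= v
    val mergeValue = (buf: ArrayBuffer[Int], v: Seq[Int]) => buf ++= v
    val mergeCombiner = (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2

    // Fold the batches the way combineByKey would within one partition.
    val merged = batches.tail.foldLeft(createCombiner(batches.head))(mergeValue)
    println(merged)  // ArrayBuffer(1, 2, 3, 4, 5)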
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index dfd6e27c3e..6c3467d405 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -155,7 +155,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
- * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
+ * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
* information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
@@ -169,6 +169,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
+ * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
+ * information.
+ */
+ def combineByKey[C](createCombiner: JFunction[V, C],
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ partitioner: Partitioner,
+ mapSideCombine: Boolean
+ ): JavaPairDStream[K, C] = {
+ implicit val cm: ClassTag[C] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+ dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
+ }
+
+ /**
* Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
* with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
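The new Java-facing overload simply forwards mapSideCombine to the Scala implementation. As a hedged sketch of when false makes sense: if the combiner builds a per-key collection (as groupByKey does), map-side combining cannot shrink the data, so skipping it saves a pass. The input stream and key/value types here are made up.

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "CombineExample", Seconds(1))
    // Hypothetical "user,page" lines arriving on a socket.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .map { line => val Array(user, page) = line.split(","); (user, page) }
    val pagesPerUser = pairs.combineByKey[Set[String]](
      (v: String) => Set(v),                      // createCombiner
      (s: Set[String], v: String) => s + v,       // mergeValue
      (a: Set[String], b: Set[String]) => a ++ b, // mergeCombiners
      new HashPartitioner(4),
      mapSideCombine = false)                     // sets don't shrink map-side
    pagesPerUser.print()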
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index e6e0022097..84e69f277b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -29,8 +29,9 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
- partitioner: Partitioner
- ) extends DStream [(K,C)] (parent.ssc) {
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true
+ ) extends DStream[(K,C)] (parent.ssc) {
override def dependencies = List(parent)
@@ -38,8 +39,8 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
override def compute(validTime: Time): Option[RDD[(K,C)]] = {
parent.getOrCompute(validTime) match {
- case Some(rdd) =>
- Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+ case Some(rdd) => Some(rdd.combineByKey[C](
+ createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
case None => None
}
}
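ShuffledDStream.compute now just forwards the flag to PairRDDFunctions.combineByKey on each batch RDD. For reference, a sketch of the same call shape outside streaming, on a plain RDD (local mode, made-up data; assumes the RDD-level overload of this era that takes mapSideCombine as its fifth argument):

    import org.apache.spark.{HashPartitioner, SparkContext}

    val sc = new SparkContext("local[2]", "RDDCombineExample")
    val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val sums = rdd.combineByKey[Int](
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue
      (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners
      new HashPartitioner(2),
      true)                           // mapSideCombine: pre-aggregate per map task
    println(sums.collect().toSeq)     // (a,4) and (b,2), in some order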
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 73d959331a..89c43ff935 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -17,10 +17,10 @@
package org.apache.spark.streaming.dstream
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import org.apache.spark.streaming._
+import org.apache.spark.streaming.Duration
import scala.reflect.ClassTag
@@ -51,6 +51,14 @@ class WindowedDStream[T: ClassTag](
override def compute(validTime: Time): Option[RDD[T]] = {
val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
- Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
+ val rddsInWindow = parent.slice(currentWindow)
+ val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+ logDebug("Using partition aware union for windowing at " + validTime)
+ new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
+ } else {
+ logDebug("Using normal union for windowing at " + validTime)
+ new UnionRDD(ssc.sc,rddsInWindow)
+ }
+ Some(windowRDD)
}
}
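The branch above falls back to a plain UnionRDD whenever the batch RDDs do not all report one common partitioner. A standalone sketch of just the guard expression, with simulated inputs (values are illustrative):

    import org.apache.spark.{HashPartitioner, Partitioner}

    // Simulate the partitioners of the RDDs in one window.
    val p = new HashPartitioner(4)
    val partitioners: Seq[Option[Partitioner]] = Seq(Some(p), Some(p), Some(p))

    // Same test WindowedDStream.compute applies to rddsInWindow.
    val partitionAware = partitioners.flatten.distinct.length == 1
    println(partitionAware)  // true: all defined partitioners are equal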
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index c92c34d49b..c39abfc21b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -224,9 +224,7 @@ class WindowOperationsSuite extends TestSuiteBase {
val slideDuration = Seconds(1)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.groupByKeyAndWindow(windowDuration, slideDuration)
- .map(x => (x._1, x._2.toSet))
- .persist()
+ s.groupByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toSet))
}
testOperation(input, operation, expectedOutput, numBatches, true)
}