diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 02:58:29 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-26 02:58:29 -0800 |
commit | 069cb14bdcbd366453db66932b8c83431c35f5f5 (patch) | |
tree | 2982e608f10a36382f579c06a26ad20c341dcdf9 | |
parent | 94479673eb0ea839d5f6b6bd43c5abf75af7b9eb (diff) | |
download | spark-069cb14bdcbd366453db66932b8c83431c35f5f5.tar.gz spark-069cb14bdcbd366453db66932b8c83431c35f5f5.tar.bz2 spark-069cb14bdcbd366453db66932b8c83431c35f5f5.zip |
Updated groupByKeyAndWindow to be computed incrementally, and added mapSideCombine to combineByKeyAndWindow.
5 files changed, 34 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index 80af96c060..56dbcbda23 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -108,8 +108,9 @@ extends Serializable { createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, - partitioner: Partitioner) : DStream[(K, C)] = { - new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner) + partitioner: Partitioner, + mapSideCombine: Boolean = true): DStream[(K, C)] = { + new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) } /** @@ -173,7 +174,13 @@ extends Serializable { slideDuration: Duration, partitioner: Partitioner ): DStream[(K, Seq[V])] = { - self.window(windowDuration, slideDuration).groupByKey(partitioner) + val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v + val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v + val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 + self.groupByKey(partitioner) + .window(windowDuration, slideDuration) + .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index dfd6e27c3e..6c3467d405 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -155,7 +155,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Combine elements of each key in DStream's RDDs using custom function. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more * information. */ def combineByKey[C](createCombiner: JFunction[V, C], @@ -169,6 +169,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more + * information. + */ + def combineByKey[C](createCombiner: JFunction[V, C], + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + partitioner: Partitioner, + mapSideCombine: Boolean + ): JavaPairDStream[K, C] = { + implicit val cm: ClassTag[C] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]] + dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine) + } + + /** * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala index e6e0022097..84e69f277b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala @@ -29,8 +29,9 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiner: (C, C) => C, - partitioner: Partitioner - ) extends DStream [(K,C)] (parent.ssc) { + partitioner: Partitioner, + mapSideCombine: Boolean = true + ) extends DStream[(K,C)] (parent.ssc) { override def dependencies = List(parent) @@ -38,8 +39,8 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag]( override def compute(validTime: Time): Option[RDD[(K,C)]] = { parent.getOrCompute(validTime) match { - case Some(rdd) => - Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner)) + case Some(rdd) => Some(rdd.combineByKey[C]( + createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)) case None => None } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index ea21ab505b..ca4edae38a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -39,7 +39,7 @@ class WindowedDStream[T: ClassTag]( throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")") - val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "false").toBoolean + val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean parent.persist(StorageLevel.MEMORY_ONLY_SER) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala index 6b4aaefcdf..b29d790a27 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala @@ -225,9 +225,7 @@ class WindowOperationsSuite extends TestSuiteBase { val slideDuration = Seconds(1) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt val operation = (s: DStream[(String, Int)]) => { - s.groupByKeyAndWindow(windowDuration, slideDuration) - .map(x => (x._1, x._2.toSet)) - .persist() + s.groupByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toSet)) } testOperation(input, operation, expectedOutput, numBatches, true) } |