From 3fb5c9ee24302edf02df130bd0dfd0463cf6c0a4 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 2 Nov 2012 12:12:25 -0700
Subject: Fixed serialization bug in countByWindow, added countByKey and
 countByKeyAndWindow, and added testcases for them.

---
 .../src/main/scala/spark/streaming/DStream.scala   |   4 +-
 .../spark/streaming/PairDStreamFunctions.scala     |  23 ++-
 .../scala/spark/streaming/InputStreamsSuite.scala  |  18 ++
 .../test/scala/spark/streaming/TestSuiteBase.scala |   8 +-
 .../spark/streaming/WindowOperationsSuite.scala    | 181 ++++++++++++++++-----
 5 files changed, 186 insertions(+), 48 deletions(-)

(limited to 'streaming')

diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 175ebf104f..a4921bb1a2 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -338,9 +338,7 @@ extends Serializable with Logging {
   }
 
   def countByWindow(windowTime: Time, slideTime: Time): DStream[Int] = {
-    def add(v1: Int, v2: Int) = (v1 + v2)
-    def subtract(v1: Int, v2: Int) = (v1 - v2)
-    this.map(_ => 1).reduceByWindow(add _, subtract _, windowTime, slideTime)
+    this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowTime, slideTime)
   }
 
   def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index f88247708b..e09d27d34f 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -53,14 +53,18 @@ extends Serializable {
     combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
   }
 
-  private def combineByKey[C: ClassManifest](
+  def combineByKey[C: ClassManifest](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
-    partitioner: Partitioner) : ShuffledDStream[K, V, C] = {
+    partitioner: Partitioner) : DStream[(K, C)] = {
     new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
   }
 
+  def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
+    self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
+  }
+
   def groupByKeyAndWindow(windowTime: Time, slideTime: Time): DStream[(K, Seq[V])] = {
     groupByKeyAndWindow(windowTime, slideTime, defaultPartitioner())
   }
@@ -157,6 +161,21 @@ extends Serializable {
       self, cleanedReduceFunc, cleanedInvReduceFunc, windowTime, slideTime, partitioner)
   }
 
+  def countByKeyAndWindow(
+      windowTime: Time,
+      slideTime: Time,
+      numPartitions: Int = self.ssc.sc.defaultParallelism
+    ): DStream[(K, Long)] = {
+
+    self.map(x => (x._1, 1L)).reduceByKeyAndWindow(
+      (x: Long, y: Long) => x + y,
+      (x: Long, y: Long) => x - y,
+      windowTime,
+      slideTime,
+      numPartitions
+    )
+  }
+
   // TODO:
   //
   //
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 6f6b18a790..c17254b809 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -51,6 +51,15 @@ class InputStreamsSuite extends TestSuiteBase {
     ssc.stop()
 
     // Verify whether data received by Spark Streaming was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
     assert(outputBuffer.size === expectedOutput.size)
     for (i <- 0 until outputBuffer.size) {
       assert(outputBuffer(i).size === 1)
@@ -101,6 +110,15 @@ class InputStreamsSuite extends TestSuiteBase {
     ssc.stop()
 
     // Verify whether data received by Spark Streaming was as expected
+    logInfo("--------------------------------")
+    logInfo("output.size = " + outputBuffer.size)
+    logInfo("output")
+    outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("expected output.size = " + expectedOutput.size)
+    logInfo("expected output")
+    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+    logInfo("--------------------------------")
+
     assert(outputBuffer.size === expectedOutput.size)
     for (i <- 0 until outputBuffer.size) {
       assert(outputBuffer(i).size === 1)
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index c1b7772e7b..c9bc454f91 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -16,13 +16,14 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
 
   def compute(validTime: Time): Option[RDD[T]] = {
     logInfo("Computing RDD for time " + validTime)
-    val rdd = if (currentIndex < input.size) {
-      ssc.sc.makeRDD(input(currentIndex), numPartitions)
+    val index = ((validTime - zeroTime) / slideTime - 1).toInt
+    val rdd = if (index < input.size) {
+      ssc.sc.makeRDD(input(index), numPartitions)
     } else {
       ssc.sc.makeRDD(Seq[T](), numPartitions)
     }
     logInfo("Created RDD " + rdd.id)
-    currentIndex += 1
+    //currentIndex += 1
     Some(rdd)
   }
 }
@@ -96,7 +97,6 @@ trait TestSuiteBase extends FunSuite with Logging {
     ssc
   }
 
-
   def runStreams[V: ClassManifest](
       ssc: StreamingContext,
       numBatches: Int,
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 90d67844bb..d7d8d5bd36 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -1,6 +1,7 @@
 package spark.streaming
 
 import spark.streaming.StreamingContext._
+import collection.mutable.ArrayBuffer
 
 class WindowOperationsSuite extends TestSuiteBase {
 
@@ -8,6 +9,8 @@ class WindowOperationsSuite extends TestSuiteBase {
 
   override def maxWaitTimeMillis() = 20000
 
+  override def batchDuration() = Seconds(1)
+
   val largerSlideInput = Seq(
     Seq(("a", 1)),
     Seq(("a", 2)),  // 1st window from here
@@ -19,7 +22,7 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seq()           // 4th window from here
   )
 
-  val largerSlideOutput = Seq(
+  val largerSlideReduceOutput = Seq(
     Seq(("a", 3)),
     Seq(("a", 10)),
     Seq(("a", 18)),
@@ -42,7 +45,23 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seq()
   )
 
-  val bigOutput = Seq(
+  val bigGroupByOutput = Seq(
+    Seq(("a", Seq(1))),
+    Seq(("a", Seq(1, 1)), ("b", Seq(1))),
+    Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))),
+    Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))),
+    Seq(("a", Seq(1, 1)), ("b", Seq(1))),
+    Seq(("a", Seq(1))),
+    Seq(("a", Seq(1))),
+    Seq(("a", Seq(1, 1)), ("b", Seq(1))),
+    Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))),
+    Seq(("a", Seq(1, 1)), ("b", Seq(1, 1)), ("c", Seq(1))),
+    Seq(("a", Seq(1, 1)), ("b", Seq(1))),
+    Seq(("a", Seq(1)))
+  )
+
+
+  val bigReduceOutput = Seq(
     Seq(("a", 1)),
     Seq(("a", 2), ("b", 1)),
     Seq(("a", 2), ("b", 2), ("c", 1)),
@@ -59,13 +78,14 @@ class WindowOperationsSuite extends TestSuiteBase {
 
   /*
   The output of the reduceByKeyAndWindow with inverse reduce function is
-  difference from the naive reduceByKeyAndWindow. Even if the count of a
+  different from the naive reduceByKeyAndWindow. Even if the count of a
   particular key is 0, the key does not get eliminated from the RDDs of
   ReducedWindowedDStream. This causes the number of keys in these RDDs to
   increase forever. A more generalized version that allows elimination of
   keys should be considered.
   */
-  val bigOutputInv = Seq(
+
+  val bigReduceInvOutput = Seq(
     Seq(("a", 1)),
     Seq(("a", 2), ("b", 1)),
     Seq(("a", 2), ("b", 2), ("c", 1)),
@@ -80,38 +100,37 @@ class WindowOperationsSuite extends TestSuiteBase {
     Seq(("a", 1), ("b", 0), ("c", 0))
   )
 
-  def testReduceByKeyAndWindow(
-    name: String,
-    input: Seq[Seq[(String, Int)]],
-    expectedOutput: Seq[Seq[(String, Int)]],
-    windowTime: Time = batchDuration * 2,
-    slideTime: Time = batchDuration
-  ) {
-    test("reduceByKeyAndWindow - " + name) {
-      val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
-      val operation = (s: DStream[(String, Int)]) => {
-        s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
-      }
-      testOperation(input, operation, expectedOutput, numBatches, true)
-    }
-  }
+  // Testing window operation
 
-  def testReduceByKeyAndWindowInv(
-    name: String,
-    input: Seq[Seq[(String, Int)]],
-    expectedOutput: Seq[Seq[(String, Int)]],
-    windowTime: Time = batchDuration * 2,
-    slideTime: Time = batchDuration
-  ) {
-    test("reduceByKeyAndWindowInv - " + name) {
-      val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
-      val operation = (s: DStream[(String, Int)]) => {
-        s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist()
-      }
-      testOperation(input, operation, expectedOutput, numBatches, true)
-    }
-  }
+  testWindow(
+    "basic window",
+    Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)),
+    Seq( Seq(0), Seq(0, 1), Seq(1, 2), Seq(2, 3), Seq(3, 4), Seq(4, 5))
+  )
 
+  testWindow(
+    "tumbling window",
+    Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)),
+    Seq( Seq(0, 1), Seq(2, 3), Seq(4, 5)),
+    Seconds(2),
+    Seconds(2)
+  )
+
+  testWindow(
+    "larger window",
+    Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)),
+    Seq( Seq(0, 1), Seq(0, 1, 2, 3), Seq(2, 3, 4, 5), Seq(4, 5)),
+    Seconds(4),
+    Seconds(2)
+  )
+
+  testWindow(
+    "non-overlapping window",
+    Seq( Seq(0), Seq(1), Seq(2), Seq(3), Seq(4), Seq(5)),
+    Seq( Seq(1, 2), Seq(4, 5)),
+    Seconds(2),
+    Seconds(3)
+  )
 
   // Testing naive reduceByKeyAndWindow (without invertible function)
@@ -142,13 +161,12 @@ class WindowOperationsSuite extends TestSuiteBase {
   testReduceByKeyAndWindow(
     "larger slide time",
     largerSlideInput,
-    largerSlideOutput,
+    largerSlideReduceOutput,
     Seconds(4),
     Seconds(2)
   )
 
-  testReduceByKeyAndWindow("big test", bigInput, bigOutput)
-
+  testReduceByKeyAndWindow("big test", bigInput, bigReduceOutput)
 
   // Testing reduceByKeyAndWindow (with invertible reduce function)
@@ -179,10 +197,95 @@ class WindowOperationsSuite extends TestSuiteBase {
   testReduceByKeyAndWindowInv(
     "larger slide time",
     largerSlideInput,
-    largerSlideOutput,
+    largerSlideReduceOutput,
     Seconds(4),
     Seconds(2)
   )
 
-  testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv)
+  testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput)
+
+  test("groupByKeyAndWindow") {
+    val input = bigInput
+    val expectedOutput = bigGroupByOutput.map(_.map(x => (x._1, x._2.toSet)))
+    val windowTime = Seconds(2)
+    val slideTime = Seconds(1)
+    val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+    val operation = (s: DStream[(String, Int)]) => {
+      s.groupByKeyAndWindow(windowTime, slideTime)
+       .map(x => (x._1, x._2.toSet))
+       .persist()
+    }
+    testOperation(input, operation, expectedOutput, numBatches, true)
+  }
+
+  test("countByWindow") {
+    val input = Seq(Seq(1), Seq(1), Seq(1, 2), Seq(0), Seq(), Seq() )
+    val expectedOutput = Seq( Seq(1), Seq(2), Seq(3), Seq(3), Seq(1), Seq(0))
+    val windowTime = Seconds(2)
+    val slideTime = Seconds(1)
+    val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+    val operation = (s: DStream[Int]) => s.countByWindow(windowTime, slideTime)
+    testOperation(input, operation, expectedOutput, numBatches, true)
+  }
+
+  test("countByKeyAndWindow") {
+    val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20)))
+    val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
+    val windowTime = Seconds(2)
+    val slideTime = Seconds(1)
+    val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+    val operation = (s: DStream[(String, Int)]) => {
+      s.countByKeyAndWindow(windowTime, slideTime).map(x => (x._1, x._2.toInt))
+    }
+    testOperation(input, operation, expectedOutput, numBatches, true)
+  }
+
+
+  // Helper functions
+
+  def testWindow(
+    name: String,
+    input: Seq[Seq[Int]],
+    expectedOutput: Seq[Seq[Int]],
+    windowTime: Time = Seconds(2),
+    slideTime: Time = Seconds(1)
+  ) {
+    test("window - " + name) {
+      val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+      val operation = (s: DStream[Int]) => s.window(windowTime, slideTime)
+      testOperation(input, operation, expectedOutput, numBatches, true)
+    }
+  }
+
+  def testReduceByKeyAndWindow(
+    name: String,
+    input: Seq[Seq[(String, Int)]],
+    expectedOutput: Seq[Seq[(String, Int)]],
+    windowTime: Time = Seconds(2),
+    slideTime: Time = Seconds(1)
+  ) {
+    test("reduceByKeyAndWindow - " + name) {
+      val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+      val operation = (s: DStream[(String, Int)]) => {
+        s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist()
+      }
+      testOperation(input, operation, expectedOutput, numBatches, true)
+    }
+  }
+
+  def testReduceByKeyAndWindowInv(
+    name: String,
+    input: Seq[Seq[(String, Int)]],
+    expectedOutput: Seq[Seq[(String, Int)]],
+    windowTime: Time = Seconds(2),
+    slideTime: Time = Seconds(1)
+  ) {
+    test("reduceByKeyAndWindowInv - " + name) {
+      val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt
+      val operation = (s: DStream[(String, Int)]) => {
+        s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist()
+      }
+      testOperation(input, operation, expectedOutput, numBatches, true)
+    }
+  }
 }
-- 
cgit v1.2.3
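
Usage note: the sketch below illustrates how the operations this patch touches might be called from application code. It is a minimal sketch, not code from the patch itself: the `words` stream, the `example` wrapper, and the import lines are illustrative assumptions, and a real program would also need an input stream, an output operation, and a started StreamingContext.

    import spark.streaming.{DStream, Seconds}
    import spark.streaming.StreamingContext._  // implicit pair-DStream operations

    // `words` is assumed to be a pair DStream built elsewhere (e.g. from a
    // network input stream) on a context with a 1-second batch duration.
    def example(words: DStream[(String, Int)]) {
      // countByWindow (fixed here): total element count per 2-second window,
      // sliding every second, maintained incrementally via _ + _ and _ - _.
      val totals: DStream[Int] = words.countByWindow(Seconds(2), Seconds(1))

      // countByKey (new): per-key record count within each batch; values are
      // ignored, only key occurrences are counted.
      val perKey: DStream[(String, Long)] = words.countByKey()

      // countByKeyAndWindow (new): per-key record count over a sliding window,
      // computed with an invertible reduce (add on entry, subtract on exit).
      val perKeyWindowed: DStream[(String, Long)] =
        words.countByKeyAndWindow(Seconds(2), Seconds(1))
    }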