From 4a7bde6865cf22af060f20a9619c516b811c80f2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 6 Sep 2012 19:06:59 -0700 Subject: Fixed bugs and added testcases for naive reduceByKeyAndWindow. --- .../src/main/scala/spark/streaming/DStream.scala | 6 + .../src/main/scala/spark/streaming/Scheduler.scala | 2 +- .../scala/spark/streaming/WindowedDStream.scala | 38 +---- .../scala/spark/streaming/DStreamBasicSuite.scala | 2 +- .../scala/spark/streaming/DStreamWindowSuite.scala | 179 +++++++++++++++------ 5 files changed, 140 insertions(+), 87 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 50b9458fae..3973ca1520 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -256,11 +256,17 @@ extends Logging with Serializable { def union(that: DStream[T]) = new UnifiedDStream(Array(this, that)) + def slice(interval: Interval): Seq[RDD[T]] = { + slice(interval.beginTime, interval.endTime) + } + + // Get all the RDDs between fromTime to toTime (both included) def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { val rdds = new ArrayBuffer[RDD[T]]() var time = toTime.floor(slideTime) + while (time >= zeroTime && time >= fromTime) { getOrCompute(time) match { case Some(rdd) => rdds += rdd diff --git a/streaming/src/main/scala/spark/streaming/Scheduler.scala b/streaming/src/main/scala/spark/streaming/Scheduler.scala index 12e52bf56c..00136685d5 100644 --- a/streaming/src/main/scala/spark/streaming/Scheduler.scala +++ b/streaming/src/main/scala/spark/streaming/Scheduler.scala @@ -40,7 +40,7 @@ extends Logging { } def generateRDDs (time: Time) { - println("\n-----------------------------------------------------\n") + logInfo("\n-----------------------------------------------------\n") logInfo("Generating RDDs for time " + time) outputStreams.foreach(outputStream => { 
outputStream.generateJob(time) match { diff --git a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala index 6c791fcfc1..93c1291691 100644 --- a/streaming/src/main/scala/spark/streaming/WindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/WindowedDStream.scala @@ -1,12 +1,8 @@ package spark.streaming -import spark.streaming.StreamingContext._ - import spark.RDD import spark.UnionRDD -import spark.SparkContext._ -import scala.collection.mutable.ArrayBuffer class WindowedDStream[T: ClassManifest]( parent: DStream[T], @@ -22,8 +18,6 @@ class WindowedDStream[T: ClassManifest]( throw new Exception("The slide duration of WindowedDStream (" + _slideTime + ") " + "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")") - val allowPartialWindows = true - override def dependencies = List(parent) def windowTime: Time = _windowTime @@ -31,36 +25,8 @@ class WindowedDStream[T: ClassManifest]( override def slideTime: Time = _slideTime override def compute(validTime: Time): Option[RDD[T]] = { - val parentRDDs = new ArrayBuffer[RDD[T]]() - val windowEndTime = validTime - val windowStartTime = if (allowPartialWindows && windowEndTime - windowTime < parent.zeroTime) { - parent.zeroTime - } else { - windowEndTime - windowTime - } - - logInfo("Window = " + windowStartTime + " - " + windowEndTime) - logInfo("Parent.zeroTime = " + parent.zeroTime) - - if (windowStartTime >= parent.zeroTime) { - // Walk back through time, from the 'windowEndTime' to 'windowStartTime' - // and get all parent RDDs from the parent DStream - var t = windowEndTime - while (t > windowStartTime) { - parent.getOrCompute(t) match { - case Some(rdd) => parentRDDs += rdd - case None => throw new Exception("Could not generate parent RDD for time " + t) - } - t -= parent.slideTime - } - } - - // Do a union of all parent RDDs to generate the new RDD - if (parentRDDs.size > 0) { - Some(new 
UnionRDD(ssc.sc, parentRDDs)) - } else { - None - } + val currentWindow = Interval(validTime - windowTime + parent.slideTime, validTime) + Some(new UnionRDD(ssc.sc, parent.slice(currentWindow))) } } diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala index 2634c9b405..9b953d9dae 100644 --- a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala +++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala @@ -56,7 +56,7 @@ class DStreamBasicSuite extends DStreamSuiteBase { var newState = 0 if (values != null) newState += values.reduce(_ + _) if (state != null) newState += state.self - println("values = " + values + ", state = " + state + ", " + " new state = " + newState) + //println("values = " + values + ", state = " + state + ", " + " new state = " + newState) new RichInt(newState) } s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self)) diff --git a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala index c0e054418c..061cab2cbb 100644 --- a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala +++ b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala @@ -4,47 +4,6 @@ import spark.streaming.StreamingContext._ class DStreamWindowSuite extends DStreamSuiteBase { - def testReduceByKeyAndWindow( - name: String, - input: Seq[Seq[(String, Int)]], - expectedOutput: Seq[Seq[(String, Int)]], - windowTime: Time = Seconds(2), - slideTime: Time = Seconds(1) - ) { - test("reduceByKeyAndWindow - " + name) { - testOperation( - input, - (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist(), - expectedOutput, - true - ) - } - } - - testReduceByKeyAndWindow( - "basic reduction", - Seq(Seq(("a", 1), ("a", 3)) ), - Seq(Seq(("a", 4)) ) - ) - - testReduceByKeyAndWindow( - "key already in 
window and new value added into window", Seq( Seq(("a", 1)), Seq(("a", 1)) ), Seq( Seq(("a", 1)), Seq(("a", 2)) ) ) - - testReduceByKeyAndWindow( - "new key added into window", - Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), - Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) - ) - - testReduceByKeyAndWindow( - "key removed from window", - Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), - Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) ) - ) - val largerSlideInput = Seq( Seq(("a", 1)), // 1st window from here Seq(("a", 2)), @@ -65,14 +24,6 @@ class DStreamWindowSuite extends DStreamSuiteBase { Seq(("a", 6)) ) - testReduceByKeyAndWindow( - "larger slide time", - largerSlideInput, - largerSlideOutput, - Seconds(4), - Seconds(2) - ) - val bigInput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)), @@ -89,6 +40,29 @@ class DStreamWindowSuite extends DStreamSuiteBase { ) val bigOutput = Seq( + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 1)), + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 1)) ) + + /* + The output of the reduceByKeyAndWindow with inverse reduce function is + different from the naive reduceByKeyAndWindow. Even if the count of a + particular key is 0, the key does not get eliminated from the RDDs of + ReducedWindowedDStream. This causes the number of keys in these RDDs to + increase forever. A more generalized version that allows elimination of + keys should be considered. 
+ */ + val bigOutputInv = Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)), Seq(("a", 2), ("b", 2), ("c", 1)), @@ -103,5 +77,112 @@ class DStreamWindowSuite extends DStreamSuiteBase { Seq(("a", 1), ("b", 0), ("c", 0)) ) + def testReduceByKeyAndWindow( + name: String, + input: Seq[Seq[(String, Int)]], + expectedOutput: Seq[Seq[(String, Int)]], + windowTime: Time = Seconds(2), + slideTime: Time = Seconds(1) + ) { + test("reduceByKeyAndWindow - " + name) { + testOperation( + input, + (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist(), + expectedOutput, + true + ) + } + } + + def testReduceByKeyAndWindowInv( + name: String, + input: Seq[Seq[(String, Int)]], + expectedOutput: Seq[Seq[(String, Int)]], + windowTime: Time = Seconds(2), + slideTime: Time = Seconds(1) + ) { + test("reduceByKeyAndWindowInv - " + name) { + testOperation( + input, + (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist(), + expectedOutput, + true + ) + } + } + + + // Testing naive reduceByKeyAndWindow (without invertible function) + + testReduceByKeyAndWindow( + "basic reduction", + Seq(Seq(("a", 1), ("a", 3)) ), + Seq(Seq(("a", 4)) ) + ) + + testReduceByKeyAndWindow( + "key already in window and new value added into window", + Seq( Seq(("a", 1)), Seq(("a", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2)) ) + ) + + + testReduceByKeyAndWindow( + "new key added into window", + Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) + ) + + testReduceByKeyAndWindow( + "key removed from window", + Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), + Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq() ) + ) + + testReduceByKeyAndWindow( + "larger slide time", + largerSlideInput, + largerSlideOutput, + Seconds(4), + Seconds(2) + ) + testReduceByKeyAndWindow("big test", bigInput, bigOutput) + + + // Testing reduceByKeyAndWindow (with invertible reduce function) + + 
testReduceByKeyAndWindowInv( + "basic reduction", + Seq(Seq(("a", 1), ("a", 3)) ), + Seq(Seq(("a", 4)) ) + ) + + testReduceByKeyAndWindowInv( + "key already in window and new value added into window", + Seq( Seq(("a", 1)), Seq(("a", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2)) ) + ) + + testReduceByKeyAndWindowInv( + "new key added into window", + Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) + ) + + testReduceByKeyAndWindowInv( + "key removed from window", + Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), + Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) ) + ) + + testReduceByKeyAndWindowInv( + "larger slide time", + largerSlideInput, + largerSlideOutput, + Seconds(4), + Seconds(2) + ) + + testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv) } -- cgit v1.2.3