author     Tathagata Das <tathagata.das1565@gmail.com>   2012-09-06 05:28:29 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-09-06 05:28:29 -0700
commit     babb7e3ce2a5eda793f87b42839cc20d14cb94cf
tree       c4362b14b485d4be759cad0d8dd2924dc4f7ecc8
parent     25fd684b89ac5bdc6675b0a5d5e3caa9fe608d92
Re-implemented ReducedWindowedDStream to simplify it and fix bugs. Added a slice operator to DStream. Also refactored the DStream test suites and added tests for reduceByKeyAndWindow.
7 files changed, 334 insertions, 276 deletions
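For orientation before the diffs: the operation this commit reworks is reduceByKeyAndWindow with an inverse reduce function. The sketch below is assembled only from calls that appear in the diffs that follow (local context, 1 s batches, and a queue-backed input as in the new test harness); the object name and sample data are illustrative, not part of the commit.

    import spark.RDD
    import spark.streaming._
    import spark.streaming.StreamingContext._
    import scala.collection.mutable.Queue

    object WindowedCountsSketch extends App {
      // Local context with 1 s batches, as in the new DStreamSuiteBase.
      val ssc = new StreamingContext("local", "test")
      ssc.setBatchDuration(Seconds(1))

      // Queue-backed input stream, mirroring ssc.createQueueStream in the harness.
      val inputQueue = new Queue[RDD[(String, Int)]]()
      inputQueue += ssc.sc.makeRDD(Seq(("a", 1), ("b", 1)), 2)
      val pairs = ssc.createQueueStream(inputQueue, true, ssc.sc.makeRDD(Seq[(String, Int)](), 2))

      // Keep a 2 s window sliding every 1 s: "+" folds values entering the
      // window, "-" takes back values leaving it (the invReduceFunc path).
      val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(2), Seconds(1))

      windowedCounts.register()   // mark as an output stream (see DStream.register below)
      ssc.start()
    }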
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 20f1c4db20..50b9458fae 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -143,7 +143,7 @@ extends Logging with Serializable {
   /**
    * This method generates a SparkStreaming job for the given time
-   * and may require to be overriden by subclasses
+   * and may required to be overriden by subclasses
    */
   def generateJob(time: Time): Option[Job] = {
     getOrCompute(time) match {
@@ -208,7 +208,7 @@ extends Logging with Serializable {
     new TransformedDStream(this, ssc.sc.clean(transformFunc))
   }
 
-  def toQueue = {
+  def toBlockingQueue = {
     val queue = new ArrayBlockingQueue[RDD[T]](10000)
     this.foreachRDD(rdd => {
       queue.add(rdd)
@@ -256,6 +256,22 @@ extends Logging with Serializable {
 
   def union(that: DStream[T]) = new UnifiedDStream(Array(this, that))
 
+  def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
+
+    val rdds = new ArrayBuffer[RDD[T]]()
+    var time = toTime.floor(slideTime)
+
+    while (time >= zeroTime && time >= fromTime) {
+      getOrCompute(time) match {
+        case Some(rdd) => rdds += rdd
+        case None => throw new Exception("Could not get old reduced RDD for time " + time)
+      }
+      time -= slideTime
+    }
+
+    rdds.toSeq
+  }
+
   def register() {
     ssc.registerOutputStream(this)
   }

diff --git a/streaming/src/main/scala/spark/streaming/Interval.scala b/streaming/src/main/scala/spark/streaming/Interval.scala
index 87b8437b3d..ffb7725ac9 100644
--- a/streaming/src/main/scala/spark/streaming/Interval.scala
+++ b/streaming/src/main/scala/spark/streaming/Interval.scala
@@ -9,6 +9,10 @@ case class Interval(beginTime: Time, endTime: Time) {
     new Interval(beginTime + time, endTime + time)
   }
 
+  def - (time: Time): Interval = {
+    new Interval(beginTime - time, endTime - time)
+  }
+
   def < (that: Interval): Boolean = {
     if (this.duration != that.duration) {
       throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
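The new DStream.slice walks backwards from toTime, first aligned down to a slide boundary with floor, in steps of slideTime, collecting one RDD per interval. The standalone sketch below models that walk with plain millisecond values; slideMs, zeroMs, and the stand-in getOrCompute are illustrative substitutes for the stream internals, not the real Time/DStream API.

    import scala.collection.mutable.ArrayBuffer

    object SliceWalkSketch extends App {
      val slideMs = 1000L   // stand-in for slideTime
      val zeroMs  = 0L      // stand-in for zeroTime

      // Stand-in for getOrCompute: pretend every slide boundary has an RDD,
      // represented here by its timestamp alone.
      def getOrCompute(timeMs: Long): Option[Long] = Some(timeMs)

      def slice(fromMs: Long, toMs: Long): Seq[Long] = {
        val rdds = new ArrayBuffer[Long]()
        var time = (toMs / slideMs) * slideMs   // toTime.floor(slideTime)
        while (time >= zeroMs && time >= fromMs) {
          getOrCompute(time) match {
            case Some(rdd) => rdds += rdd
            case None => throw new Exception("Could not get RDD for time " + time)
          }
          time -= slideMs   // step back one slide interval
        }
        rdds.toSeq
      }

      // Asking for [2000 ms, 4500 ms] yields the boundaries 4000, 3000, 2000,
      // newest first -- 4500 is first aligned down to 4000.
      println(slice(2000L, 4500L))
    }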
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 191d264b2b..b0beaba94d 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -12,7 +12,7 @@ import spark.storage.StorageLevel
 import scala.collection.mutable.ArrayBuffer
 
 class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
-    parent: DStream[(K, V)],
+    @transient parent: DStream[(K, V)],
     reduceFunc: (V, V) => V,
     invReduceFunc: (V, V) => V,
     _windowTime: Time,
@@ -28,9 +28,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     throw new Exception("The slide duration of ReducedWindowedDStream (" + _slideTime + ") " +
       "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")")
 
-  val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
-  val allowPartialWindows = true
-  //reducedStream.persist(StorageLevel.MEMORY_ONLY_DESER_2)
+  @transient val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
 
   override def dependencies = List(reducedStream)
 
@@ -44,174 +42,95 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
       checkpointInterval: Time): DStream[(K,V)] = {
     super.persist(storageLevel, checkpointLevel, checkpointInterval)
     reducedStream.persist(storageLevel, checkpointLevel, checkpointInterval)
+    this
   }
 
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-
-    // Notation:
+    val currentTime = validTime
+    val currentWindow = Interval(currentTime - windowTime + parent.slideTime, currentTime)
+    val previousWindow = currentWindow - slideTime
+
+    logDebug("Window time = " + windowTime)
+    logDebug("Slide time = " + slideTime)
+    logDebug("ZeroTime = " + zeroTime)
+    logDebug("Current window = " + currentWindow)
+    logDebug("Previous window = " + previousWindow)
+
     //  _____________________________
     // |  previous window   _________|___________________
     // |___________________|       current window        |  --------------> Time
     //                     |_____________________________|
     //
     // |________ _________|          |________ _________|
     //          |                             |
     //          V                             V
-    //       old time steps                new time steps
+    //       old RDDs                      new RDDs
     //
-    def getAdjustedWindow(endTime: Time, windowTime: Time): Interval = {
-      val beginTime =
-        if (allowPartialWindows && endTime - windowTime < parent.zeroTime) {
-          parent.zeroTime
-        } else {
-          endTime - windowTime
-        }
-      Interval(beginTime, endTime)
-    }
-
-    val currentTime = validTime
-    val currentWindow = getAdjustedWindow(currentTime, windowTime)
-    val previousWindow = getAdjustedWindow(currentTime - slideTime, windowTime)
-
-    logInfo("Current window = " + currentWindow)
-    logInfo("Slide time = " + slideTime)
-    logInfo("Previous window = " + previousWindow)
-    logInfo("Parent.zeroTime = " + parent.zeroTime)
-
-    if (allowPartialWindows) {
-      if (currentTime - slideTime <= parent.zeroTime) {
-        reducedStream.getOrCompute(currentTime) match {
-          case Some(rdd) => return Some(rdd)
-          case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
-        }
-      }
-    } else {
-      if (previousWindow.beginTime < parent.zeroTime) {
-        if (currentWindow.beginTime < parent.zeroTime) {
-          return None
-        } else {
-          // If this is the first feasible window, then generate reduced value in the naive manner
-          val reducedRDDs = new ArrayBuffer[RDD[(K, V)]]()
-          var t = currentWindow.endTime
-          while (t > currentWindow.beginTime) {
-            reducedStream.getOrCompute(t) match {
-              case Some(rdd) => reducedRDDs += rdd
-              case None => throw new Exception("Could not get reduced RDD for time " + t)
-            }
-            t -= reducedStream.slideTime
-          }
-          if (reducedRDDs.size == 0) {
-            throw new Exception("Could not generate the first RDD for time " + validTime)
-          }
-          return Some(new UnionRDD(ssc.sc, reducedRDDs).reduceByKey(partitioner, reduceFunc))
-        }
-      }
-    }
-
-    // Get the RDD of the reduced value of the previous window
-    val previousWindowRDD = getOrCompute(previousWindow.endTime) match {
-      case Some(rdd) => rdd.asInstanceOf[RDD[(_, _)]]
-      case None => throw new Exception("Could not get previous RDD for time " + previousWindow.endTime)
-    }
-
-    val oldRDDs = new ArrayBuffer[RDD[(_, _)]]()
-    val newRDDs = new ArrayBuffer[RDD[(_, _)]]()
-
-    // Get the RDDs of the reduced values in "old time steps"
-    var t = currentWindow.beginTime
-    while (t > previousWindow.beginTime) {
-      reducedStream.getOrCompute(t) match {
-        case Some(rdd) => oldRDDs += rdd.asInstanceOf[RDD[(_, _)]]
-        case None => throw new Exception("Could not get old reduced RDD for time " + t)
-      }
-      t -= reducedStream.slideTime
-    }
+    val oldRDDs = reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideTime)
+    logDebug("# old RDDs = " + oldRDDs.size)
 
     // Get the RDDs of the reduced values in "new time steps"
-    t = currentWindow.endTime
-    while (t > previousWindow.endTime) {
-      reducedStream.getOrCompute(t) match {
-        case Some(rdd) => newRDDs += rdd.asInstanceOf[RDD[(_, _)]]
-        case None => throw new Exception("Could not get new reduced RDD for time " + t)
-      }
-      t -= reducedStream.slideTime
-    }
+    val newRDDs = reducedStream.slice(previousWindow.endTime + parent.slideTime, currentWindow.endTime)
+    logDebug("# new RDDs = " + newRDDs.size)
 
-    val allRDDs = new ArrayBuffer[RDD[(_, _)]]()
-    allRDDs += previousWindowRDD
-    allRDDs ++= oldRDDs
-    allRDDs ++= newRDDs
-
-    val numOldRDDs = oldRDDs.size
-    val numNewRDDs = newRDDs.size
-    logInfo("Generated numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
-    logInfo("Generating CoGroupedRDD with " + allRDDs.size + " RDDs")
-    val newRDD = new CoGroupedRDD[K](allRDDs.toSeq, partitioner).asInstanceOf[RDD[(K,Seq[Seq[V]])]].map(x => {
-      val (key, value) = x
-      logDebug("value.size = " + value.size + ", numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
-      if (value.size != 1 + numOldRDDs + numNewRDDs) {
-        throw new Exception("Number of groups not odd!")
-      }
-
-      // old values = reduced values of the "old time steps" that are eliminated from current window
-      // new values = reduced values of the "new time steps" that are introduced to the current window
-      // previous value = reduced value of the previous window
-
-      /*val numOldValues = (value.size - 1) / 2*/
-      // Getting reduced values "old time steps"
-      val oldValues =
-        (0 until numOldRDDs).map(i => value(1 + i)).filter(_.size > 0).map(x => x(0))
-      // Getting reduced values "new time steps"
-      val newValues =
-        (0 until numNewRDDs).map(i => value(1 + numOldRDDs + i)).filter(_.size > 0).map(x => x(0))
-
-      // If reduced value for the key does not exist in previous window, it should not exist in "old time steps"
-      if (value(0).size == 0 && oldValues.size != 0) {
-        throw new Exception("Unexpected: Key exists in old reduced values but not in previous reduced values")
-      }
-      // For the key, at least one of "old time steps", "new time steps" and previous window should have reduced values
-      if (value(0).size == 0 && oldValues.size == 0 && newValues.size == 0) {
-        throw new Exception("Unexpected: Key does not exist in any of old, new, or previour reduced values")
-      }
-      // Logic to generate the final reduced value for current window:
-      //
-      // If previous window did not have reduced value for the key
-      //   Then, return reduced value of "new time steps" as the final value
-      // Else, reduced value exists in previous window
-      //   If "old" time steps did not have reduced value for the key
-      //     Then, reduce previous window's reduced value with that of "new time steps" for final value
-      //   Else, reduced values exists in "old time steps"
-      //     If "new values" did not have reduced value for the key
-      //       Then, inverse-reduce "old values" from previous window's reduced value for final value
-      //     Else, all 3 values exist, combine all of them together
-      //
-      logDebug("# old values = " + oldValues.size + ", # new values = " + newValues)
-      val finalValue = {
-        if (value(0).size == 0) {
-          newValues.reduce(reduceFunc)
-        } else {
-          val prevValue = value(0)(0)
-          logDebug("prev value = " + prevValue)
-          if (oldValues.size == 0) {
-            // assuming newValue.size > 0 (all 3 cannot be zero, as checked earlier)
-            val temp = newValues.reduce(reduceFunc)
-            reduceFunc(prevValue, temp)
-          } else if (newValues.size == 0) {
-            invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
-          } else {
-            val tempValue = invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
-            reduceFunc(tempValue, newValues.reduce(reduceFunc))
-          }
-        }
-      }
-      (key, finalValue)
-    })
-    //newRDD.persist(StorageLevel.MEMORY_ONLY_DESER_2)
-    Some(newRDD)
+    // Get the RDD of the reduced value of the previous window
+    val previousWindowRDD = getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+
+    // Make the list of RDDs that needs to cogrouped together for reducing their reduced values
+    val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs
+
+    // Cogroup the reduced RDDs and merge the reduced values
+    val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(_, _)]]], partitioner)
+    val mergeValuesFunc = mergeValues(oldRDDs.size, newRDDs.size) _
+    val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValuesFunc)
+
+    Some(mergedValuesRDD)
+  }
+
+  def mergeValues(numOldValues: Int, numNewValues: Int)(seqOfValues: Seq[Seq[V]]): V = {
+
+    if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+      throw new Exception("Unexpected number of sequences of reduced values")
+    }
+
+    // Getting reduced values "old time steps" that will be removed from current window
+    val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+
+    // Getting reduced values "new time steps"
+    val newValues = (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+
+    if (seqOfValues(0).isEmpty) {
+
+      // If previous window's reduce value does not exist, then at least new values should exist
+      if (newValues.isEmpty) {
+        throw new Exception("Neither previous window has value for key, nor new values found")
+      }
+
+      // Reduce the new values
+      // println("new values = " + newValues.map(_.toString).reduce(_ + " " + _))
+      return newValues.reduce(reduceFunc)
+    } else {
+
+      // Get the previous window's reduced value
+      var tempValue = seqOfValues(0).head
+
+      // If old values exists, then inverse reduce then from previous value
+      if (!oldValues.isEmpty) {
+        // println("old values = " + oldValues.map(_.toString).reduce(_ + " " + _))
+        tempValue = invReduceFunc(tempValue, oldValues.reduce(reduceFunc))
+      }
+
+      // If new values exists, then reduce them with previous value
+      if (!newValues.isEmpty) {
+        // println("new values = " + newValues.map(_.toString).reduce(_ + " " + _))
+        tempValue = reduceFunc(tempValue, newValues.reduce(reduceFunc))
+      }
+
+      // println("final value = " + tempValue)
+      return tempValue
+    }
   }
 }
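The re-implemented compute relies on the algebraic identity behind invReduceFunc: current window = reduceFunc(invReduceFunc(previous window, values that left), values that entered). A standalone sketch with integer sums (hypothetical batch values, with + as reduceFunc and - as invReduceFunc) checks that identity against naive recomputation:

    object IncrementalWindowSketch extends App {
      val reduceFunc    = (a: Int, b: Int) => a + b   // e.g. a running sum
      val invReduceFunc = (a: Int, b: Int) => a - b   // undoes reduceFunc

      // Hypothetical per-batch reduced values for one key, one per slide interval.
      val batches = Seq(1, 2, 3, 4, 5)

      // Window = 3 batches, slide = 1 batch; the previous window covered batches 0..2.
      val previousWindow = batches.slice(0, 3).reduce(reduceFunc)   // 1 + 2 + 3 = 6
      val oldValues = Seq(batches(0))   // batch 0 slides out of the window
      val newValues = Seq(batches(3))   // batch 3 slides into the window

      // Same merge rule as mergeValues above: subtract what left, add what entered.
      val currentWindow = reduceFunc(
        invReduceFunc(previousWindow, oldValues.reduce(reduceFunc)),
        newValues.reduce(reduceFunc))

      println(currentWindow)                          // 9
      println(batches.slice(1, 4).reduce(reduceFunc)) // 9, naive recomputation agrees
    }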
diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
new file mode 100644
index 0000000000..2634c9b405
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
@@ -0,0 +1,67 @@
+package spark.streaming
+
+import spark.streaming.StreamingContext._
+import scala.runtime.RichInt
+
+class DStreamBasicSuite extends DStreamSuiteBase {
+
+  test("map-like operations") {
+    val input = Seq(1 to 4, 5 to 8, 9 to 12)
+
+    // map
+    testOperation(input, (r: DStream[Int]) => r.map(_.toString), input.map(_.map(_.toString)))
+
+    // flatMap
+    testOperation(
+      input,
+      (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
+      input.map(_.flatMap(x => Array(x, x * 2)))
+    )
+  }
+
+  test("shuffle-based operations") {
+    // reduceByKey
+    testOperation(
+      Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
+      (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
+      Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
+      true
+    )
+
+    // reduce
+    testOperation(
+      Seq(1 to 4, 5 to 8, 9 to 12),
+      (s: DStream[Int]) => s.reduce(_ + _),
+      Seq(Seq(10), Seq(26), Seq(42))
+    )
+  }
+
+  test("stateful operations") {
+    val inputData =
+      Seq(
+        Seq("a", "b", "c"),
+        Seq("a", "b", "c"),
+        Seq("a", "b", "c")
+      )
+
+    val outputData =
+      Seq(
+        Seq(("a", 1), ("b", 1), ("c", 1)),
+        Seq(("a", 2), ("b", 2), ("c", 2)),
+        Seq(("a", 3), ("b", 3), ("c", 3))
+      )
+
+    val updateStateOp = (s: DStream[String]) => {
+      val updateFunc = (values: Seq[Int], state: RichInt) => {
+        var newState = 0
+        if (values != null) newState += values.reduce(_ + _)
+        if (state != null) newState += state.self
+        println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
+        new RichInt(newState)
+      }
+      s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
+    }
+
+    testOperation(inputData, updateStateOp, outputData, true)
+  }
+}
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
deleted file mode 100644
index fc00952afe..0000000000
--- a/streaming/src/test/scala/spark/streaming/DStreamSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-package spark.streaming
-
-import spark.Logging
-import spark.streaming.StreamingContext._
-import spark.streaming.util.ManualClock
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import scala.collection.mutable.ArrayBuffer
-import scala.runtime.RichInt
-
-class DStreamSuite extends FunSuite with BeforeAndAfter with Logging {
-
-  var ssc: StreamingContext = null
-  val batchDurationMillis = 1000
-
-  System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
-  def testOp[U: ClassManifest, V: ClassManifest](
-      input: Seq[Seq[U]],
-      operation: DStream[U] => DStream[V],
-      expectedOutput: Seq[Seq[V]],
-      useSet: Boolean = false
-    ) {
-    try {
-      ssc = new StreamingContext("local", "test")
-      ssc.setBatchDuration(Milliseconds(batchDurationMillis))
-
-      val inputStream = ssc.createQueueStream(input.map(ssc.sc.makeRDD(_, 2)).toIterator)
-      val outputStream = operation(inputStream)
-      val outputQueue = outputStream.toQueue
-
-      ssc.start()
-      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      clock.addToTime(input.size * batchDurationMillis)
-
-      Thread.sleep(1000)
-
-      val output = new ArrayBuffer[Seq[V]]()
-      while (outputQueue.size > 0) {
-        val rdd = outputQueue.take()
-        output += (rdd.collect())
-      }
-
-      assert(output.size === expectedOutput.size)
-      for (i <- 0 until output.size) {
-        if (useSet) {
-          assert(output(i).toSet === expectedOutput(i).toSet)
-        } else {
-          assert(output(i).toList === expectedOutput(i).toList)
-        }
-      }
-    } finally {
-      ssc.stop()
-    }
-  }
-
-  test("map-like operations") {
-    val inputData = Seq(1 to 4, 5 to 8, 9 to 12)
-
-    // map
-    testOp(inputData, (r: DStream[Int]) => r.map(_.toString), inputData.map(_.map(_.toString)))
-
-    // flatMap
-    testOp(
-      inputData,
-      (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
-      inputData.map(_.flatMap(x => Array(x, x * 2)))
-    )
-  }
-
-  test("shuffle-based operations") {
-    // reduceByKey
-    testOp(
-      Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
-      (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
-      Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
-      true
-    )
-
-    // reduce
-    testOp(
-      Seq(1 to 4, 5 to 8, 9 to 12),
-      (s: DStream[Int]) => s.reduce(_ + _),
-      Seq(Seq(10), Seq(26), Seq(42))
-    )
-  }
-
-  test("window-based operations") {
-
-  }
-
-  test("stateful operations") {
-    val inputData =
-      Seq(
-        Seq("a", "b", "c"),
-        Seq("a", "b", "c"),
-        Seq("a", "b", "c")
-      )
-
-    val outputData =
-      Seq(
-        Seq(("a", 1), ("b", 1), ("c", 1)),
-        Seq(("a", 2), ("b", 2), ("c", 2)),
-        Seq(("a", 3), ("b", 3), ("c", 3))
-      )
-
-    val updateStateOp = (s: DStream[String]) => {
-      val updateFunc = (values: Seq[Int], state: RichInt) => {
-        var newState = 0
-        if (values != null) newState += values.reduce(_ + _)
-        if (state != null) newState += state.self
-        println("values = " + values + ", state = " + state + ", " + " new state = " + newState)
-        new RichInt(newState)
-      }
-      s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
-    }
-
-    testOp(inputData, updateStateOp, outputData, true)
-  }
-}
diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
new file mode 100644
index 0000000000..1c4ea14b1d
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala
@@ -0,0 +1,68 @@
+package spark.streaming
+
+import spark.{RDD, Logging}
+import util.ManualClock
+import collection.mutable.ArrayBuffer
+import org.scalatest.FunSuite
+import scala.collection.mutable.Queue
+
+trait DStreamSuiteBase extends FunSuite with Logging {
+
+  def batchDuration() = Seconds(1)
+
+  def maxWaitTimeMillis() = 10000
+
+  def testOperation[U: ClassManifest, V: ClassManifest](
+      input: Seq[Seq[U]],
+      operation: DStream[U] => DStream[V],
+      expectedOutput: Seq[Seq[V]],
+      useSet: Boolean = false
+    ) {
+
+    val manualClock = true
+
+    if (manualClock) {
+      System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+    }
+
+    val ssc = new StreamingContext("local", "test")
+
+    try {
+      ssc.setBatchDuration(Milliseconds(batchDuration))
+
+      val inputQueue = new Queue[RDD[U]]()
+      inputQueue ++= input.map(ssc.sc.makeRDD(_, 2))
+      val emptyRDD = ssc.sc.makeRDD(Seq[U](), 2)
+
+      val inputStream = ssc.createQueueStream(inputQueue, true, emptyRDD)
+      val outputStream = operation(inputStream)
+
+      val output = new ArrayBuffer[Seq[V]]()
+      outputStream.foreachRDD(rdd => output += rdd.collect())
+
+      ssc.start()
+
+      val clock = ssc.scheduler.clock
+      if (clock.isInstanceOf[ManualClock]) {
+        clock.asInstanceOf[ManualClock].addToTime(input.size * batchDuration.milliseconds)
+      }
+
+      val startTime = System.currentTimeMillis()
+      while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+        Thread.sleep(500)
+      }
+
+      assert(output.size === expectedOutput.size)
+      for (i <- 0 until output.size) {
+        if (useSet) {
+          assert(output(i).toSet === expectedOutput(i).toSet)
+        } else {
+          assert(output(i).toList === expectedOutput(i).toList)
+        }
+      }
+    } finally {
+      ssc.stop()
+    }
+  }
+}
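Unlike the deleted DStreamSuite, which slept a fixed second and hoped all output had arrived, DStreamSuiteBase advances the ManualClock over the whole input and then polls with a deadline. A standalone sketch of just that bounded-poll pattern, with a plain buffer and a timer thread standing in for the scheduler:

    import scala.collection.mutable.ArrayBuffer

    object BoundedPollSketch extends App {
      val output = new ArrayBuffer[Int]()   // stand-in for collected batch results
      val expectedBatches = 3
      val maxWaitTimeMillis = 10000

      // A background "job" that delivers one batch every 200 ms.
      new Thread(new Runnable {
        def run() {
          for (i <- 1 to expectedBatches) {
            Thread.sleep(200)
            output.synchronized { output += i }
          }
        }
      }).start()

      // Poll until everything has arrived or the deadline passes,
      // instead of sleeping blindly for a fixed interval.
      val startTime = System.currentTimeMillis()
      while (output.synchronized { output.size } < expectedBatches &&
             System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
        Thread.sleep(50)
      }
      assert(output.synchronized { output.size } == expectedBatches)
      println("got all batches in " + (System.currentTimeMillis() - startTime) + " ms")
    }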
("b", 1), ("c", 1)), - Seq(("a", 2), ("b", 2), ("c", 2)), - Seq(("a", 3), ("b", 3), ("c", 3)) - ) - - val updateStateOp = (s: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: RichInt) => { - var newState = 0 - if (values != null) newState += values.reduce(_ + _) - if (state != null) newState += state.self - println("values = " + values + ", state = " + state + ", " + " new state = " + newState) - new RichInt(newState) - } - s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self)) - } - - testOp(inputData, updateStateOp, outputData, true) - } -} diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala new file mode 100644 index 0000000000..1c4ea14b1d --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala @@ -0,0 +1,68 @@ +package spark.streaming + +import spark.{RDD, Logging} +import util.ManualClock +import collection.mutable.ArrayBuffer +import org.scalatest.FunSuite +import scala.collection.mutable.Queue + + +trait DStreamSuiteBase extends FunSuite with Logging { + + def batchDuration() = Seconds(1) + + def maxWaitTimeMillis() = 10000 + + def testOperation[U: ClassManifest, V: ClassManifest]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V], + expectedOutput: Seq[Seq[V]], + useSet: Boolean = false + ) { + + val manualClock = true + + if (manualClock) { + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + } + + val ssc = new StreamingContext("local", "test") + + try { + ssc.setBatchDuration(Milliseconds(batchDuration)) + + val inputQueue = new Queue[RDD[U]]() + inputQueue ++= input.map(ssc.sc.makeRDD(_, 2)) + val emptyRDD = ssc.sc.makeRDD(Seq[U](), 2) + + val inputStream = ssc.createQueueStream(inputQueue, true, emptyRDD) + val outputStream = operation(inputStream) + + val output = new ArrayBuffer[Seq[V]]() + outputStream.foreachRDD(rdd => output += rdd.collect()) + + ssc.start() + + val clock = ssc.scheduler.clock + if (clock.isInstanceOf[ManualClock]) { + clock.asInstanceOf[ManualClock].addToTime(input.size * batchDuration.milliseconds) + } + + val startTime = System.currentTimeMillis() + while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + Thread.sleep(500) + } + + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + if (useSet) { + assert(output(i).toSet === expectedOutput(i).toSet) + } else { + assert(output(i).toList === expectedOutput(i).toList) + } + } + } finally { + ssc.stop() + } + } +} diff --git a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala new file mode 100644 index 0000000000..c0e054418c --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala @@ -0,0 +1,107 @@ +package spark.streaming + +import spark.streaming.StreamingContext._ + +class DStreamWindowSuite extends DStreamSuiteBase { + + def testReduceByKeyAndWindow( + name: String, + input: Seq[Seq[(String, Int)]], + expectedOutput: Seq[Seq[(String, Int)]], + windowTime: Time = Seconds(2), + slideTime: Time = Seconds(1) + ) { + test("reduceByKeyAndWindow - " + name) { + testOperation( + input, + (s: DStream[(String, Int)]) => s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist(), + expectedOutput, + true + ) + } + } + + testReduceByKeyAndWindow( + "basic reduction", + Seq(Seq(("a", 1), ("a", 3)) ), + 
Seq(Seq(("a", 4)) ) + ) + + testReduceByKeyAndWindow( + "key already in window and new value added into window", + Seq( Seq(("a", 1)), Seq(("a", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2)) ) + ) + + testReduceByKeyAndWindow( + "new key added into window", + Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) + ) + + testReduceByKeyAndWindow( + "key removed from window", + Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), + Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) ) + ) + + val largerSlideInput = Seq( + Seq(("a", 1)), // 1st window from here + Seq(("a", 2)), + Seq(("a", 3)), // 2nd window from here + Seq(("a", 4)), + Seq(("a", 5)), // 3rd window from here + Seq(("a", 6)), + Seq(), // 4th window from here + Seq(), + Seq() // 5th window from here + ) + + val largerSlideOutput = Seq( + Seq(("a", 1)), + Seq(("a", 6)), + Seq(("a", 14)), + Seq(("a", 15)), + Seq(("a", 6)) + ) + + testReduceByKeyAndWindow( + "larger slide time", + largerSlideInput, + largerSlideOutput, + Seconds(4), + Seconds(2) + ) + + val bigInput = Seq( + Seq(("a", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1), ("b", 1), ("c", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1)), + Seq(), + Seq(("a", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1), ("b", 1), ("c", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1)), + Seq() + ) + + val bigOutput = Seq( + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)) + ) + + testReduceByKeyAndWindow("big test", bigInput, bigOutput) +} |