From 020d6434844b22c2fe611303b338eaf53397c9db Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 23 Oct 2012 16:24:05 -0700 Subject: Renamed the streaming testsuites. --- .../spark/streaming/BasicOperationsSuite.scala | 213 ++++++++++++++++++++ .../scala/spark/streaming/CheckpointSuite.scala | 2 +- .../scala/spark/streaming/DStreamBasicSuite.scala | 211 -------------------- .../scala/spark/streaming/DStreamSuiteBase.scala | 216 --------------------- .../scala/spark/streaming/DStreamWindowSuite.scala | 188 ------------------ .../test/scala/spark/streaming/TestSuiteBase.scala | 216 +++++++++++++++++++++ .../spark/streaming/WindowOperationsSuite.scala | 188 ++++++++++++++++++ 7 files changed, 618 insertions(+), 616 deletions(-) create mode 100644 streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala delete mode 100644 streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala delete mode 100644 streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala delete mode 100644 streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala create mode 100644 streaming/src/test/scala/spark/streaming/TestSuiteBase.scala create mode 100644 streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala (limited to 'streaming/src') diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala new file mode 100644 index 0000000000..d0aaac0f2e --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala @@ -0,0 +1,213 @@ +package spark.streaming + +import spark.streaming.StreamingContext._ +import scala.runtime.RichInt +import util.ManualClock + +class BasicOperationsSuite extends TestSuiteBase { + + override def framework() = "BasicOperationsSuite" + + test("map") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.map(_.toString), + input.map(_.map(_.toString)) + ) + } + + test("flatmap") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)), + input.map(_.flatMap(x => Array(x, x * 2))) + ) + } + + test("filter") { + val input = Seq(1 to 4, 5 to 8, 9 to 12) + testOperation( + input, + (r: DStream[Int]) => r.filter(x => (x % 2 == 0)), + input.map(_.filter(x => (x % 2 == 0))) + ) + } + + test("glom") { + assert(numInputPartitions === 2, "Number of input partitions has been changed from 2") + val input = Seq(1 to 4, 5 to 8, 9 to 12) + val output = Seq( + Seq( Seq(1, 2), Seq(3, 4) ), + Seq( Seq(5, 6), Seq(7, 8) ), + Seq( Seq(9, 10), Seq(11, 12) ) + ) + val operation = (r: DStream[Int]) => r.glom().map(_.toSeq) + testOperation(input, operation, output) + } + + test("mapPartitions") { + assert(numInputPartitions === 2, "Number of input partitions has been changed from 2") + val input = Seq(1 to 4, 5 to 8, 9 to 12) + val output = Seq(Seq(3, 7), Seq(11, 15), Seq(19, 23)) + val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.reduce(_ + _))) + testOperation(input, operation, output, true) + } + + test("groupByKey") { + testOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).groupByKey(), + Seq( Seq(("a", Seq(1, 1)), ("b", Seq(1))), Seq(("", Seq(1, 1))), Seq() ), + true + ) + } + + test("reduceByKey") { + testOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _), + Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() 
), + true + ) + } + + test("reduce") { + testOperation( + Seq(1 to 4, 5 to 8, 9 to 12), + (s: DStream[Int]) => s.reduce(_ + _), + Seq(Seq(10), Seq(26), Seq(42)) + ) + } + + test("mapValues") { + testOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).mapValues(_ + 10), + Seq( Seq(("a", 12), ("b", 11)), Seq(("", 12)), Seq() ), + true + ) + } + + test("flatMapValues") { + testOperation( + Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), + (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10)), + Seq( Seq(("a", 2), ("a", 12), ("b", 1), ("b", 11)), Seq(("", 2), ("", 12)), Seq() ), + true + ) + } + + test("cogroup") { + val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() ) + val outputData = Seq( + Seq( ("a", (Seq(1, 1), Seq("x", "x"))), ("b", (Seq(1), Seq("x"))) ), + Seq( ("a", (Seq(1), Seq())), ("b", (Seq(), Seq("x"))), ("", (Seq(1), Seq("x"))) ), + Seq( ("", (Seq(1), Seq())) ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("join") { + val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) + val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) + val outputData = Seq( + Seq( ("a", (1, "x")), ("b", (1, "x")) ), + Seq( ("", (1, "x")) ), + Seq( ), + Seq( ) + ) + val operation = (s1: DStream[String], s2: DStream[String]) => { + s1.map(x => (x,1)).join(s2.map(x => (x,"x"))) + } + testOperation(inputData1, inputData2, operation, outputData, true) + } + + test("updateStateByKey") { + val inputData = + Seq( + Seq("a"), + Seq("a", "b"), + Seq("a", "b", "c"), + Seq("a", "b"), + Seq("a"), + Seq() + ) + + val outputData = + Seq( + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 3), ("b", 2), ("c", 1)), + Seq(("a", 4), ("b", 3), ("c", 1)), + Seq(("a", 5), ("b", 3), ("c", 1)), + Seq(("a", 5), ("b", 3), ("c", 1)) + ) + + val updateStateOperation = (s: DStream[String]) => { + val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { + Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + } + s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self)) + } + + testOperation(inputData, updateStateOperation, outputData, true) + } + + test("forgetting of RDDs - map and window operations") { + assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second") + + val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq + val rememberDuration = Seconds(3) + + assert(input.size === 10, "Number of inputs have changed") + + def operation(s: DStream[Int]): DStream[(Int, Int)] = { + s.map(x => (x % 10, 1)) + .window(Seconds(2), Seconds(1)) + .window(Seconds(4), Seconds(2)) + } + + val ssc = setupStreams(input, operation _) + ssc.setRememberDuration(rememberDuration) + runStreams[(Int, Int)](ssc, input.size, input.size / 2) + + val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head + val windowedStream1 = windowedStream2.dependencies.head + val mappedStream = windowedStream1.dependencies.head + + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + assert(clock.time === Seconds(10).milliseconds) + + // IDEALLY + // WindowedStream2 should remember till 7 seconds: 10, 8, + // WindowedStream1 should remember till 4 seconds: 10, 
9, 8, 7, 6, 5 + // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, + + // IN THIS TEST + // WindowedStream2 should remember till 7 seconds: 10, 8, + // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 + // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2 + + // WindowedStream2 + assert(windowedStream2.generatedRDDs.contains(Seconds(10))) + assert(windowedStream2.generatedRDDs.contains(Seconds(8))) + assert(!windowedStream2.generatedRDDs.contains(Seconds(6))) + + // WindowedStream1 + assert(windowedStream1.generatedRDDs.contains(Seconds(10))) + assert(windowedStream1.generatedRDDs.contains(Seconds(4))) + assert(!windowedStream1.generatedRDDs.contains(Seconds(3))) + + // MappedStream + assert(mappedStream.generatedRDDs.contains(Seconds(10))) + assert(mappedStream.generatedRDDs.contains(Seconds(2))) + assert(!mappedStream.generatedRDDs.contains(Seconds(1))) + } +} diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 061b331a16..6dcedcf463 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -3,7 +3,7 @@ package spark.streaming import spark.streaming.StreamingContext._ import java.io.File -class CheckpointSuite extends DStreamSuiteBase { +class CheckpointSuite extends TestSuiteBase { override def framework() = "CheckpointSuite" diff --git a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala deleted file mode 100644 index 290a216797..0000000000 --- a/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala +++ /dev/null @@ -1,211 +0,0 @@ -package spark.streaming - -import spark.streaming.StreamingContext._ -import scala.runtime.RichInt -import util.ManualClock - -class DStreamBasicSuite extends DStreamSuiteBase { - - test("map") { - val input = Seq(1 to 4, 5 to 8, 9 to 12) - testOperation( - input, - (r: DStream[Int]) => r.map(_.toString), - input.map(_.map(_.toString)) - ) - } - - test("flatmap") { - val input = Seq(1 to 4, 5 to 8, 9 to 12) - testOperation( - input, - (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)), - input.map(_.flatMap(x => Array(x, x * 2))) - ) - } - - test("filter") { - val input = Seq(1 to 4, 5 to 8, 9 to 12) - testOperation( - input, - (r: DStream[Int]) => r.filter(x => (x % 2 == 0)), - input.map(_.filter(x => (x % 2 == 0))) - ) - } - - test("glom") { - assert(numInputPartitions === 2, "Number of input partitions has been changed from 2") - val input = Seq(1 to 4, 5 to 8, 9 to 12) - val output = Seq( - Seq( Seq(1, 2), Seq(3, 4) ), - Seq( Seq(5, 6), Seq(7, 8) ), - Seq( Seq(9, 10), Seq(11, 12) ) - ) - val operation = (r: DStream[Int]) => r.glom().map(_.toSeq) - testOperation(input, operation, output) - } - - test("mapPartitions") { - assert(numInputPartitions === 2, "Number of input partitions has been changed from 2") - val input = Seq(1 to 4, 5 to 8, 9 to 12) - val output = Seq(Seq(3, 7), Seq(11, 15), Seq(19, 23)) - val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.reduce(_ + _))) - testOperation(input, operation, output, true) - } - - test("groupByKey") { - testOperation( - Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), - (s: DStream[String]) => s.map(x => (x, 1)).groupByKey(), - Seq( Seq(("a", Seq(1, 1)), ("b", Seq(1))), Seq(("", Seq(1, 1))), Seq() ), - true - ) - } - - test("reduceByKey") { - testOperation( - 
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), - (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _), - Seq( Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq() ), - true - ) - } - - test("reduce") { - testOperation( - Seq(1 to 4, 5 to 8, 9 to 12), - (s: DStream[Int]) => s.reduce(_ + _), - Seq(Seq(10), Seq(26), Seq(42)) - ) - } - - test("mapValues") { - testOperation( - Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), - (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).mapValues(_ + 10), - Seq( Seq(("a", 12), ("b", 11)), Seq(("", 12)), Seq() ), - true - ) - } - - test("flatMapValues") { - testOperation( - Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), - (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _).flatMapValues(x => Seq(x, x + 10)), - Seq( Seq(("a", 2), ("a", 12), ("b", 1), ("b", 11)), Seq(("", 2), ("", 12)), Seq() ), - true - ) - } - - test("cogroup") { - val inputData1 = Seq( Seq("a", "a", "b"), Seq("a", ""), Seq(""), Seq() ) - val inputData2 = Seq( Seq("a", "a", "b"), Seq("b", ""), Seq(), Seq() ) - val outputData = Seq( - Seq( ("a", (Seq(1, 1), Seq("x", "x"))), ("b", (Seq(1), Seq("x"))) ), - Seq( ("a", (Seq(1), Seq())), ("b", (Seq(), Seq("x"))), ("", (Seq(1), Seq("x"))) ), - Seq( ("", (Seq(1), Seq())) ), - Seq( ) - ) - val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))) - } - testOperation(inputData1, inputData2, operation, outputData, true) - } - - test("join") { - val inputData1 = Seq( Seq("a", "b"), Seq("a", ""), Seq(""), Seq() ) - val inputData2 = Seq( Seq("a", "b"), Seq("b", ""), Seq(), Seq("") ) - val outputData = Seq( - Seq( ("a", (1, "x")), ("b", (1, "x")) ), - Seq( ("", (1, "x")) ), - Seq( ), - Seq( ) - ) - val operation = (s1: DStream[String], s2: DStream[String]) => { - s1.map(x => (x,1)).join(s2.map(x => (x,"x"))) - } - testOperation(inputData1, inputData2, operation, outputData, true) - } - - test("updateStateByKey") { - val inputData = - Seq( - Seq("a"), - Seq("a", "b"), - Seq("a", "b", "c"), - Seq("a", "b"), - Seq("a"), - Seq() - ) - - val outputData = - Seq( - Seq(("a", 1)), - Seq(("a", 2), ("b", 1)), - Seq(("a", 3), ("b", 2), ("c", 1)), - Seq(("a", 4), ("b", 3), ("c", 1)), - Seq(("a", 5), ("b", 3), ("c", 1)), - Seq(("a", 5), ("b", 3), ("c", 1)) - ) - - val updateStateOperation = (s: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) - } - s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self)) - } - - testOperation(inputData, updateStateOperation, outputData, true) - } - - test("forgetting of RDDs - map and window operations") { - assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second") - - val input = (0 until 10).map(x => Seq(x, x + 1)).toSeq - val rememberDuration = Seconds(3) - - assert(input.size === 10, "Number of inputs have changed") - - def operation(s: DStream[Int]): DStream[(Int, Int)] = { - s.map(x => (x % 10, 1)) - .window(Seconds(2), Seconds(1)) - .window(Seconds(4), Seconds(2)) - } - - val ssc = setupStreams(input, operation _) - ssc.setRememberDuration(rememberDuration) - runStreams[(Int, Int)](ssc, input.size, input.size / 2) - - val windowedStream2 = ssc.graph.getOutputStreams().head.dependencies.head - val windowedStream1 = windowedStream2.dependencies.head - val mappedStream = windowedStream1.dependencies.head - - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - assert(clock.time 
=== Seconds(10).milliseconds) - - // IDEALLY - // WindowedStream2 should remember till 7 seconds: 10, 8, - // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5 - // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, - - // IN THIS TEST - // WindowedStream2 should remember till 7 seconds: 10, 8, - // WindowedStream1 should remember till 4 seconds: 10, 9, 8, 7, 6, 5, 4 - // MappedStream should remember till 7 seconds: 10, 9, 8, 7, 6, 5, 4, 3, 2 - - // WindowedStream2 - assert(windowedStream2.generatedRDDs.contains(Seconds(10))) - assert(windowedStream2.generatedRDDs.contains(Seconds(8))) - assert(!windowedStream2.generatedRDDs.contains(Seconds(6))) - - // WindowedStream1 - assert(windowedStream1.generatedRDDs.contains(Seconds(10))) - assert(windowedStream1.generatedRDDs.contains(Seconds(4))) - assert(!windowedStream1.generatedRDDs.contains(Seconds(3))) - - // MappedStream - assert(mappedStream.generatedRDDs.contains(Seconds(10))) - assert(mappedStream.generatedRDDs.contains(Seconds(2))) - assert(!mappedStream.generatedRDDs.contains(Seconds(1))) - } -} diff --git a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala b/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala deleted file mode 100644 index 2a4b37c965..0000000000 --- a/streaming/src/test/scala/spark/streaming/DStreamSuiteBase.scala +++ /dev/null @@ -1,216 +0,0 @@ -package spark.streaming - -import spark.{RDD, Logging} -import util.ManualClock -import collection.mutable.ArrayBuffer -import org.scalatest.FunSuite -import collection.mutable.SynchronizedBuffer - -class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int) - extends InputDStream[T](ssc_) { - var currentIndex = 0 - - def start() {} - - def stop() {} - - def compute(validTime: Time): Option[RDD[T]] = { - logInfo("Computing RDD for time " + validTime) - val rdd = if (currentIndex < input.size) { - ssc.sc.makeRDD(input(currentIndex), numPartitions) - } else { - ssc.sc.makeRDD(Seq[T](), numPartitions) - } - logInfo("Created RDD " + rdd.id) - currentIndex += 1 - Some(rdd) - } -} - -class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) - extends PerRDDForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { - val collected = rdd.collect() - output += collected - }) - -trait DStreamSuiteBase extends FunSuite with Logging { - - System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - - def framework() = "DStreamSuiteBase" - - def master() = "local[2]" - - def batchDuration() = Seconds(1) - - def checkpointFile() = null.asInstanceOf[String] - - def checkpointInterval() = batchDuration - - def numInputPartitions() = 2 - - def maxWaitTimeMillis() = 10000 - - def setupStreams[U: ClassManifest, V: ClassManifest]( - input: Seq[Seq[U]], - operation: DStream[U] => DStream[V] - ): StreamingContext = { - - // Create StreamingContext - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) - if (checkpointFile != null) { - ssc.setCheckpointDetails(checkpointFile, checkpointInterval()) - } - - // Setup the stream computation - val inputStream = new TestInputStream(ssc, input, numInputPartitions) - val operatedStream = operation(inputStream) - val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]]) - ssc.registerInputStream(inputStream) - ssc.registerOutputStream(outputStream) - ssc - } - - def setupStreams[U: ClassManifest, V: ClassManifest, 
W: ClassManifest]( - input1: Seq[Seq[U]], - input2: Seq[Seq[V]], - operation: (DStream[U], DStream[V]) => DStream[W] - ): StreamingContext = { - - // Create StreamingContext - val ssc = new StreamingContext(master, framework) - ssc.setBatchDuration(batchDuration) - if (checkpointFile != null) { - ssc.setCheckpointDetails(checkpointFile, checkpointInterval()) - } - - // Setup the stream computation - val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions) - val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions) - val operatedStream = operation(inputStream1, inputStream2) - val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]]) - ssc.registerInputStream(inputStream1) - ssc.registerInputStream(inputStream2) - ssc.registerOutputStream(outputStream) - ssc - } - - - def runStreams[V: ClassManifest]( - ssc: StreamingContext, - numBatches: Int, - numExpectedOutput: Int - ): Seq[Seq[V]] = { - - assert(numBatches > 0, "Number of batches to run stream computation is zero") - assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero") - logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput) - - // Get the output buffer - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] - val output = outputStream.output - - try { - // Start computation - ssc.start() - - // Advance manual clock - val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - logInfo("Manual clock before advancing = " + clock.time) - clock.addToTime(numBatches * batchDuration.milliseconds) - logInfo("Manual clock after advancing = " + clock.time) - - // Wait until expected number of output items have been generated - val startTime = System.currentTimeMillis() - while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(100) - } - val timeTaken = System.currentTimeMillis() - startTime - - assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") - - Thread.sleep(500) // Give some time for the forgetting old RDDs to complete - } catch { - case e: Exception => e.printStackTrace(); throw e; - } finally { - ssc.stop() - } - - output - } - - def verifyOutput[V: ClassManifest]( - output: Seq[Seq[V]], - expectedOutput: Seq[Seq[V]], - useSet: Boolean - ) { - logInfo("--------------------------------") - logInfo("output.size = " + output.size) - logInfo("output") - output.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("expected output.size = " + expectedOutput.size) - logInfo("expected output") - expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) - logInfo("--------------------------------") - - // Match the output with the expected output - assert(output.size === expectedOutput.size, "Number of outputs do not match") - for (i <- 0 until output.size) { - if (useSet) { - assert(output(i).toSet === expectedOutput(i).toSet) - } else { - assert(output(i).toList === expectedOutput(i).toList) - } - } - logInfo("Output verified successfully") - } - - def testOperation[U: ClassManifest, V: ClassManifest]( - input: Seq[Seq[U]], - operation: DStream[U] => DStream[V], - expectedOutput: Seq[Seq[V]], - useSet: Boolean = false - ) { - testOperation[U, V](input, operation, expectedOutput, 
-1, useSet) - } - - def testOperation[U: ClassManifest, V: ClassManifest]( - input: Seq[Seq[U]], - operation: DStream[U] => DStream[V], - expectedOutput: Seq[Seq[V]], - numBatches: Int, - useSet: Boolean - ) { - val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size - val ssc = setupStreams[U, V](input, operation) - val output = runStreams[V](ssc, numBatches_, expectedOutput.size) - verifyOutput[V](output, expectedOutput, useSet) - } - - def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( - input1: Seq[Seq[U]], - input2: Seq[Seq[V]], - operation: (DStream[U], DStream[V]) => DStream[W], - expectedOutput: Seq[Seq[W]], - useSet: Boolean - ) { - testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet) - } - - def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( - input1: Seq[Seq[U]], - input2: Seq[Seq[V]], - operation: (DStream[U], DStream[V]) => DStream[W], - expectedOutput: Seq[Seq[W]], - numBatches: Int, - useSet: Boolean - ) { - val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size - val ssc = setupStreams[U, V, W](input1, input2, operation) - val output = runStreams[W](ssc, numBatches_, expectedOutput.size) - verifyOutput[W](output, expectedOutput, useSet) - } -} diff --git a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala b/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala deleted file mode 100644 index cfcab6298d..0000000000 --- a/streaming/src/test/scala/spark/streaming/DStreamWindowSuite.scala +++ /dev/null @@ -1,188 +0,0 @@ -package spark.streaming - -import spark.streaming.StreamingContext._ - -class DStreamWindowSuite extends DStreamSuiteBase { - - override def framework() = "DStreamWindowSuite" - - override def maxWaitTimeMillis() = 20000 - - val largerSlideInput = Seq( - Seq(("a", 1)), - Seq(("a", 2)), // 1st window from here - Seq(("a", 3)), - Seq(("a", 4)), // 2nd window from here - Seq(("a", 5)), - Seq(("a", 6)), // 3rd window from here - Seq(), - Seq() // 4th window from here - ) - - val largerSlideOutput = Seq( - Seq(("a", 3)), - Seq(("a", 10)), - Seq(("a", 18)), - Seq(("a", 11)) - ) - - - val bigInput = Seq( - Seq(("a", 1)), - Seq(("a", 1), ("b", 1)), - Seq(("a", 1), ("b", 1), ("c", 1)), - Seq(("a", 1), ("b", 1)), - Seq(("a", 1)), - Seq(), - Seq(("a", 1)), - Seq(("a", 1), ("b", 1)), - Seq(("a", 1), ("b", 1), ("c", 1)), - Seq(("a", 1), ("b", 1)), - Seq(("a", 1)), - Seq() - ) - - val bigOutput = Seq( - Seq(("a", 1)), - Seq(("a", 2), ("b", 1)), - Seq(("a", 2), ("b", 2), ("c", 1)), - Seq(("a", 2), ("b", 2), ("c", 1)), - Seq(("a", 2), ("b", 1)), - Seq(("a", 1)), - Seq(("a", 1)), - Seq(("a", 2), ("b", 1)), - Seq(("a", 2), ("b", 2), ("c", 1)), - Seq(("a", 2), ("b", 2), ("c", 1)), - Seq(("a", 2), ("b", 1)), - Seq(("a", 1)) - ) - - /* - The output of the reduceByKeyAndWindow with inverse reduce function is - difference from the naive reduceByKeyAndWindow. Even if the count of a - particular key is 0, the key does not get eliminated from the RDDs of - ReducedWindowedDStream. This causes the number of keys in these RDDs to - increase forever. A more generalized version that allows elimination of - keys should be considered. 
- */ - val bigOutputInv = Seq( - Seq(("a", 1)), - Seq(("a", 2), ("b", 1)), - Seq(("a", 2), ("b", 2), ("c", 1)), - Seq(("a", 2), ("b", 2), ("c", 1)), - Seq(("a", 2), ("b", 1), ("c", 0)), - Seq(("a", 1), ("b", 0), ("c", 0)), - Seq(("a", 1), ("b", 0), ("c", 0)), - Seq(("a", 2), ("b", 1), ("c", 0)), - Seq(("a", 2), ("b", 2), ("c", 1)), - Seq(("a", 2), ("b", 2), ("c", 1)), - Seq(("a", 2), ("b", 1), ("c", 0)), - Seq(("a", 1), ("b", 0), ("c", 0)) - ) - - def testReduceByKeyAndWindow( - name: String, - input: Seq[Seq[(String, Int)]], - expectedOutput: Seq[Seq[(String, Int)]], - windowTime: Time = batchDuration * 2, - slideTime: Time = batchDuration - ) { - test("reduceByKeyAndWindow - " + name) { - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt - val operation = (s: DStream[(String, Int)]) => { - s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist() - } - testOperation(input, operation, expectedOutput, numBatches, true) - } - } - - def testReduceByKeyAndWindowInv( - name: String, - input: Seq[Seq[(String, Int)]], - expectedOutput: Seq[Seq[(String, Int)]], - windowTime: Time = batchDuration * 2, - slideTime: Time = batchDuration - ) { - test("reduceByKeyAndWindowInv - " + name) { - val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt - val operation = (s: DStream[(String, Int)]) => { - s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist() - } - testOperation(input, operation, expectedOutput, numBatches, true) - } - } - - - // Testing naive reduceByKeyAndWindow (without invertible function) - - testReduceByKeyAndWindow( - "basic reduction", - Seq( Seq(("a", 1), ("a", 3)) ), - Seq( Seq(("a", 4)) ) - ) - - testReduceByKeyAndWindow( - "key already in window and new value added into window", - Seq( Seq(("a", 1)), Seq(("a", 1)) ), - Seq( Seq(("a", 1)), Seq(("a", 2)) ) - ) - - testReduceByKeyAndWindow( - "new key added into window", - Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), - Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) - ) - - testReduceByKeyAndWindow( - "key removed from window", - Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), - Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq() ) - ) - - testReduceByKeyAndWindow( - "larger slide time", - largerSlideInput, - largerSlideOutput, - Seconds(4), - Seconds(2) - ) - - testReduceByKeyAndWindow("big test", bigInput, bigOutput) - - - // Testing reduceByKeyAndWindow (with invertible reduce function) - - testReduceByKeyAndWindowInv( - "basic reduction", - Seq(Seq(("a", 1), ("a", 3)) ), - Seq(Seq(("a", 4)) ) - ) - - testReduceByKeyAndWindowInv( - "key already in window and new value added into window", - Seq( Seq(("a", 1)), Seq(("a", 1)) ), - Seq( Seq(("a", 1)), Seq(("a", 2)) ) - ) - - testReduceByKeyAndWindowInv( - "new key added into window", - Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), - Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) - ) - - testReduceByKeyAndWindowInv( - "key removed from window", - Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), - Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) ) - ) - - testReduceByKeyAndWindowInv( - "larger slide time", - largerSlideInput, - largerSlideOutput, - Seconds(4), - Seconds(2) - ) - - testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv) -} diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala new file mode 100644 index 0000000000..c1b7772e7b --- /dev/null +++ 
b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala @@ -0,0 +1,216 @@ +package spark.streaming + +import spark.{RDD, Logging} +import util.ManualClock +import collection.mutable.ArrayBuffer +import org.scalatest.FunSuite +import collection.mutable.SynchronizedBuffer + +class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int) + extends InputDStream[T](ssc_) { + var currentIndex = 0 + + def start() {} + + def stop() {} + + def compute(validTime: Time): Option[RDD[T]] = { + logInfo("Computing RDD for time " + validTime) + val rdd = if (currentIndex < input.size) { + ssc.sc.makeRDD(input(currentIndex), numPartitions) + } else { + ssc.sc.makeRDD(Seq[T](), numPartitions) + } + logInfo("Created RDD " + rdd.id) + currentIndex += 1 + Some(rdd) + } +} + +class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) + extends PerRDDForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { + val collected = rdd.collect() + output += collected + }) + +trait TestSuiteBase extends FunSuite with Logging { + + System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + + def framework() = "TestSuiteBase" + + def master() = "local[2]" + + def batchDuration() = Seconds(1) + + def checkpointFile() = null.asInstanceOf[String] + + def checkpointInterval() = batchDuration + + def numInputPartitions() = 2 + + def maxWaitTimeMillis() = 10000 + + def setupStreams[U: ClassManifest, V: ClassManifest]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V] + ): StreamingContext = { + + // Create StreamingContext + val ssc = new StreamingContext(master, framework) + ssc.setBatchDuration(batchDuration) + if (checkpointFile != null) { + ssc.setCheckpointDetails(checkpointFile, checkpointInterval()) + } + + // Setup the stream computation + val inputStream = new TestInputStream(ssc, input, numInputPartitions) + val operatedStream = operation(inputStream) + val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]]) + ssc.registerInputStream(inputStream) + ssc.registerOutputStream(outputStream) + ssc + } + + def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + input1: Seq[Seq[U]], + input2: Seq[Seq[V]], + operation: (DStream[U], DStream[V]) => DStream[W] + ): StreamingContext = { + + // Create StreamingContext + val ssc = new StreamingContext(master, framework) + ssc.setBatchDuration(batchDuration) + if (checkpointFile != null) { + ssc.setCheckpointDetails(checkpointFile, checkpointInterval()) + } + + // Setup the stream computation + val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions) + val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions) + val operatedStream = operation(inputStream1, inputStream2) + val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]]) + ssc.registerInputStream(inputStream1) + ssc.registerInputStream(inputStream2) + ssc.registerOutputStream(outputStream) + ssc + } + + + def runStreams[V: ClassManifest]( + ssc: StreamingContext, + numBatches: Int, + numExpectedOutput: Int + ): Seq[Seq[V]] = { + + assert(numBatches > 0, "Number of batches to run stream computation is zero") + assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero") + logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput) + + // Get the output buffer + val outputStream = 
ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] + val output = outputStream.output + + try { + // Start computation + ssc.start() + + // Advance manual clock + val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + logInfo("Manual clock before advancing = " + clock.time) + clock.addToTime(numBatches * batchDuration.milliseconds) + logInfo("Manual clock after advancing = " + clock.time) + + // Wait until expected number of output items have been generated + val startTime = System.currentTimeMillis() + while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) + Thread.sleep(100) + } + val timeTaken = System.currentTimeMillis() - startTime + + assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") + assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") + + Thread.sleep(500) // Give some time for the forgetting old RDDs to complete + } catch { + case e: Exception => e.printStackTrace(); throw e; + } finally { + ssc.stop() + } + + output + } + + def verifyOutput[V: ClassManifest]( + output: Seq[Seq[V]], + expectedOutput: Seq[Seq[V]], + useSet: Boolean + ) { + logInfo("--------------------------------") + logInfo("output.size = " + output.size) + logInfo("output") + output.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("expected output.size = " + expectedOutput.size) + logInfo("expected output") + expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) + logInfo("--------------------------------") + + // Match the output with the expected output + assert(output.size === expectedOutput.size, "Number of outputs do not match") + for (i <- 0 until output.size) { + if (useSet) { + assert(output(i).toSet === expectedOutput(i).toSet) + } else { + assert(output(i).toList === expectedOutput(i).toList) + } + } + logInfo("Output verified successfully") + } + + def testOperation[U: ClassManifest, V: ClassManifest]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V], + expectedOutput: Seq[Seq[V]], + useSet: Boolean = false + ) { + testOperation[U, V](input, operation, expectedOutput, -1, useSet) + } + + def testOperation[U: ClassManifest, V: ClassManifest]( + input: Seq[Seq[U]], + operation: DStream[U] => DStream[V], + expectedOutput: Seq[Seq[V]], + numBatches: Int, + useSet: Boolean + ) { + val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size + val ssc = setupStreams[U, V](input, operation) + val output = runStreams[V](ssc, numBatches_, expectedOutput.size) + verifyOutput[V](output, expectedOutput, useSet) + } + + def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + input1: Seq[Seq[U]], + input2: Seq[Seq[V]], + operation: (DStream[U], DStream[V]) => DStream[W], + expectedOutput: Seq[Seq[W]], + useSet: Boolean + ) { + testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet) + } + + def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + input1: Seq[Seq[U]], + input2: Seq[Seq[V]], + operation: (DStream[U], DStream[V]) => DStream[W], + expectedOutput: Seq[Seq[W]], + numBatches: Int, + useSet: Boolean + ) { + val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size + val ssc = setupStreams[U, V, W](input1, input2, operation) + val output = runStreams[W](ssc, numBatches_, expectedOutput.size) + verifyOutput[W](output, expectedOutput, useSet) + } +} diff --git 
a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala new file mode 100644 index 0000000000..90d67844bb --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -0,0 +1,188 @@ +package spark.streaming + +import spark.streaming.StreamingContext._ + +class WindowOperationsSuite extends TestSuiteBase { + + override def framework() = "WindowOperationsSuite" + + override def maxWaitTimeMillis() = 20000 + + val largerSlideInput = Seq( + Seq(("a", 1)), + Seq(("a", 2)), // 1st window from here + Seq(("a", 3)), + Seq(("a", 4)), // 2nd window from here + Seq(("a", 5)), + Seq(("a", 6)), // 3rd window from here + Seq(), + Seq() // 4th window from here + ) + + val largerSlideOutput = Seq( + Seq(("a", 3)), + Seq(("a", 10)), + Seq(("a", 18)), + Seq(("a", 11)) + ) + + + val bigInput = Seq( + Seq(("a", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1), ("b", 1), ("c", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1)), + Seq(), + Seq(("a", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1), ("b", 1), ("c", 1)), + Seq(("a", 1), ("b", 1)), + Seq(("a", 1)), + Seq() + ) + + val bigOutput = Seq( + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 1)), + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 1)) + ) + + /* + The output of the reduceByKeyAndWindow with inverse reduce function is + difference from the naive reduceByKeyAndWindow. Even if the count of a + particular key is 0, the key does not get eliminated from the RDDs of + ReducedWindowedDStream. This causes the number of keys in these RDDs to + increase forever. A more generalized version that allows elimination of + keys should be considered. 
+ */ + val bigOutputInv = Seq( + Seq(("a", 1)), + Seq(("a", 2), ("b", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 2), ("c", 1)), + Seq(("a", 2), ("b", 1), ("c", 0)), + Seq(("a", 1), ("b", 0), ("c", 0)) + ) + + def testReduceByKeyAndWindow( + name: String, + input: Seq[Seq[(String, Int)]], + expectedOutput: Seq[Seq[(String, Int)]], + windowTime: Time = batchDuration * 2, + slideTime: Time = batchDuration + ) { + test("reduceByKeyAndWindow - " + name) { + val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val operation = (s: DStream[(String, Int)]) => { + s.reduceByKeyAndWindow(_ + _, windowTime, slideTime).persist() + } + testOperation(input, operation, expectedOutput, numBatches, true) + } + } + + def testReduceByKeyAndWindowInv( + name: String, + input: Seq[Seq[(String, Int)]], + expectedOutput: Seq[Seq[(String, Int)]], + windowTime: Time = batchDuration * 2, + slideTime: Time = batchDuration + ) { + test("reduceByKeyAndWindowInv - " + name) { + val numBatches = expectedOutput.size * (slideTime.millis / batchDuration.millis).toInt + val operation = (s: DStream[(String, Int)]) => { + s.reduceByKeyAndWindow(_ + _, _ - _, windowTime, slideTime).persist() + } + testOperation(input, operation, expectedOutput, numBatches, true) + } + } + + + // Testing naive reduceByKeyAndWindow (without invertible function) + + testReduceByKeyAndWindow( + "basic reduction", + Seq( Seq(("a", 1), ("a", 3)) ), + Seq( Seq(("a", 4)) ) + ) + + testReduceByKeyAndWindow( + "key already in window and new value added into window", + Seq( Seq(("a", 1)), Seq(("a", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2)) ) + ) + + testReduceByKeyAndWindow( + "new key added into window", + Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) + ) + + testReduceByKeyAndWindow( + "key removed from window", + Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), + Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq() ) + ) + + testReduceByKeyAndWindow( + "larger slide time", + largerSlideInput, + largerSlideOutput, + Seconds(4), + Seconds(2) + ) + + testReduceByKeyAndWindow("big test", bigInput, bigOutput) + + + // Testing reduceByKeyAndWindow (with invertible reduce function) + + testReduceByKeyAndWindowInv( + "basic reduction", + Seq(Seq(("a", 1), ("a", 3)) ), + Seq(Seq(("a", 4)) ) + ) + + testReduceByKeyAndWindowInv( + "key already in window and new value added into window", + Seq( Seq(("a", 1)), Seq(("a", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2)) ) + ) + + testReduceByKeyAndWindowInv( + "new key added into window", + Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ), + Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) ) + ) + + testReduceByKeyAndWindowInv( + "key removed from window", + Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ), + Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) ) + ) + + testReduceByKeyAndWindowInv( + "larger slide time", + largerSlideInput, + largerSlideOutput, + Seconds(4), + Seconds(2) + ) + + testReduceByKeyAndWindowInv("big test", bigInput, bigOutputInv) +} -- cgit v1.2.3
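
For illustration, below is a minimal sketch of how a suite plugs into the renamed test harness after this commit. The suite name and the counting operation are hypothetical; TestSuiteBase, framework(), testOperation, and the DStream calls are taken from the patch above.

package spark.streaming

import spark.streaming.StreamingContext._

// Hypothetical example suite: extend TestSuiteBase (formerly DStreamSuiteBase),
// override framework() with the suite name, and drive a DStream operation
// against per-batch input and expected-output sequences via testOperation().
class CountOperationsSuite extends TestSuiteBase {

  override def framework() = "CountOperationsSuite"

  test("count per batch") {
    // Three batches of four elements each.
    val input = Seq(1 to 4, 5 to 8, 9 to 12)
    // Each batch reduces to a single count of 4.
    val expectedOutput = Seq(Seq(4), Seq(4), Seq(4))
    testOperation(
      input,
      (s: DStream[Int]) => s.map(_ => 1).reduce(_ + _),
      expectedOutput
    )
  }
}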