diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-10-24 14:29:19 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-10-24 14:31:34 -0700 |
commit | 05ac9940ee97744b8952ede74edfcd63e6e55a5b (patch) | |
tree | 1dc68eab9a2ddbe701f71b28b731cdfb4ac70b09 /streaming/src | |
parent | 2fda84fe3ffa4d887e58b4d1717765a42a30b0f9 (diff) | |
download | spark-05ac9940ee97744b8952ede74edfcd63e6e55a5b.tar.gz spark-05ac9940ee97744b8952ede74edfcd63e6e55a5b.tar.bz2 spark-05ac9940ee97744b8952ede74edfcd63e6e55a5b.zip |
Adding tests
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 38 | ||||
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 55 |
2 files changed, 88 insertions, 5 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 11586f72b6..55cfcb371a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -82,6 +82,44 @@ class BasicOperationsSuite extends TestSuiteBase { testOperation(input, operation, output, true) } + test("repartition (more partitions)") { + val input = Seq(1 to 100, 101 to 200, 201 to 300) + val operation = (r: DStream[Int]) => r.repartition(5) + val ssc = setupStreams(input, operation, 2) + val output = runStreamsWithPartitions(ssc, 3, 3) + assert(output.size === 3) + val first = output(0) + val second = output(1) + val third = output(2) + + assert(first.size === 5) + assert(second.size === 5) + assert(third.size === 5) + + assert(first.flatten.toSet === (1 to 100).toSet) + assert(second.flatten.toSet === (101 to 200).toSet) + assert(third.flatten.toSet === (201 to 300).toSet) + } + + test("repartition (fewer partitions)") { + val input = Seq(1 to 100, 101 to 200, 201 to 300) + val operation = (r: DStream[Int]) => r.repartition(2) + val ssc = setupStreams(input, operation, 5) + val output = runStreamsWithPartitions(ssc, 3, 3) + assert(output.size === 3) + val first = output(0) + val second = output(1) + val third = output(2) + + assert(first.size === 2) + assert(second.size === 2) + assert(third.size === 2) + + assert(first.flatten.toSet === (1 to 100).toSet) + assert(second.flatten.toSet === (101 to 200).toSet) + assert(third.flatten.toSet === (201 to 300).toSet) + } + test("groupByKey") { testOperation( Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ), diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 37dd9c4cc6..26f515a778 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -60,6 +60,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ /** * This is a output stream just for the testsuites. All the output is collected into a * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + * + * The buffer contains a sequence of RDD's, each containing a sequence of items */ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]]) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { @@ -76,6 +78,27 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu } /** + * This is a output stream just for the testsuites. All the output is collected into a + * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint. + * + * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each + * containing a sequnce of items. + */ +class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]]) + extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { + val collected = rdd.glom().collect().map(_.toSeq) + output += collected + }) { + + // This is to clear the output buffer every it is read from a checkpoint + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + ois.defaultReadObject() + output.clear() + } +} + +/** * This is the base trait for Spark Streaming testsuites. This provides basic functionality * to run user-defined set of input on user-defined stream operations, and verify the output. */ @@ -108,7 +131,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { */ def setupStreams[U: ClassManifest, V: ClassManifest]( input: Seq[Seq[U]], - operation: DStream[U] => DStream[V] + operation: DStream[U] => DStream[V], + numPartitions: Int = numInputPartitions ): StreamingContext = { // Create StreamingContext @@ -118,9 +142,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { } // Setup the stream computation - val inputStream = new TestInputStream(ssc, input, numInputPartitions) + val inputStream = new TestInputStream(ssc, input, numPartitions) val operatedStream = operation(inputStream) - val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]]) + val outputStream = new TestOutputStreamWithPartitions(operatedStream, + new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]]) ssc.registerInputStream(inputStream) ssc.registerOutputStream(outputStream) ssc @@ -146,7 +171,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions) val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions) val operatedStream = operation(inputStream1, inputStream2) - val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]]) + val outputStream = new TestOutputStreamWithPartitions(operatedStream, + new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]]) ssc.registerInputStream(inputStream1) ssc.registerInputStream(inputStream2) ssc.registerOutputStream(outputStream) @@ -157,18 +183,37 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and * returns the collected output. It will wait until `numExpectedOutput` number of * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached. + * + * Returns a sequence of items for each RDD. */ def runStreams[V: ClassManifest]( ssc: StreamingContext, numBatches: Int, numExpectedOutput: Int ): Seq[Seq[V]] = { + // Flatten each RDD into a single Seq + runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq) + } + + /** + * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and + * returns the collected output. It will wait until `numExpectedOutput` number of + * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached. + * + * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each + * representing one partition. + */ + def runStreamsWithPartitions[V: ClassManifest]( + ssc: StreamingContext, + numBatches: Int, + numExpectedOutput: Int + ): Seq[Seq[Seq[V]]] = { assert(numBatches > 0, "Number of batches to run stream computation is zero") assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero") logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput) // Get the output buffer - val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]] + val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]] val output = outputStream.output try { |