author    Patrick Wendell <pwendell@gmail.com>  2013-10-24 14:29:19 -0700
committer Patrick Wendell <pwendell@gmail.com>  2013-10-24 14:31:34 -0700
commit    05ac9940ee97744b8952ede74edfcd63e6e55a5b
tree      1dc68eab9a2ddbe701f71b28b731cdfb4ac70b09
parent    2fda84fe3ffa4d887e58b4d1717765a42a30b0f9
Adding tests
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 38
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala        | 55
2 files changed, 88 insertions(+), 5 deletions(-)
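For orientation before the diff: the new runStreamsWithPartitions helper returns output nested three levels deep (batch, then partition, then record), while the existing runStreams keeps its old flattened shape by delegating to it. A minimal Scala sketch of that relationship, with made-up values for illustration:

// Shape returned by runStreamsWithPartitions: Seq[Seq[Seq[V]]]
//   outer Seq  = one entry per batch (one RDD each)
//   middle Seq = one entry per partition of that RDD
//   inner Seq  = the records in that partition
val withPartitions: Seq[Seq[Seq[Int]]] = Seq(
  Seq(Seq(1, 2), Seq(3, 4)),   // batch 0: two partitions
  Seq(Seq(5), Seq(6, 7, 8))    // batch 1: two partitions
)

// runStreams is just the flattened view of the same data
val flattened: Seq[Seq[Int]] = withPartitions.map(_.flatten.toSeq)
assert(flattened == Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7, 8)))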
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 11586f72b6..55cfcb371a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -82,6 +82,44 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(input, operation, output, true)
}
+ test("repartition (more partitions)") {
+ val input = Seq(1 to 100, 101 to 200, 201 to 300)
+ val operation = (r: DStream[Int]) => r.repartition(5)
+ val ssc = setupStreams(input, operation, 2)
+ val output = runStreamsWithPartitions(ssc, 3, 3)
+ assert(output.size === 3)
+ val first = output(0)
+ val second = output(1)
+ val third = output(2)
+
+ assert(first.size === 5)
+ assert(second.size === 5)
+ assert(third.size === 5)
+
+ assert(first.flatten.toSet === (1 to 100).toSet)
+ assert(second.flatten.toSet === (101 to 200).toSet)
+ assert(third.flatten.toSet === (201 to 300).toSet)
+ }
+
+ test("repartition (fewer partitions)") {
+ val input = Seq(1 to 100, 101 to 200, 201 to 300)
+ val operation = (r: DStream[Int]) => r.repartition(2)
+ val ssc = setupStreams(input, operation, 5)
+ val output = runStreamsWithPartitions(ssc, 3, 3)
+ assert(output.size === 3)
+ val first = output(0)
+ val second = output(1)
+ val third = output(2)
+
+ assert(first.size === 2)
+ assert(second.size === 2)
+ assert(third.size === 2)
+
+ assert(first.flatten.toSet === (1 to 100).toSet)
+ assert(second.flatten.toSet === (101 to 200).toSet)
+ assert(third.flatten.toSet === (201 to 300).toSet)
+ }
+
test("groupByKey") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
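As a standalone illustration of the property these tests assert (a hedged, REPL-style sketch, not part of the commit; it assumes RDD.repartition behaves like the DStream method under test and a local master):

import org.apache.spark.SparkContext

// repartition changes the number of partitions but must preserve the
// set of elements. glom() wraps each partition in an array, so the
// outer collection's size is the partition count -- the same trick
// TestOutputStreamWithPartitions uses in the next file.
val sc = new SparkContext("local[2]", "repartition-sketch")
val before = sc.parallelize(1 to 100, 2)   // 2 initial partitions
val after = before.repartition(5)          // shuffle into 5 partitions
val partitions = after.glom().collect().map(_.toSeq)
assert(partitions.size == 5)
assert(partitions.flatten.toSet == (1 to 100).toSet)
sc.stop()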
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 37dd9c4cc6..26f515a778 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -60,6 +60,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
/**
* This is an output stream just for the test suites. All the output is collected into an
* ArrayBuffer. This buffer is wiped clean on being restored from a checkpoint.
+ *
+ * The buffer contains a sequence of RDDs, each containing a sequence of items.
*/
class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
@@ -76,6 +78,27 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
}
/**
+ * This is an output stream just for the test suites. All the output is collected into an
+ * ArrayBuffer. This buffer is wiped clean on being restored from a checkpoint.
+ *
+ * The buffer contains a sequence of RDDs, each containing a sequence of partitions,
+ * each containing a sequence of items.
+ */
+class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]])
+ extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+ val collected = rdd.glom().collect().map(_.toSeq)
+ output += collected
+ }) {
+
+ // This clears the output buffer every time it is read from a checkpoint
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ ois.defaultReadObject()
+ output.clear()
+ }
+}
+
+/**
* This is the base trait for Spark Streaming test suites. It provides basic functionality
* to run a user-defined set of input on user-defined stream operations and verify the output.
*/
@@ -108,7 +131,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
*/
def setupStreams[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
- operation: DStream[U] => DStream[V]
+ operation: DStream[U] => DStream[V],
+ numPartitions: Int = numInputPartitions
): StreamingContext = {
// Create StreamingContext
@@ -118,9 +142,10 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
}
// Setup the stream computation
- val inputStream = new TestInputStream(ssc, input, numInputPartitions)
+ val inputStream = new TestInputStream(ssc, input, numPartitions)
val operatedStream = operation(inputStream)
- val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[V]] with SynchronizedBuffer[Seq[V]])
+ val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+ new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
ssc.registerInputStream(inputStream)
ssc.registerOutputStream(outputStream)
ssc
@@ -146,7 +171,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
val operatedStream = operation(inputStream1, inputStream2)
- val outputStream = new TestOutputStream(operatedStream, new ArrayBuffer[Seq[W]] with SynchronizedBuffer[Seq[W]])
+ val outputStream = new TestOutputStreamWithPartitions(operatedStream,
+ new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
ssc.registerInputStream(inputStream1)
ssc.registerInputStream(inputStream2)
ssc.registerOutputStream(outputStream)
@@ -157,18 +183,37 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Runs the streams set up in `ssc` on a manual clock for `numBatches` batches and
* returns the collected output. It will wait until `numExpectedOutput` output
* batches have been collected or the timeout (set by `maxWaitTimeMillis`) is reached.
+ *
+ * Returns a sequence of items for each RDD (partitions flattened).
*/
def runStreams[V: ClassManifest](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
+ // Flatten each RDD's partitions into a single Seq of items
+ runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
+ }
+
+ /**
+ * Runs the streams set up in `ssc` on a manual clock for `numBatches` batches and
+ * returns the collected output. It will wait until `numExpectedOutput` output
+ * batches have been collected or the timeout (set by `maxWaitTimeMillis`) is reached.
+ *
+ * Returns a sequence of RDDs. Each RDD is represented as several sequences of items,
+ * each representing one partition.
+ */
+ def runStreamsWithPartitions[V: ClassManifest](
+ ssc: StreamingContext,
+ numBatches: Int,
+ numExpectedOutput: Int
+ ): Seq[Seq[Seq[V]]] = {
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " batches is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
// Get the output buffer
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
val output = outputStream.output
try {
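A hedged sketch of how a test could use the new helper (illustrative values; because numPartitions defaults to numInputPartitions, existing setupStreams call sites compile unchanged):

// Two input batches, fed through 2 input partitions, repartitioned to 4
val input = Seq(1 to 10, 11 to 20)
val operation = (s: DStream[Int]) => s.repartition(4)
val ssc = setupStreams(input, operation, numPartitions = 2)

// Seq[Seq[Seq[Int]]]: 2 batches, each holding the 4 partitions produced above
val perPartition = runStreamsWithPartitions(ssc, 2, 2)
assert(perPartition.forall(_.size == 4))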