diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-07 00:21:10 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-07 00:21:10 -0700 |
commit | cfdadcbd2b529cd9ac721509a7ebafe436afcd8d (patch) | |
tree | 7a15085179679b3d0b916c74ed2ef981042a6fba /streaming | |
parent | 2d6612cc8b98f767d73c4d15e4065bf3d6c12ea7 (diff) | |
download | spark-cfdadcbd2b529cd9ac721509a7ebafe436afcd8d.tar.gz spark-cfdadcbd2b529cd9ac721509a7ebafe436afcd8d.tar.bz2 spark-cfdadcbd2b529cd9ac721509a7ebafe436afcd8d.zip |
[SPARK-7430] [STREAMING] [TEST] General improvements to streaming tests to increase debuggability
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #5961 from tdas/SPARK-7430 and squashes the following commits:
d654978 [Tathagata Das] Fix scala style
fbf7174 [Tathagata Das] Added more verbose assert failure messages.
6aea07a [Tathagata Das] Ensure SynchronizedBuffer is used in every TestSuiteBase
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 33 |
1 files changed, 21 insertions, 12 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 4d0cd7516f..4f70ae7f1f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -73,9 +73,11 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], * * The buffer contains a sequence of RDD's, each containing a sequence of items */ -class TestOutputStream[T: ClassTag](parent: DStream[T], - val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) - extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { +class TestOutputStream[T: ClassTag]( + parent: DStream[T], + val output: SynchronizedBuffer[Seq[T]] = + new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]] + ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() output += collected }) { @@ -95,8 +97,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T], * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each * containing a sequence of items. */ -class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], - val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]()) +class TestOutputStreamWithPartitions[T: ClassTag]( + parent: DStream[T], + val output: SynchronizedBuffer[Seq[Seq[T]]] = + new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]]) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.glom().collect().map(_.toSeq) output += collected @@ -108,10 +112,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], ois.defaultReadObject() output.clear() } - - def toTestOutputStream: TestOutputStream[T] = { - new TestOutputStream[T](this.parent, this.output.map(_.flatten)) - } } /** @@ -425,12 +425,21 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { logInfo("--------------------------------") // Match the output with the expected output - assert(output.size === expectedOutput.size, "Number of outputs do not match") for (i <- 0 until output.size) { if (useSet) { - assert(output(i).toSet === expectedOutput(i).toSet) + assert( + output(i).toSet === expectedOutput(i).toSet, + s"Set comparison failed\n" + + s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" + + s"Generated output (${output.size} items): ${output.mkString("\n")}" + ) } else { - assert(output(i).toList === expectedOutput(i).toList) + assert( + output(i).toList === expectedOutput(i).toList, + s"Ordered list comparison failed\n" + + s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" + + s"Generated output (${output.size} items): ${output.mkString("\n")}" + ) } } logInfo("Output verified successfully") |