aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-07 00:21:10 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-05-07 00:21:10 -0700
commitcfdadcbd2b529cd9ac721509a7ebafe436afcd8d (patch)
tree7a15085179679b3d0b916c74ed2ef981042a6fba /streaming
parent2d6612cc8b98f767d73c4d15e4065bf3d6c12ea7 (diff)
downloadspark-cfdadcbd2b529cd9ac721509a7ebafe436afcd8d.tar.gz
spark-cfdadcbd2b529cd9ac721509a7ebafe436afcd8d.tar.bz2
spark-cfdadcbd2b529cd9ac721509a7ebafe436afcd8d.zip
[SPARK-7430] [STREAMING] [TEST] General improvements to streaming tests to increase debuggability
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #5961 from tdas/SPARK-7430 and squashes the following commits: d654978 [Tathagata Das] Fix scala style fbf7174 [Tathagata Das] Added more verbose assert failure messages. 6aea07a [Tathagata Das] Ensure SynchronizedBuffer is used in every TestSuiteBase
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala33
1 files changed, 21 insertions, 12 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 4d0cd7516f..4f70ae7f1f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -73,9 +73,11 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
*
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
-class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
- extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
+class TestOutputStream[T: ClassTag](
+ parent: DStream[T],
+ val output: SynchronizedBuffer[Seq[T]] =
+ new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ ) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
}) {
@@ -95,8 +97,10 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
* containing a sequence of items.
*/
-class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
- val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
+class TestOutputStreamWithPartitions[T: ClassTag](
+ parent: DStream[T],
+ val output: SynchronizedBuffer[Seq[Seq[T]]] =
+ new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
output += collected
@@ -108,10 +112,6 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
ois.defaultReadObject()
output.clear()
}
-
- def toTestOutputStream: TestOutputStream[T] = {
- new TestOutputStream[T](this.parent, this.output.map(_.flatten))
- }
}
/**
@@ -425,12 +425,21 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
logInfo("--------------------------------")
// Match the output with the expected output
- assert(output.size === expectedOutput.size, "Number of outputs do not match")
for (i <- 0 until output.size) {
if (useSet) {
- assert(output(i).toSet === expectedOutput(i).toSet)
+ assert(
+ output(i).toSet === expectedOutput(i).toSet,
+ s"Set comparison failed\n" +
+ s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
+ s"Generated output (${output.size} items): ${output.mkString("\n")}"
+ )
} else {
- assert(output(i).toList === expectedOutput(i).toList)
+ assert(
+ output(i).toList === expectedOutput(i).toList,
+ s"Ordered list comparison failed\n" +
+ s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
+ s"Generated output (${output.size} items): ${output.mkString("\n")}"
+ )
}
}
logInfo("Output verified successfully")