aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-10-24 16:43:33 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-10-24 16:43:33 -0700
commit39f6f75588b69f07cd963c5e211045fed103695b (patch)
treec6ba252990d9a970373f99e6291787b79bcb3628 /streaming
parent9423532fab4d58ac05f283ac2e9baddc3ee48928 (diff)
downloadspark-39f6f75588b69f07cd963c5e211045fed103695b.tar.gz
spark-39f6f75588b69f07cd963c5e211045fed103695b.tar.bz2
spark-39f6f75588b69f07cd963c5e211045fed103695b.zip
Some clean-up of tests
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala3
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala10
3 files changed, 10 insertions, 7 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 8a6604904d..5344ae7682 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -54,8 +54,7 @@ trait JavaTestBase extends TestSuiteBase {
{
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- val ostream = new TestOutputStream(dstream.dstream,
- new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
+ val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
dstream.dstream.ssc.registerOutputStream(ostream)
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a327de80b3..beb20831bd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -366,7 +366,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
- outputStream.output
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
+ outputStream.output.map(_.flatten)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 26f515a778..be140699c2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -63,7 +63,8 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
*
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
-class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
+class TestOutputStream[T: ClassManifest](parent: DStream[T],
+ val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
@@ -82,9 +83,10 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
- * containing a sequnce of items.
+ * containing a sequence of items.
*/
-class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]])
+class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T],
+ val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
output += collected
@@ -96,6 +98,8 @@ class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], val o
ois.defaultReadObject()
output.clear()
}
+
+ def toTestOutputStream = new TestOutputStream[T](this.parent, this.output.map(_.flatten))
}
/**