aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala12
1 files changed, 11 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 4792ca1f8a..04925886c3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.SparkContext._
import util.ManualClock
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkException, SparkConf}
import org.apache.spark.streaming.dstream.{WindowedDStream, DStream}
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag
@@ -398,6 +398,16 @@ class BasicOperationsSuite extends TestSuiteBase {
Thread.sleep(1000)
}
+ test("slice - has not been initialized") {
+ val ssc = new StreamingContext(conf, Seconds(1))
+ val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
+ val stream = new TestInputStream[Int](ssc, input, 2)
+ val thrown = intercept[SparkException] {
+ stream.slice(new Time(0), new Time(1000))
+ }
+ assert(thrown.getMessage.contains("has not been initialized"))
+ }
+
val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq
test("rdd cleanup - map and window") {