diff options
author | zsxwing <zsxwing@gmail.com> | 2014-04-25 19:04:34 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-04-25 19:04:34 -0700 |
commit | 058797c1722c9251f6bc6ad2672cb0e79146b04f (patch) | |
tree | c11807354496e1bcb829dd5e388c4881afcc1e01 /streaming/src/test | |
parent | 87cf35c2d6acc9649b3fb05648b79b9862b3959b (diff) | |
download | spark-058797c1722c9251f6bc6ad2672cb0e79146b04f.tar.gz spark-058797c1722c9251f6bc6ad2672cb0e79146b04f.tar.bz2 spark-058797c1722c9251f6bc6ad2672cb0e79146b04f.zip |
[Spark-1382] Fix NPE in DStream.slice (updated version of #365)
@zsxwing I cherry-picked your changes and merged the master. #365 had some conflicts once again!
Author: zsxwing <zsxwing@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #562 from tdas/SPARK-1382 and squashes the following commits:
e2962c1 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-1382
20968d9 [zsxwing] Replace Exception with SparkException in DStream
e476651 [zsxwing] Merge remote-tracking branch 'origin/master' into SPARK-1382
35ba56a [zsxwing] SPARK-1382: Fix NPE in DStream.slice
Diffstat (limited to 'streaming/src/test')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 12 |
1 files changed, 11 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 4792ca1f8a..04925886c3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.SparkContext._ import util.ManualClock -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkException, SparkConf} import org.apache.spark.streaming.dstream.{WindowedDStream, DStream} import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.reflect.ClassTag @@ -398,6 +398,16 @@ class BasicOperationsSuite extends TestSuiteBase { Thread.sleep(1000) } + test("slice - has not been initialized") { + val ssc = new StreamingContext(conf, Seconds(1)) + val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4)) + val stream = new TestInputStream[Int](ssc, input, 2) + val thrown = intercept[SparkException] { + stream.slice(new Time(0), new Time(1000)) + } + assert(thrown.getMessage.contains("has not been initialized")) + } + val cleanupTestInput = (0 until 10).map(x => Seq(x, x + 1)).toSeq test("rdd cleanup - map and window") { |