aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-09-08 20:39:15 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-09-08 20:39:15 -0700
commit820913f554bef610d07ca2dadaead657f916ae63 (patch)
tree5a249567fc50429383ff22f40441fd1655ff991e /streaming/src/test
parentae74c3fa84885dac18f39368e6dda48a2f7a25a5 (diff)
downloadspark-820913f554bef610d07ca2dadaead657f916ae63.tar.gz
spark-820913f554bef610d07ca2dadaead657f916ae63.tar.bz2
spark-820913f554bef610d07ca2dadaead657f916ae63.zip
[SPARK-10071] [STREAMING] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
Output a warning when serializing QueueInputDStream rather than throwing an exception to allow unit tests to use it. Moreover, this PR also throws a better exception when deserializing QueueInputDStream to make the user find out the problem easily. The previous exception is hard to understand: https://issues.apache.org/jira/browse/SPARK-8553 Author: zsxwing <zsxwing@gmail.com> Closes #8624 from zsxwing/SPARK-10071 and squashes the following commits: 847cfa8 [zsxwing] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala28
1 file changed, 19 insertions, 9 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 7423ef6bcb..d26894e88f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
import org.apache.spark.storage.StorageLevel
@@ -726,16 +726,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
}
test("queueStream doesn't support checkpointing") {
- val checkpointDir = Utils.createTempDir()
- ssc = new StreamingContext(master, appName, batchDuration)
- val rdd = ssc.sparkContext.parallelize(1 to 10)
- ssc.queueStream[Int](Queue(rdd)).print()
- ssc.checkpoint(checkpointDir.getAbsolutePath)
- val e = intercept[NotSerializableException] {
- ssc.start()
+ val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+ def creatingFunction(): StreamingContext = {
+ val _ssc = new StreamingContext(conf, batchDuration)
+ val rdd = _ssc.sparkContext.parallelize(1 to 10)
+ _ssc.checkpoint(checkpointDirectory)
+ _ssc.queueStream[Int](Queue(rdd)).register()
+ _ssc
+ }
+ ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+ ssc.start()
+ eventually(timeout(10000 millis)) {
+ assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
+ }
+ ssc.stop()
+ val e = intercept[SparkException] {
+ ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
}
// StreamingContext.validate changes the message, so use "contains" here
- assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
+ assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
+ "Please don't use queueStream when checkpointing is enabled."))
}
def addInputStream(s: StreamingContext): DStream[Int] = {