aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala11
1 files changed, 7 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 9a3248b3e8..bdbac64b9b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -228,6 +228,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}
}
+ test("non-existent checkpoint dir") {
+ // SPARK-13211
+ intercept[IllegalArgumentException](new StreamingContext("nosuchdirectory"))
+ }
+
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
@@ -262,10 +267,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
"No checkpointed RDDs in state stream before first failure")
stateStream.checkpointData.currentCheckpointFiles.foreach {
- case (time, file) => {
+ case (time, file) =>
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
" for state stream before first failure does not exist")
- }
}
// Run till a further time such that previous checkpoint files in the stream would be deleted
@@ -292,10 +296,9 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(!stateStream.checkpointData.currentCheckpointFiles.isEmpty,
"No checkpointed RDDs in state stream before second failure")
stateStream.checkpointData.currentCheckpointFiles.foreach {
- case (time, file) => {
+ case (time, file) =>
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
" for state stream before seconds failure does not exist")
- }
}
ssc.stop()