about | summary | refs | log | tree | commit | diff
path: root/streaming
diff options
context:
space:
mode:
author: Sean Owen <sowen@cloudera.com> 2016-04-05 19:57:23 -0700
committer: Shixiong Zhu <shixiong@databricks.com> 2016-04-05 19:57:23 -0700
commit: 8e5c1cbf2c3d5eaa7d9dd35def177414a0d4cf82 (patch)
tree: 4f07bc782342195e8423c2844d2e4e9459232b2a /streaming
parent: 7d29c72f64f8637d8182fb7c495f87ab7ce86ea0 (diff)
download: spark-8e5c1cbf2c3d5eaa7d9dd35def177414a0d4cf82.tar.gz
spark-8e5c1cbf2c3d5eaa7d9dd35def177414a0d4cf82.tar.bz2
spark-8e5c1cbf2c3d5eaa7d9dd35def177414a0d4cf82.zip
[SPARK-13211][STREAMING] StreamingContext throws NoSuchElementException when created from non-existent checkpoint directory
## What changes were proposed in this pull request?

Take 2: avoid the `None.get` NoSuchElementException in favor of a more descriptive IllegalArgumentException if a non-existent checkpoint dir is used without a SparkContext.

## How was this patch tested?

Jenkins test plus a new test for this particular case.

Author: Sean Owen <sowen@cloudera.com>

Closes #12174 from srowen/SPARK-13211.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 3
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 11
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 5
3 files changed, 10 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index f9f3d97ef3..5cc677d085 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -334,8 +334,7 @@ object CheckpointReader extends Logging {
ignoreReadError: Boolean = false): Option[Checkpoint] = {
val checkpointPath = new Path(checkpointDir)
- // TODO(rxin): Why is this a def?!
- def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
+ val fs = checkpointPath.getFileSystem(hadoopConf)
// Try to find the checkpoint files
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ac37e8e022..83a1092b16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -106,7 +106,7 @@ class StreamingContext private[streaming] (
* HDFS compatible filesystems
*/
def this(path: String, hadoopConf: Configuration) =
- this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null)
+ this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)
/**
* Recreate a StreamingContext from a checkpoint file.
@@ -122,15 +122,12 @@ class StreamingContext private[streaming] (
def this(path: String, sparkContext: SparkContext) = {
this(
sparkContext,
- CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get,
+ CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).orNull,
null)
}
-
- if (_sc == null && _cp == null) {
- throw new Exception("Spark Streaming cannot be initialized with " +
- "both SparkContext and checkpoint as null")
- }
+ require(_sc != null || _cp != null,
+ "Spark Streaming cannot be initialized with both SparkContext and checkpoint as null")
private[streaming] val isCheckpointPresent: Boolean = _cp != null
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 9a3248b3e8..fbb25d4c59 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -228,6 +228,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
}
}
+ test("non-existent checkpoint dir") {
+ // SPARK-13211
+ intercept[IllegalArgumentException](new StreamingContext("nosuchdirectory"))
+ }
+
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")