From 8e5c1cbf2c3d5eaa7d9dd35def177414a0d4cf82 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Tue, 5 Apr 2016 19:57:23 -0700
Subject: [SPARK-13211][STREAMING] StreamingContext throws NoSuchElementException when created from non-existent checkpoint directory

## What changes were proposed in this pull request?

Take 2: avoid a None.get NoSuchElementException in favor of a more descriptive IllegalArgumentException when a non-existent checkpoint dir is used without a SparkContext.

## How was this patch tested?

Jenkins tests plus a new test for this particular case.

Author: Sean Owen

Closes #12174 from srowen/SPARK-13211.
---
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala  |  3 +--
 .../scala/org/apache/spark/streaming/StreamingContext.scala | 11 ++++-------
 .../scala/org/apache/spark/streaming/CheckpointSuite.scala  |  5 +++++
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index f9f3d97ef3..5cc677d085 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -334,8 +334,7 @@ object CheckpointReader extends Logging {
       ignoreReadError: Boolean = false): Option[Checkpoint] = {
     val checkpointPath = new Path(checkpointDir)
 
-    // TODO(rxin): Why is this a def?!
-    def fs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
+    val fs = checkpointPath.getFileSystem(hadoopConf)
 
     // Try to find the checkpoint files
     val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ac37e8e022..83a1092b16 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -106,7 +106,7 @@ class StreamingContext private[streaming] (
   * HDFS compatible filesystems
   */
  def this(path: String, hadoopConf: Configuration) =
-    this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null)
+    this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)
 
  /**
   * Recreate a StreamingContext from a checkpoint file.
@@ -122,15 +122,12 @@ class StreamingContext private[streaming] (
  def this(path: String, sparkContext: SparkContext) = {
    this(
      sparkContext,
-      CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).get,
+      CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).orNull,
      null)
  }
 
-
-  if (_sc == null && _cp == null) {
-    throw new Exception("Spark Streaming cannot be initialized with " +
-      "both SparkContext and checkpoint as null")
-  }
+  require(_sc != null || _cp != null,
+    "Spark Streaming cannot be initialized with both SparkContext and checkpoint as null")
 
  private[streaming] val isCheckpointPresent: Boolean = _cp != null
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 9a3248b3e8..fbb25d4c59 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -228,6 +228,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
    }
  }
 
+  test("non-existent checkpoint dir") {
+    // SPARK-13211
+    intercept[IllegalArgumentException](new StreamingContext("nosuchdirectory"))
+  }
+
  test("basic rdd checkpoints + dstream graph checkpoint recovery") {
    assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
--
cgit v1.2.3
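
For context, the sketch below illustrates the Option-handling pattern this patch adopts, outside of Spark: a missing checkpoint is carried through as null via `orNull`, and `require` then fails with a descriptive `IllegalArgumentException` instead of the opaque `NoSuchElementException` thrown by `None.get`. This is a minimal standalone sketch; `readCheckpoint` and the local `Checkpoint` case class are hypothetical stand-ins, not Spark APIs.

```scala
object CheckpointPatternSketch {
  final case class Checkpoint(dir: String)

  // Hypothetical stand-in: returns None when the directory holds no checkpoint,
  // mirroring the shape of CheckpointReader.read in the diff above.
  def readCheckpoint(dir: String): Option[Checkpoint] =
    if (dir == "existing-checkpoint-dir") Some(Checkpoint(dir)) else None

  def main(args: Array[String]): Unit = {
    // Before the patch: None.get would throw an opaque NoSuchElementException here.
    // readCheckpoint("nosuchdirectory").get

    // After the patch: carry the absence through as null...
    val cp: Checkpoint = readCheckpoint("nosuchdirectory").orNull
    val sc: AnyRef = null // stand-in for the absent SparkContext

    // ...and fail fast with a descriptive IllegalArgumentException.
    require(sc != null || cp != null,
      "Spark Streaming cannot be initialized with both SparkContext and checkpoint as null")
  }
}
```

Running the sketch fails with `IllegalArgumentException: requirement failed: Spark Streaming cannot be initialized with both SparkContext and checkpoint as null`, which mirrors what the new CheckpointSuite test expects from `new StreamingContext("nosuchdirectory")`.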