From 4a5558ca9921ce89b3996e9ead13b07123fc7a2d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 03:28:39 +0000 Subject: Fixed bugs in reading of checkpoints. --- .../org/apache/spark/streaming/Checkpoint.scala | 20 ++++++++++++++++---- .../apache/spark/streaming/StreamingContext.scala | 17 ++++------------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 476ae70bc9..d268b68f90 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -165,16 +165,28 @@ object CheckpointReader extends Logging { def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = { val checkpointPath = new Path(checkpointDir) def fs = checkpointPath.getFileSystem(hadoopConf) - val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) + + // See if the checkpoint directory exists + if (!fs.exists(checkpointPath)) { + logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist") + return None + } - // Log the file listing if graph checkpoint file was not found + // Try to find the checkpoint data + val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists) if (existingFiles.isEmpty) { - logInfo("Could not find graph file in " + checkpointDir + ", which contains the files:\n" + - fs.listStatus(checkpointPath).mkString("\n")) + logInfo("Could not load checkpoint as checkpoint data was not " + + "found in directory " + checkpointDir + "") + val statuses = fs.listStatus(checkpointPath) + if (statuses!=null) { + logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" + + statuses.mkString("\n")) + } return None } logInfo("Checkpoint files found: " + existingFiles.mkString(",")) + // Try to read the checkpoint data val compressionCodec = CompressionCodec.createCodec(conf) existingFiles.foreach(file => { logInfo("Attempting to load checkpoint from file '" + file + "'") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 76be81603c..dd34f6f4f2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -496,26 +496,17 @@ object StreamingContext extends Logging { hadoopConf: Configuration = new Configuration(), createOnError: Boolean = false ): StreamingContext = { - - try { - CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf) match { - case Some(checkpoint) => - return new StreamingContext(null, checkpoint, null) - case None => - logInfo("Creating new StreamingContext") - return creatingFunc() - } + val checkpointOption = try { + CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf) } catch { case e: Exception => if (createOnError) { - logWarning("Error reading checkpoint", e) - logInfo("Creating new StreamingContext") - return creatingFunc() + None } else { - logError("Error reading checkpoint", e) throw e } } + checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc()) } /** -- cgit v1.2.3