aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2014-01-10 03:28:39 +0000
committer Tathagata Das <tathagata.das1565@gmail.com> 2014-01-10 03:28:39 +0000
commit 4a5558ca9921ce89b3996e9ead13b07123fc7a2d (patch)
tree a08043f9e11899412822c9017f79bf2d18efebfd
parent f1d206c6b4c0a5b2517b05af05fdda6049e2f7c2 (diff)
download spark-4a5558ca9921ce89b3996e9ead13b07123fc7a2d.tar.gz
spark-4a5558ca9921ce89b3996e9ead13b07123fc7a2d.tar.bz2
spark-4a5558ca9921ce89b3996e9ead13b07123fc7a2d.zip
Fixed bugs in reading of checkpoints.
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 20
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 17
2 files changed, 20 insertions, 17 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 476ae70bc9..d268b68f90 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -165,16 +165,28 @@ object CheckpointReader extends Logging {
def read(checkpointDir: String, conf: SparkConf, hadoopConf: Configuration): Option[Checkpoint] = {
val checkpointPath = new Path(checkpointDir)
def fs = checkpointPath.getFileSystem(hadoopConf)
- val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists)
+
+ // See if the checkpoint directory exists
+ if (!fs.exists(checkpointPath)) {
+ logInfo("Could not load checkpoint as path '" + checkpointPath + "' does not exist")
+ return None
+ }
- // Log the file listing if graph checkpoint file was not found
+ // Try to find the checkpoint data
+ val existingFiles = graphFileNames.map(new Path(checkpointPath, _)).filter(fs.exists)
if (existingFiles.isEmpty) {
- logInfo("Could not find graph file in " + checkpointDir + ", which contains the files:\n" +
- fs.listStatus(checkpointPath).mkString("\n"))
+ logInfo("Could not load checkpoint as checkpoint data was not " +
+ "found in directory " + checkpointDir + "")
+ val statuses = fs.listStatus(checkpointPath)
+ if (statuses!=null) {
+ logInfo("Checkpoint directory " + checkpointDir + " contains the files:\n" +
+ statuses.mkString("\n"))
+ }
return None
}
logInfo("Checkpoint files found: " + existingFiles.mkString(","))
+ // Try to read the checkpoint data
val compressionCodec = CompressionCodec.createCodec(conf)
existingFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file '" + file + "'")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 76be81603c..dd34f6f4f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -496,26 +496,17 @@ object StreamingContext extends Logging {
hadoopConf: Configuration = new Configuration(),
createOnError: Boolean = false
): StreamingContext = {
-
- try {
- CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf) match {
- case Some(checkpoint) =>
- return new StreamingContext(null, checkpoint, null)
- case None =>
- logInfo("Creating new StreamingContext")
- return creatingFunc()
- }
+ val checkpointOption = try {
+ CheckpointReader.read(checkpointPath, new SparkConf(), hadoopConf)
} catch {
case e: Exception =>
if (createOnError) {
- logWarning("Error reading checkpoint", e)
- logInfo("Creating new StreamingContext")
- return creatingFunc()
+ None
} else {
- logError("Error reading checkpoint", e)
throw e
}
}
+ checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}
/**