From 053d94fcf32268369b5a40837271f15d6af41aa4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 23 Aug 2015 19:24:32 -0700 Subject: [SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local checkpoint paths and existing SparkContexts The current code only checks checkpoint files in local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following: 1. Use the same code path as Java to check whether a valid checkpoint exists 2. Create a new Python SparkContext only if there no active one. There is not test for the path as its hard to test with distributed filesystem paths in a local unit test. I am going to test it with a distributed file system manually to verify that this patch works. Author: Tathagata Das Closes #8366 from tdas/SPARK-10142 and squashes the following commits: 3afa666 [Tathagata Das] Added tests 2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists 9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files --- .../src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 6f6b449acc..cd5d960369 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -286,6 +286,15 @@ class CheckpointWriter( private[streaming] object CheckpointReader extends Logging { + /** + * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint + * files, then return None, else try to return the latest valid checkpoint object. If no + * checkpoint files could be read correctly, then return None. + */ + def read(checkpointDir: String): Option[Checkpoint] = { + read(checkpointDir, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = true) + } + /** * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint * files, then return None, else try to return the latest valid checkpoint object. If no -- cgit v1.2.3