aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-08-23 19:24:32 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-23 19:24:32 -0700
commit053d94fcf32268369b5a40837271f15d6af41aa4 (patch)
tree04e0e1d58f4d291c4bd50a7d59444eec0c88f1d5 /streaming
parentb963c19a803c5a26c9b65655d40ca6621acf8bd4 (diff)
downloadspark-053d94fcf32268369b5a40837271f15d6af41aa4.tar.gz
spark-053d94fcf32268369b5a40837271f15d6af41aa4.tar.bz2
spark-053d94fcf32268369b5a40837271f15d6af41aa4.zip
[SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local checkpoint paths and existing SparkContexts
The current code only checks checkpoint files in local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following: 1. Use the same code path as Java to check whether a valid checkpoint exists 2. Create a new Python SparkContext only if there no active one. There is not test for the path as its hard to test with distributed filesystem paths in a local unit test. I am going to test it with a distributed file system manually to verify that this patch works. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8366 from tdas/SPARK-10142 and squashes the following commits: 3afa666 [Tathagata Das] Added tests 2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists 9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala9
1 files changed, 9 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 6f6b449acc..cd5d960369 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -289,6 +289,15 @@ object CheckpointReader extends Logging {
/**
* Read checkpoint files present in the given checkpoint directory. If there are no checkpoint
* files, then return None, else try to return the latest valid checkpoint object. If no
+ * checkpoint files could be read correctly, then return None.
+ */
+ def read(checkpointDir: String): Option[Checkpoint] = {
+ read(checkpointDir, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = true)
+ }
+
+ /**
+ * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint
+ * files, then return None, else try to return the latest valid checkpoint object. If no
* checkpoint files could be read correctly, then return None (if ignoreReadError = true),
* or throw exception (if ignoreReadError = false).
*/