aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/context.py
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-08-23 19:24:32 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-23 19:24:32 -0700
commit053d94fcf32268369b5a40837271f15d6af41aa4 (patch)
tree04e0e1d58f4d291c4bd50a7d59444eec0c88f1d5 /python/pyspark/streaming/context.py
parentb963c19a803c5a26c9b65655d40ca6621acf8bd4 (diff)
downloadspark-053d94fcf32268369b5a40837271f15d6af41aa4.tar.gz
spark-053d94fcf32268369b5a40837271f15d6af41aa4.tar.bz2
spark-053d94fcf32268369b5a40837271f15d6af41aa4.zip
[SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local checkpoint paths and existing SparkContexts
The current code only checks checkpoint files in local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following: 1. Use the same code path as Java to check whether a valid checkpoint exists 2. Create a new Python SparkContext only if there no active one. There is not test for the path as its hard to test with distributed filesystem paths in a local unit test. I am going to test it with a distributed file system manually to verify that this patch works. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8366 from tdas/SPARK-10142 and squashes the following commits: 3afa666 [Tathagata Das] Added tests 2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists 9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files
Diffstat (limited to 'python/pyspark/streaming/context.py')
-rw-r--r--python/pyspark/streaming/context.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index e3ba70e4e5..4069d7a149 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -150,26 +150,30 @@ class StreamingContext(object):
@param checkpointPath: Checkpoint directory used in an earlier streaming program
@param setupFunc: Function to create a new context and setup DStreams
"""
- # TODO: support checkpoint in HDFS
- if not os.path.exists(checkpointPath) or not os.listdir(checkpointPath):
+ cls._ensure_initialized()
+ gw = SparkContext._gateway
+
+ # Check whether valid checkpoint information exists in the given path
+ if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty():
ssc = setupFunc()
ssc.checkpoint(checkpointPath)
return ssc
- cls._ensure_initialized()
- gw = SparkContext._gateway
-
try:
jssc = gw.jvm.JavaStreamingContext(checkpointPath)
except Exception:
print("failed to load StreamingContext from checkpoint", file=sys.stderr)
raise
- jsc = jssc.sparkContext()
- conf = SparkConf(_jconf=jsc.getConf())
- sc = SparkContext(conf=conf, gateway=gw, jsc=jsc)
+ # If there is already an active instance of Python SparkContext use it, or create a new one
+ if not SparkContext._active_spark_context:
+ jsc = jssc.sparkContext()
+ conf = SparkConf(_jconf=jsc.getConf())
+ SparkContext(conf=conf, gateway=gw, jsc=jsc)
+
+ sc = SparkContext._active_spark_context
+
# update ctx in serializer
- SparkContext._active_spark_context = sc
cls._transformerSerializer.ctx = sc
return StreamingContext(sc, None, jssc)