author     Tathagata Das <tathagata.das1565@gmail.com>   2015-08-23 19:24:32 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2015-08-23 19:24:42 -0700
commit     b40059dbda4dafbb883a53fbd5c5f69bc01a3e19 (patch)
tree       0d17b29cf9937e799891f5ab15daf8183f19470a
parent     00f812d38aeb179290a710e3af1e0c11cc16da71 (diff)
[SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local checkpoint paths and existing SparkContexts
The current code only checks for checkpoint files in the local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following:

1. Use the same code path as Java to check whether a valid checkpoint exists.
2. Create a new Python SparkContext only if there is no active one.

There is no unit test for the non-local path, as it is hard to test distributed filesystem paths in a local unit test. I am going to test it manually with a distributed file system to verify that this patch works.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8366 from tdas/SPARK-10142 and squashes the following commits:

3afa666 [Tathagata Das] Added tests
2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists
9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files

(cherry picked from commit 053d94fcf32268369b5a40837271f15d6af41aa4)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
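For context, the user-facing recovery pattern this change targets looks roughly like the sketch below; the HDFS checkpoint URI, input directory, app name, and batch interval are illustrative values, not taken from the patch:

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    checkpoint_dir = "hdfs://namenode:8020/user/spark/checkpoint"  # hypothetical non-local path

    def create_context():
        # Called only when no valid checkpoint exists in checkpoint_dir.
        sc = SparkContext(conf=SparkConf().setAppName("recoverable-app"))
        ssc = StreamingContext(sc, 2)  # 2-second batches
        lines = ssc.textFileStream("hdfs://namenode:8020/user/spark/input")  # hypothetical input dir
        lines.count().pprint()
        ssc.checkpoint(checkpoint_dir)
        return ssc

    # With this patch, the checkpoint lookup also works for non-local paths, and an
    # already active SparkContext is reused instead of a second one being created.
    ssc = StreamingContext.getOrCreate(checkpoint_dir, create_context)
    ssc.start()
    ssc.awaitTermination()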
-rw-r--r--  python/pyspark/streaming/context.py                                    22
-rw-r--r--  python/pyspark/streaming/tests.py                                       43
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala    9
3 files changed, 58 insertions, 16 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index e3ba70e4e5..4069d7a149 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -150,26 +150,30 @@ class StreamingContext(object):
@param checkpointPath: Checkpoint directory used in an earlier streaming program
@param setupFunc: Function to create a new context and setup DStreams
"""
- # TODO: support checkpoint in HDFS
- if not os.path.exists(checkpointPath) or not os.listdir(checkpointPath):
+ cls._ensure_initialized()
+ gw = SparkContext._gateway
+
+ # Check whether valid checkpoint information exists in the given path
+ if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty():
ssc = setupFunc()
ssc.checkpoint(checkpointPath)
return ssc
- cls._ensure_initialized()
- gw = SparkContext._gateway
-
try:
jssc = gw.jvm.JavaStreamingContext(checkpointPath)
except Exception:
print("failed to load StreamingContext from checkpoint", file=sys.stderr)
raise
- jsc = jssc.sparkContext()
- conf = SparkConf(_jconf=jsc.getConf())
- sc = SparkContext(conf=conf, gateway=gw, jsc=jsc)
+ # If there is already an active instance of Python SparkContext use it, or create a new one
+ if not SparkContext._active_spark_context:
+ jsc = jssc.sparkContext()
+ conf = SparkConf(_jconf=jsc.getConf())
+ SparkContext(conf=conf, gateway=gw, jsc=jsc)
+
+ sc = SparkContext._active_spark_context
+
# update ctx in serializer
- SparkContext._active_spark_context = sc
cls._transformerSerializer.ctx = sc
return StreamingContext(sc, None, jssc)
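The removed check above relied on os.path.exists, which only consults the local filesystem, so a checkpoint directory on a distributed filesystem was never detected. A minimal illustration of that failure mode (the hdfs:// URI is a hypothetical example):

    import os

    # os.path.exists treats the URI as a local path and reports it missing, so the
    # old code would always call setupFunc() and silently ignore the checkpoint.
    os.path.exists("hdfs://namenode:8020/user/spark/checkpoint")  # -> False

    # The patched code instead delegates to the JVM-side CheckpointReader, which
    # resolves the path through the Hadoop FileSystem API and therefore understands
    # non-local schemes such as hdfs://.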
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 214d5be439..510a4f2b3e 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -603,6 +603,10 @@ class CheckpointTests(unittest.TestCase):
def tearDown(self):
if self.ssc is not None:
self.ssc.stop(True)
+ if self.sc is not None:
+ self.sc.stop()
+ if self.cpd is not None:
+ shutil.rmtree(self.cpd)
def test_get_or_create_and_get_active_or_create(self):
inputd = tempfile.mkdtemp()
@@ -622,8 +626,12 @@ class CheckpointTests(unittest.TestCase):
self.setupCalled = True
return ssc
- cpd = tempfile.mkdtemp("test_streaming_cps")
- self.ssc = StreamingContext.getOrCreate(cpd, setup)
+ # Verify that getOrCreate() calls setup() in absence of checkpoint files
+ self.cpd = tempfile.mkdtemp("test_streaming_cps")
+ self.setupCalled = False
+ self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
+ self.assertFalse(self.setupCalled)
+
self.ssc.start()
def check_output(n):
@@ -660,31 +668,52 @@ class CheckpointTests(unittest.TestCase):
self.ssc.stop(True, True)
time.sleep(1)
self.setupCalled = False
- self.ssc = StreamingContext.getOrCreate(cpd, setup)
+ self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
self.ssc.start()
check_output(3)
+ # Verify that getOrCreate() uses existing SparkContext
+ self.ssc.stop(True, True)
+ time.sleep(1)
+ sc = SparkContext(SparkConf())
+ self.setupCalled = False
+ self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
+ self.assertFalse(self.setupCalled)
+ self.assertTrue(self.ssc.sparkContext == sc)
+
# Verify the getActiveOrCreate() recovers from checkpoint files
self.ssc.stop(True, True)
time.sleep(1)
self.setupCalled = False
- self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
+ self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
self.ssc.start()
check_output(4)
# Verify that getActiveOrCreate() returns active context
self.setupCalled = False
- self.assertEquals(StreamingContext.getActiveOrCreate(cpd, setup), self.ssc)
+ self.assertEquals(StreamingContext.getActiveOrCreate(self.cpd, setup), self.ssc)
self.assertFalse(self.setupCalled)
+ # Verify that getActiveOrCreate() uses existing SparkContext
+ self.ssc.stop(True, True)
+ time.sleep(1)
+ self.sc = SparkContext(SparkConf())
+ self.setupCalled = False
+ self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
+ self.assertFalse(self.setupCalled)
+ self.assertTrue(self.ssc.sparkContext == sc)
+
# Verify that getActiveOrCreate() calls setup() in absence of checkpoint files
self.ssc.stop(True, True)
- shutil.rmtree(cpd) # delete checkpoint directory
+ shutil.rmtree(self.cpd) # delete checkpoint directory
+ time.sleep(1)
self.setupCalled = False
- self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
+ self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
self.assertTrue(self.setupCalled)
+
+ # Stop everything
self.ssc.stop(True, True)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 6f6b449acc..cd5d960369 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -289,6 +289,15 @@ object CheckpointReader extends Logging {
/**
* Read checkpoint files present in the given checkpoint directory. If there are no checkpoint
* files, then return None, else try to return the latest valid checkpoint object. If no
+ * checkpoint files could be read correctly, then return None.
+ */
+ def read(checkpointDir: String): Option[Checkpoint] = {
+ read(checkpointDir, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = true)
+ }
+
+ /**
+ * Read checkpoint files present in the given checkpoint directory. If there are no checkpoint
+ * files, then return None, else try to return the latest valid checkpoint object. If no
* checkpoint files could be read correctly, then return None (if ignoreReadError = true),
* or throw exception (if ignoreReadError = false).
*/
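The new single-argument read(checkpointDir) overload is what the Python side calls through the Py4J gateway in the patched getOrCreate. A minimal sketch of that call, assuming the gateway has been initialized as in the patch (the checkpoint URI is a hypothetical example):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    StreamingContext._ensure_initialized()
    gw = SparkContext._gateway

    # CheckpointReader.read returns a Scala Option[Checkpoint]; isEmpty() is true
    # when the directory holds no readable checkpoint, in which case the caller
    # falls back to building a fresh context via setupFunc.
    option = gw.jvm.CheckpointReader.read("hdfs://namenode:8020/user/spark/checkpoint")
    has_valid_checkpoint = not option.isEmpty()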