From 053d94fcf32268369b5a40837271f15d6af41aa4 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Sun, 23 Aug 2015 19:24:32 -0700 Subject: [SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local checkpoint paths and existing SparkContexts The current code only checks checkpoint files in local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following: 1. Use the same code path as Java to check whether a valid checkpoint exists 2. Create a new Python SparkContext only if there no active one. There is not test for the path as its hard to test with distributed filesystem paths in a local unit test. I am going to test it with a distributed file system manually to verify that this patch works. Author: Tathagata Das Closes #8366 from tdas/SPARK-10142 and squashes the following commits: 3afa666 [Tathagata Das] Added tests 2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists 9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files --- python/pyspark/streaming/context.py | 22 +++++++++++-------- python/pyspark/streaming/tests.py | 43 +++++++++++++++++++++++++++++++------ 2 files changed, 49 insertions(+), 16 deletions(-) (limited to 'python/pyspark') diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index e3ba70e4e5..4069d7a149 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -150,26 +150,30 @@ class StreamingContext(object): @param checkpointPath: Checkpoint directory used in an earlier streaming program @param setupFunc: Function to create a new context and setup DStreams """ - # TODO: support checkpoint in HDFS - if not os.path.exists(checkpointPath) or not os.listdir(checkpointPath): + cls._ensure_initialized() + gw = SparkContext._gateway + + # Check whether valid checkpoint information exists in the given path + if gw.jvm.CheckpointReader.read(checkpointPath).isEmpty(): ssc = setupFunc() ssc.checkpoint(checkpointPath) return ssc - cls._ensure_initialized() - gw = SparkContext._gateway - try: jssc = gw.jvm.JavaStreamingContext(checkpointPath) except Exception: print("failed to load StreamingContext from checkpoint", file=sys.stderr) raise - jsc = jssc.sparkContext() - conf = SparkConf(_jconf=jsc.getConf()) - sc = SparkContext(conf=conf, gateway=gw, jsc=jsc) + # If there is already an active instance of Python SparkContext use it, or create a new one + if not SparkContext._active_spark_context: + jsc = jssc.sparkContext() + conf = SparkConf(_jconf=jsc.getConf()) + SparkContext(conf=conf, gateway=gw, jsc=jsc) + + sc = SparkContext._active_spark_context + # update ctx in serializer - SparkContext._active_spark_context = sc cls._transformerSerializer.ctx = sc return StreamingContext(sc, None, jssc) diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 214d5be439..510a4f2b3e 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -603,6 +603,10 @@ class CheckpointTests(unittest.TestCase): def tearDown(self): if self.ssc is not None: self.ssc.stop(True) + if self.sc is not None: + self.sc.stop() + if self.cpd is not None: + shutil.rmtree(self.cpd) def test_get_or_create_and_get_active_or_create(self): inputd = tempfile.mkdtemp() @@ -622,8 +626,12 @@ class CheckpointTests(unittest.TestCase): self.setupCalled = True return ssc - cpd = tempfile.mkdtemp("test_streaming_cps") - self.ssc = StreamingContext.getOrCreate(cpd, setup) + # Verify that getOrCreate() calls setup() in absence of checkpoint files + self.cpd = tempfile.mkdtemp("test_streaming_cps") + self.setupCalled = False + self.ssc = StreamingContext.getOrCreate(self.cpd, setup) + self.assertFalse(self.setupCalled) + self.ssc.start() def check_output(n): @@ -660,31 +668,52 @@ class CheckpointTests(unittest.TestCase): self.ssc.stop(True, True) time.sleep(1) self.setupCalled = False - self.ssc = StreamingContext.getOrCreate(cpd, setup) + self.ssc = StreamingContext.getOrCreate(self.cpd, setup) self.assertFalse(self.setupCalled) self.ssc.start() check_output(3) + # Verify that getOrCreate() uses existing SparkContext + self.ssc.stop(True, True) + time.sleep(1) + sc = SparkContext(SparkConf()) + self.setupCalled = False + self.ssc = StreamingContext.getOrCreate(self.cpd, setup) + self.assertFalse(self.setupCalled) + self.assertTrue(self.ssc.sparkContext == sc) + # Verify the getActiveOrCreate() recovers from checkpoint files self.ssc.stop(True, True) time.sleep(1) self.setupCalled = False - self.ssc = StreamingContext.getActiveOrCreate(cpd, setup) + self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup) self.assertFalse(self.setupCalled) self.ssc.start() check_output(4) # Verify that getActiveOrCreate() returns active context self.setupCalled = False - self.assertEquals(StreamingContext.getActiveOrCreate(cpd, setup), self.ssc) + self.assertEquals(StreamingContext.getActiveOrCreate(self.cpd, setup), self.ssc) self.assertFalse(self.setupCalled) + # Verify that getActiveOrCreate() uses existing SparkContext + self.ssc.stop(True, True) + time.sleep(1) + self.sc = SparkContext(SparkConf()) + self.setupCalled = False + self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup) + self.assertFalse(self.setupCalled) + self.assertTrue(self.ssc.sparkContext == sc) + # Verify that getActiveOrCreate() calls setup() in absence of checkpoint files self.ssc.stop(True, True) - shutil.rmtree(cpd) # delete checkpoint directory + shutil.rmtree(self.cpd) # delete checkpoint directory + time.sleep(1) self.setupCalled = False - self.ssc = StreamingContext.getActiveOrCreate(cpd, setup) + self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup) self.assertTrue(self.setupCalled) + + # Stop everything self.ssc.stop(True, True) -- cgit v1.2.3