author    Tathagata Das <tathagata.das1565@gmail.com>  2015-08-23 19:24:32 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-08-23 19:24:32 -0700
commit    053d94fcf32268369b5a40837271f15d6af41aa4 (patch)
tree      04e0e1d58f4d291c4bd50a7d59444eec0c88f1d5 /python/pyspark/streaming/tests.py
parent    b963c19a803c5a26c9b65655d40ca6621acf8bd4 (diff)
[SPARK-10142] [STREAMING] Made python checkpoint recovery handle non-local checkpoint paths and existing SparkContexts
The current code only checks for checkpoint files on the local filesystem, and always tries to create a new Python SparkContext (even if one already exists). The solution is to do the following:
1. Use the same code path as Java to check whether a valid checkpoint exists.
2. Create a new Python SparkContext only if there is no active one.

There is no unit test for the non-local path, as it is hard to test distributed filesystem paths in a local unit test. I am going to test it manually with a distributed filesystem to verify that this patch works.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8366 from tdas/SPARK-10142 and squashes the following commits:

3afa666 [Tathagata Das] Added tests
2dd4ae5 [Tathagata Das] Added the check to not create a context if one already exists
9bf151b [Tathagata Das] Made python checkpoint recovery use java to find the checkpoint files
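For context, here is a minimal usage sketch of the behavior this patch describes: after the fix, `getOrCreate()` validates non-local checkpoint paths through the JVM and reuses an active SparkContext instead of creating a second one. This is an illustration, not part of the patch; the HDFS URL, host/port, and app name are placeholders.

```python
# Sketch only: demonstrates the recovered getOrCreate() behavior.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "hdfs://namenode:8020/tmp/demo_cps"  # placeholder path

def setup():
    # Called only when no valid checkpoint exists at CHECKPOINT_DIR.
    sc = SparkContext(conf=SparkConf().setAppName("checkpoint-demo"))
    ssc = StreamingContext(sc, 1)  # 1-second batch interval
    ssc.socketTextStream("localhost", 9999).pprint()  # placeholder stream
    ssc.checkpoint(CHECKPOINT_DIR)
    return ssc

# With this patch, the checkpoint check works for distributed paths, and
# an already-active SparkContext is reused during recovery.
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, setup)
ssc.start()
```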
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--  python/pyspark/streaming/tests.py  43
1 file changed, 36 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 214d5be439..510a4f2b3e 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -603,6 +603,10 @@ class CheckpointTests(unittest.TestCase):
     def tearDown(self):
         if self.ssc is not None:
             self.ssc.stop(True)
+        if self.sc is not None:
+            self.sc.stop()
+        if self.cpd is not None:
+            shutil.rmtree(self.cpd)

     def test_get_or_create_and_get_active_or_create(self):
         inputd = tempfile.mkdtemp()
@@ -622,8 +626,12 @@ class CheckpointTests(unittest.TestCase):
             self.setupCalled = True
             return ssc

-        cpd = tempfile.mkdtemp("test_streaming_cps")
-        self.ssc = StreamingContext.getOrCreate(cpd, setup)
+        # Verify that getOrCreate() calls setup() in absence of checkpoint files
+        self.cpd = tempfile.mkdtemp("test_streaming_cps")
+        self.setupCalled = False
+        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
+        self.assertTrue(self.setupCalled)
+
         self.ssc.start()

         def check_output(n):
@@ -660,31 +668,52 @@ class CheckpointTests(unittest.TestCase):
         self.ssc.stop(True, True)
         time.sleep(1)
         self.setupCalled = False
-        self.ssc = StreamingContext.getOrCreate(cpd, setup)
+        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
         self.assertFalse(self.setupCalled)
         self.ssc.start()
         check_output(3)

+        # Verify that getOrCreate() uses existing SparkContext
+        self.ssc.stop(True, True)
+        time.sleep(1)
+        self.sc = SparkContext(conf=SparkConf())
+        self.setupCalled = False
+        self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
+        self.assertFalse(self.setupCalled)
+        self.assertTrue(self.ssc.sparkContext == self.sc)
+
         # Verify that getActiveOrCreate() recovers from checkpoint files
         self.ssc.stop(True, True)
         time.sleep(1)
         self.setupCalled = False
-        self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
+        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
         self.assertFalse(self.setupCalled)
         self.ssc.start()
         check_output(4)

         # Verify that getActiveOrCreate() returns active context
         self.setupCalled = False
-        self.assertEquals(StreamingContext.getActiveOrCreate(cpd, setup), self.ssc)
+        self.assertEqual(StreamingContext.getActiveOrCreate(self.cpd, setup), self.ssc)
         self.assertFalse(self.setupCalled)

+        # Verify that getActiveOrCreate() uses existing SparkContext
+        self.ssc.stop(True, True)
+        time.sleep(1)
+        self.sc = SparkContext(conf=SparkConf())
+        self.setupCalled = False
+        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
+        self.assertFalse(self.setupCalled)
+        self.assertTrue(self.ssc.sparkContext == self.sc)
+
         # Verify that getActiveOrCreate() calls setup() in absence of checkpoint files
         self.ssc.stop(True, True)
-        shutil.rmtree(cpd)  # delete checkpoint directory
+        shutil.rmtree(self.cpd)  # delete checkpoint directory
+        time.sleep(1)
         self.setupCalled = False
-        self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
+        self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
         self.assertTrue(self.setupCalled)
+
+        # Stop everything
         self.ssc.stop(True, True)
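To exercise the updated suite locally, one option is to drive it with unittest directly. This is a sketch, assuming a local Spark build with pyspark (including this patch) importable on the Python path:

```python
# Run only the CheckpointTests case from the streaming test module.
import unittest
from pyspark.streaming.tests import CheckpointTests

suite = unittest.TestLoader().loadTestsFromTestCase(CheckpointTests)
unittest.TextTestRunner(verbosity=2).run(suite)
```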