diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-11-17 14:48:29 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-11-17 14:48:29 -0800 |
commit | 928d631625297857fb6998fbeb0696917fbfd60f (patch) | |
tree | 16c2446b3381accc01b58d577db60fed24fe387e /python | |
parent | 936bc0bcbf957fa1d7cb5cfe88d628c830df5981 (diff) | |
download | spark-928d631625297857fb6998fbeb0696917fbfd60f.tar.gz spark-928d631625297857fb6998fbeb0696917fbfd60f.tar.bz2 spark-928d631625297857fb6998fbeb0696917fbfd60f.zip |
[SPARK-11740][STREAMING] Fix the race condition of two checkpoints in a batch
We will do checkpoint when generating a batch and completing a batch. When the processing time of a batch is greater than the batch interval, checkpointing for completing an old batch may run after checkpointing for generating a new batch. If this happens, checkpoint of an old batch actually has the latest information, so we want to recovery from it. This PR will use the latest checkpoint time as the file name, so that we can always recovery from the latest checkpoint file.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9707 from zsxwing/fix-checkpoint.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/streaming/tests.py | 9 |
1 files changed, 4 insertions, 5 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 2983028413..ff95639146 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -753,7 +753,6 @@ class CheckpointTests(unittest.TestCase): if self.cpd is not None: shutil.rmtree(self.cpd) - @unittest.skip("Enable it when we fix the checkpoint bug") def test_get_or_create_and_get_active_or_create(self): inputd = tempfile.mkdtemp() outputd = tempfile.mkdtemp() + "/" @@ -822,11 +821,11 @@ class CheckpointTests(unittest.TestCase): # Verify that getOrCreate() uses existing SparkContext self.ssc.stop(True, True) time.sleep(1) - sc = SparkContext(SparkConf()) + self.sc = SparkContext(conf=SparkConf()) self.setupCalled = False self.ssc = StreamingContext.getOrCreate(self.cpd, setup) self.assertFalse(self.setupCalled) - self.assertTrue(self.ssc.sparkContext == sc) + self.assertTrue(self.ssc.sparkContext == self.sc) # Verify the getActiveOrCreate() recovers from checkpoint files self.ssc.stop(True, True) @@ -845,11 +844,11 @@ class CheckpointTests(unittest.TestCase): # Verify that getActiveOrCreate() uses existing SparkContext self.ssc.stop(True, True) time.sleep(1) - self.sc = SparkContext(SparkConf()) + self.sc = SparkContext(conf=SparkConf()) self.setupCalled = False self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup) self.assertFalse(self.setupCalled) - self.assertTrue(self.ssc.sparkContext == sc) + self.assertTrue(self.ssc.sparkContext == self.sc) # Verify that getActiveOrCreate() calls setup() in absence of checkpoint files self.ssc.stop(True, True) |