aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/streaming/tests.py9
1 files changed, 4 insertions, 5 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 2983028413..ff95639146 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -753,7 +753,6 @@ class CheckpointTests(unittest.TestCase):
if self.cpd is not None:
shutil.rmtree(self.cpd)
- @unittest.skip("Enable it when we fix the checkpoint bug")
def test_get_or_create_and_get_active_or_create(self):
inputd = tempfile.mkdtemp()
outputd = tempfile.mkdtemp() + "/"
@@ -822,11 +821,11 @@ class CheckpointTests(unittest.TestCase):
# Verify that getOrCreate() uses existing SparkContext
self.ssc.stop(True, True)
time.sleep(1)
- sc = SparkContext(SparkConf())
+ self.sc = SparkContext(conf=SparkConf())
self.setupCalled = False
self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
- self.assertTrue(self.ssc.sparkContext == sc)
+ self.assertTrue(self.ssc.sparkContext == self.sc)
# Verify the getActiveOrCreate() recovers from checkpoint files
self.ssc.stop(True, True)
@@ -845,11 +844,11 @@ class CheckpointTests(unittest.TestCase):
# Verify that getActiveOrCreate() uses existing SparkContext
self.ssc.stop(True, True)
time.sleep(1)
- self.sc = SparkContext(SparkConf())
+ self.sc = SparkContext(conf=SparkConf())
self.setupCalled = False
self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
- self.assertTrue(self.ssc.sparkContext == sc)
+ self.assertTrue(self.ssc.sparkContext == self.sc)
# Verify that getActiveOrCreate() calls setup() in absence of checkpoint files
self.ssc.stop(True, True)