author    Tathagata Das <tathagata.das1565@gmail.com>  2015-08-11 12:02:28 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-08-11 12:02:28 -0700
commit 5b8bb1b213b8738f563fcd00747604410fbb3087 (patch)
tree   9bb4f60b2b23b529a20754c028b419a82550a961 /python/pyspark/streaming/tests.py
parent dbd778d84d094ca142bc08c351478595b280bc2a (diff)
[SPARK-9572] [STREAMING] [PYSPARK] Added StreamingContext.getActiveOrCreate() in Python
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8080 from tdas/SPARK-9572 and squashes the following commits:

64a231d [Tathagata Das] Fix based on comments
741a0d0 [Tathagata Das] Fixed style
f4f094c [Tathagata Das] Tweaked test
9afcdbe [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9572
e21488d [Tathagata Das] Minor update
1a371d9 [Tathagata Das] Addressed comments.
60479da [Tathagata Das] Fixed indent
9c2da9c [Tathagata Das] Fixed bugs
b5bd32c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9572
b55b348 [Tathagata Das] Removed prints
5781728 [Tathagata Das] Fix style issues
b711214 [Tathagata Das] Reverted run-tests.py
643b59d [Tathagata Das] Revert unnecessary change
150e58c [Tathagata Das] Added StreamingContext.getActiveOrCreate() in Python
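For context, a minimal sketch of how the new API is intended to be called; the app name, paths, and batch interval below are illustrative, not taken from this patch:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="GetActiveOrCreateSketch")

    def create_context():
        # Invoked only when no StreamingContext is active and no usable
        # checkpoint data exists under the given path.
        ssc = StreamingContext(sc, 1)  # 1-second batches (illustrative)
        ssc.textFileStream("/tmp/input").count().pprint()
        return ssc

    # Returns the active StreamingContext if one is running, otherwise
    # recreates one from checkpoint data, otherwise calls create_context().
    ssc = StreamingContext.getActiveOrCreate("/tmp/checkpoint", create_context)
    ssc.start()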
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--  python/pyspark/streaming/tests.py  133
1 file changed, 122 insertions(+), 11 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index f0ed415f97..6108c845c1 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -24,6 +24,7 @@ import operator
import tempfile
import random
import struct
+import shutil
from functools import reduce
if sys.version_info[:2] <= (2, 6):
@@ -59,12 +60,21 @@ class PySparkStreamingTestCase(unittest.TestCase):
@classmethod
def tearDownClass(cls):
cls.sc.stop()
+ # Clean up in the JVM just in case there have been some issues in the Python API
+ jSparkContextOption = SparkContext._jvm.SparkContext.get()
+ if jSparkContextOption.nonEmpty():
+ jSparkContextOption.get().stop()
def setUp(self):
self.ssc = StreamingContext(self.sc, self.duration)
def tearDown(self):
- self.ssc.stop(False)
+ if self.ssc is not None:
+ self.ssc.stop(False)
+ # Clean up in the JVM just in case there have been some issues in the Python API
+ jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
+ if jStreamingContextOption.nonEmpty():
+ jStreamingContextOption.get().stop(False)
def wait_for(self, result, n):
start_time = time.time()
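The JVM-side cleanup above relies on the fact that Scala's SparkContext.get() and StreamingContext.getActive() return an Option, which Py4J exposes as a proxy object with nonEmpty() and get(). A standalone sketch of that pattern, assuming a SparkContext has already been created so the gateway is live:

    from pyspark import SparkContext

    sc = SparkContext(appName="JvmCleanupSketch")

    # Scala returns Option[SparkContext]; test nonEmpty() before calling
    # get(), exactly as tearDownClass() does above.
    j_sc_option = SparkContext._jvm.SparkContext.get()
    if j_sc_option.nonEmpty():
        j_sc_option.get().stop()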
@@ -442,6 +452,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
class StreamingContextTests(PySparkStreamingTestCase):
duration = 0.1
+ setupCalled = False
def _add_input_stream(self):
inputs = [range(1, x) for x in range(101)]
@@ -515,10 +526,85 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.assertEqual([2, 3, 1], self._take(dstream, 3))
+ def test_get_active(self):
+ self.assertEqual(StreamingContext.getActive(), None)
+
+ # Verify that getActive() returns the active context
+ self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
+ self.ssc.start()
+ self.assertEqual(StreamingContext.getActive(), self.ssc)
+
+ # Verify that getActive() returns None after the context is stopped
+ self.ssc.stop(False)
+ self.assertEqual(StreamingContext.getActive(), None)
+
+ # Verify that if the Java context is stopped, then getActive() returns None
+ self.ssc = StreamingContext(self.sc, self.duration)
+ self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
+ self.ssc.start()
+ self.assertEqual(StreamingContext.getActive(), self.ssc)
+ self.ssc._jssc.stop(False)
+ self.assertEqual(StreamingContext.getActive(), None)
+
+ def test_get_active_or_create(self):
+ # Test StreamingContext.getActiveOrCreate() without checkpoint data
+ # See CheckpointTests for tests with checkpoint data
+ self.ssc = None
+ self.assertEqual(StreamingContext.getActive(), None)
+
+ def setupFunc():
+ ssc = StreamingContext(self.sc, self.duration)
+ ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
+ self.setupCalled = True
+ return ssc
+
+ # Verify that getActiveOrCreate() (w/o checkpoint) calls setupFunc when no context is active
+ self.setupCalled = False
+ self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
+ self.assertTrue(self.setupCalled)
+
+ # Verify that getActiveOrCreate() returns the active context and does not call setupFunc
+ self.ssc.start()
+ self.setupCalled = False
+ self.assertEqual(StreamingContext.getActiveOrCreate(None, setupFunc), self.ssc)
+ self.assertFalse(self.setupCalled)
+
+ # Verify that getActiveOrCreate() calls setupFunc after active context is stopped
+ self.ssc.stop(False)
+ self.setupCalled = False
+ self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
+ self.assertTrue(self.setupCalled)
+
+ # Verify that getActiveOrCreate() calls setupFunc if the underlying Java context has been stopped
+ self.ssc = StreamingContext(self.sc, self.duration)
+ self.ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
+ self.ssc.start()
+ self.assertEqual(StreamingContext.getActive(), self.ssc)
+ self.ssc._jssc.stop(False)
+ self.setupCalled = False
+ self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
+ self.assertTrue(self.setupCalled)
+
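Condensed, the getActive() contract the two tests above pin down looks like this; a sketch assuming an existing SparkContext sc, with an illustrative batch interval:

    from pyspark.streaming import StreamingContext

    assert StreamingContext.getActive() is None   # nothing started yet

    ssc = StreamingContext(sc, 0.1)
    ssc.queueStream([[1]]).foreachRDD(lambda rdd: rdd.count())
    ssc.start()
    assert StreamingContext.getActive() is ssc    # the started context is active

    ssc.stop(stopSparkContext=False)
    assert StreamingContext.getActive() is None   # a stopped context is not active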
class CheckpointTests(unittest.TestCase):
- def test_get_or_create(self):
+ setupCalled = False
+ ssc = None  # ensures tearDown() is safe even if a test fails before creating a context
+
+ @staticmethod
+ def tearDownClass():
+ # Clean up in the JVM just in case there have been some issues in the Python API
+ jStreamingContextOption = StreamingContext._jvm.SparkContext.getActive()
+ if jStreamingContextOption.nonEmpty():
+ jStreamingContextOption.get().stop()
+ jSparkContextOption = SparkContext._jvm.SparkContext.get()
+ if jSparkContextOption.nonEmpty():
+ jSparkContextOption.get().stop()
+
+ def tearDown(self):
+ if self.ssc is not None:
+ self.ssc.stop(True)
+
+ def test_get_or_create_and_get_active_or_create(self):
inputd = tempfile.mkdtemp()
outputd = tempfile.mkdtemp() + "/"
@@ -533,11 +619,12 @@ class CheckpointTests(unittest.TestCase):
wc = dstream.updateStateByKey(updater)
wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test")
wc.checkpoint(.5)
+ self.setupCalled = True
return ssc
cpd = tempfile.mkdtemp("test_streaming_cps")
- ssc = StreamingContext.getOrCreate(cpd, setup)
- ssc.start()
+ self.ssc = StreamingContext.getOrCreate(cpd, setup)
+ self.ssc.start()
def check_output(n):
while not os.listdir(outputd):
@@ -552,7 +639,7 @@ class CheckpointTests(unittest.TestCase):
# not finished
time.sleep(0.01)
continue
- ordd = ssc.sparkContext.textFile(p).map(lambda line: line.split(","))
+ ordd = self.ssc.sparkContext.textFile(p).map(lambda line: line.split(","))
d = ordd.values().map(int).collect()
if not d:
time.sleep(0.01)
@@ -568,13 +655,37 @@ class CheckpointTests(unittest.TestCase):
check_output(1)
check_output(2)
- ssc.stop(True, True)
+ # Verify that getOrCreate() recovers from checkpoint files
+ self.ssc.stop(True, True)
time.sleep(1)
- ssc = StreamingContext.getOrCreate(cpd, setup)
- ssc.start()
+ self.setupCalled = False
+ self.ssc = StreamingContext.getOrCreate(cpd, setup)
+ self.assertFalse(self.setupCalled)
+ self.ssc.start()
check_output(3)
- ssc.stop(True, True)
+
+ # Verify that getActiveOrCreate() recovers from checkpoint files
+ self.ssc.stop(True, True)
+ time.sleep(1)
+ self.setupCalled = False
+ self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
+ self.assertFalse(self.setupCalled)
+ self.ssc.start()
+ check_output(4)
+
+ # Verify that getActiveOrCreate() returns active context
+ self.setupCalled = False
+ self.assertEqual(StreamingContext.getActiveOrCreate(cpd, setup), self.ssc)
+ self.assertFalse(self.setupCalled)
+
+ # Verify that getActiveOrCreate() calls setup() in the absence of checkpoint files
+ self.ssc.stop(True, True)
+ shutil.rmtree(cpd) # delete checkpoint directory
+ self.setupCalled = False
+ self.ssc = StreamingContext.getActiveOrCreate(cpd, setup)
+ self.assertTrue(self.setupCalled)
+ self.ssc.stop(True, True)
class KafkaStreamTests(PySparkStreamingTestCase):
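The checkpoint round-trip exercised by the test above boils down to the following; a sketch assuming an existing SparkContext sc, with illustrative paths and intervals:

    import shutil
    import tempfile
    import time

    from pyspark.streaming import StreamingContext

    inputd = tempfile.mkdtemp()
    cpd = tempfile.mkdtemp("checkpoint_sketch")

    def setup():
        ssc = StreamingContext(sc, 0.5)
        ssc.checkpoint(cpd)
        counts = ssc.textFileStream(inputd).map(lambda line: (line, 1))
        counts.updateStateByKey(lambda values, state: sum(values, state or 0)).pprint()
        return ssc

    # No checkpoint data yet, so setup() runs and checkpointing begins.
    ssc = StreamingContext.getOrCreate(cpd, setup)
    ssc.start()
    time.sleep(1)  # give the context time to write at least one checkpoint
    ssc.stop(stopSparkContext=False, stopGraceFully=True)

    # Checkpoint data now exists, so the context is recovered from disk
    # and setup() is NOT called again.
    ssc = StreamingContext.getOrCreate(cpd, setup)

    shutil.rmtree(cpd)  # deleting the directory forces setup() on the next call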
@@ -1134,7 +1245,7 @@ if __name__ == "__main__":
testcases.append(KinesisStreamTests)
elif are_kinesis_tests_enabled is False:
sys.stderr.write("Skipping all Kinesis Python tests as the optional Kinesis project was "
- "not compiled with -Pkinesis-asl profile. To run these tests, "
+ "not compiled into a JAR. To run these tests, "
"you need to build Spark with 'build/sbt -Pkinesis-asl assembly/assembly "
"streaming-kinesis-asl-assembly/assembly' or "
"'build/mvn -Pkinesis-asl package' before running this test.")
@@ -1150,4 +1261,4 @@ if __name__ == "__main__":
for testcase in testcases:
sys.stderr.write("[Running %s]\n" % (testcase))
tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
- unittest.TextTestRunner(verbosity=2).run(tests)
+ unittest.TextTestRunner(verbosity=3).run(tests)