aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming
diff options
context:
space:
mode:
authorNick Evans <me@nicolasevans.org>2015-11-05 09:18:20 +0000
committerSean Owen <sowen@cloudera.com>2015-11-05 09:18:20 +0000
commit859dff56eb0f8c63c86e7e900a12340c199e6247 (patch)
treefe67298cc4acce37b3772f8d99b5621508cb7d6e /python/pyspark/streaming
parent6f81eae24f83df51a99d4bb2629dd7daadc01519 (diff)
downloadspark-859dff56eb0f8c63c86e7e900a12340c199e6247.tar.gz
spark-859dff56eb0f8c63c86e7e900a12340c199e6247.tar.bz2
spark-859dff56eb0f8c63c86e7e900a12340c199e6247.zip
[SPARK-11378][STREAMING] make StreamingContext.awaitTerminationOrTimeout return properly
This adds a failing test checking that `awaitTerminationOrTimeout` returns the expected value, and then fixes that failing test with the addition of a `return`. tdas zsxwing Author: Nick Evans <me@nicolasevans.org> Closes #9336 from manygrams/fix_await_termination_or_timeout.
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r--python/pyspark/streaming/context.py2
-rw-r--r--python/pyspark/streaming/tests.py7
2 files changed, 8 insertions, 1 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 975c754732..8be56c9915 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -218,7 +218,7 @@ class StreamingContext(object):
@param timeout: time to wait in seconds
"""
- self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))
+ return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))
def stop(self, stopSparkContext=True, stopGraceFully=False):
"""
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index f7fa481d50..179479625b 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -596,6 +596,13 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc)
self.assertTrue(self.setupCalled)
+ def test_await_termination_or_timeout(self):
+ self._add_input_stream()
+ self.ssc.start()
+ self.assertFalse(self.ssc.awaitTerminationOrTimeout(0.001))
+ self.ssc.stop(False)
+ self.assertTrue(self.ssc.awaitTerminationOrTimeout(0.001))
+
class CheckpointTests(unittest.TestCase):