diff options
author | Nick Evans <me@nicolasevans.org> | 2015-11-05 09:18:20 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-11-05 09:18:20 +0000 |
commit | 859dff56eb0f8c63c86e7e900a12340c199e6247 (patch) | |
tree | fe67298cc4acce37b3772f8d99b5621508cb7d6e | |
parent | 6f81eae24f83df51a99d4bb2629dd7daadc01519 (diff) | |
download | spark-859dff56eb0f8c63c86e7e900a12340c199e6247.tar.gz spark-859dff56eb0f8c63c86e7e900a12340c199e6247.tar.bz2 spark-859dff56eb0f8c63c86e7e900a12340c199e6247.zip |
[SPARK-11378][STREAMING] make StreamingContext.awaitTerminationOrTimeout return properly
This adds a failing test checking that `awaitTerminationOrTimeout` returns the expected value, and then fixes that failing test with the addition of a `return`.
tdas zsxwing
Author: Nick Evans <me@nicolasevans.org>
Closes #9336 from manygrams/fix_await_termination_or_timeout.
-rw-r--r-- | python/pyspark/streaming/context.py | 2 | ||||
-rw-r--r-- | python/pyspark/streaming/tests.py | 7 |
2 files changed, 8 insertions, 1 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 975c754732..8be56c9915 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -218,7 +218,7 @@ class StreamingContext(object): @param timeout: time to wait in seconds """ - self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) + return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) def stop(self, stopSparkContext=True, stopGraceFully=False): """ diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index f7fa481d50..179479625b 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -596,6 +596,13 @@ class StreamingContextTests(PySparkStreamingTestCase): self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc) self.assertTrue(self.setupCalled) + def test_await_termination_or_timeout(self): + self._add_input_stream() + self.ssc.start() + self.assertFalse(self.ssc.awaitTerminationOrTimeout(0.001)) + self.ssc.stop(False) + self.assertTrue(self.ssc.awaitTerminationOrTimeout(0.001)) + class CheckpointTests(unittest.TestCase): |