diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2015-11-20 14:23:01 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-11-20 14:23:01 -0800 |
commit | be7a2cfd978143f6f265eca63e9e24f755bc9f22 (patch) | |
tree | 02b6e225f60847679f00241e6ada28776ead0fcc /python/pyspark/streaming/tests.py | |
parent | 9ed4ad4265cf9d3135307eb62dae6de0b220fc21 (diff) | |
download | spark-be7a2cfd978143f6f265eca63e9e24f755bc9f22.tar.gz spark-be7a2cfd978143f6f265eca63e9e24f755bc9f22.tar.bz2 spark-be7a2cfd978143f6f265eca63e9e24f755bc9f22.zip |
[SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction and TransformFunctionSerializer
TransformFunction and TransformFunctionSerializer don't rethrow the exception, so when any exception happens, it just return None. This will cause some weird NPE and confuse people.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #9847 from zsxwing/pyspark-streaming-exception.
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r-- | python/pyspark/streaming/tests.py | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 3403f6d20d..a0e0267caf 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -403,6 +403,22 @@ class BasicOperationTests(PySparkStreamingTestCase): expected = [[('k', v)] for v in expected] self._test_func(input, func, expected) + def test_failed_func(self): + input = [self.sc.parallelize([d], 1) for d in range(4)] + input_stream = self.ssc.queueStream(input) + + def failed_func(i): + raise ValueError("failed") + + input_stream.map(failed_func).pprint() + self.ssc.start() + try: + self.ssc.awaitTerminationOrTimeout(10) + except: + return + + self.fail("a failed func should throw an error") + class StreamingListenerTests(PySparkStreamingTestCase): |