path: root/python/pyspark/sql/tests.py
author    Shixiong Zhu <shixiong@databricks.com>  2016-12-05 11:36:11 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2016-12-05 11:36:11 -0800
commit    246012859f0ed5248809a2e00e8355fbdaa8beb5 (patch)
tree      85a7d76e1e438b7839c70118d18c3f6c87e5f7b1 /python/pyspark/sql/tests.py
parent    410b7898661f77e748564aaee6a5ab7747ce34ad (diff)
download  spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.tar.gz
          spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.tar.bz2
          spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.zip
[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
## What changes were proposed in this pull request?

- Add StreamingQuery.explain and exception to Python.
- Fix StreamingQueryException to not expose `OffsetSeq`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16125 from zsxwing/py-streaming-explain.
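For context, here is a minimal sketch (not part of this commit) of how the new Python APIs could be exercised; the SparkSession setup, app name, and query name are assumptions, while the source path matches the one used by the tests in the diff below.

```python
# Hedged sketch of StreamingQuery.explain() and StreamingQuery.exception() usage.
# Assumes a local SparkSession and the text streaming source used by the PySpark tests.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("explain_exception_demo").getOrCreate()

sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
sq = sdf.writeStream.format("memory").queryName("demo_query").start()
try:
    sq.processAllAvailable()
    sq.explain()               # prints the physical plan of the streaming query
    sq.explain(extended=True)  # also prints the parsed/analyzed/optimized logical plans
    print(sq.exception())      # None while the query has not failed
finally:
    sq.stop()
```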
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--  python/pyspark/sql/tests.py  |  29
1 file changed, 29 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 0aff9cebe9..9f34414f64 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1137,6 +1137,35 @@ class SQLTests(ReusedPySparkTestCase):
q.stop()
shutil.rmtree(tmpPath)
+ def test_stream_exception(self):
+ sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+ sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+ try:
+ sq.processAllAvailable()
+ self.assertEqual(sq.exception(), None)
+ finally:
+ sq.stop()
+
+ from pyspark.sql.functions import col, udf
+ from pyspark.sql.utils import StreamingQueryException
+ bad_udf = udf(lambda x: 1 / 0)
+ sq = sdf.select(bad_udf(col("value")))\
+ .writeStream\
+ .format('memory')\
+ .queryName('this_query')\
+ .start()
+ try:
+ # Process some data to fail the query
+ sq.processAllAvailable()
+ self.fail("bad udf should fail the query")
+ except StreamingQueryException as e:
+ # This is expected
+ self.assertTrue("ZeroDivisionError" in e.desc)
+ finally:
+ sq.stop()
+ self.assertTrue(type(sq.exception()) is StreamingQueryException)
+ self.assertTrue("ZeroDivisionError" in sq.exception().desc)
+
def test_query_manager_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for q in self.spark._wrapped.streams.active: