aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/streaming.py
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-12-05 11:36:11 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-12-05 11:36:11 -0800
commit246012859f0ed5248809a2e00e8355fbdaa8beb5 (patch)
tree85a7d76e1e438b7839c70118d18c3f6c87e5f7b1 /python/pyspark/sql/streaming.py
parent410b7898661f77e748564aaee6a5ab7747ce34ad (diff)
downloadspark-246012859f0ed5248809a2e00e8355fbdaa8beb5.tar.gz
spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.tar.bz2
spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.zip
[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
## What changes were proposed in this pull request? - Add StreamingQuery.explain and exception to Python. - Fix StreamingQueryException to not expose `OffsetSeq`. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16125 from zsxwing/py-streaming-explain.
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r--python/pyspark/sql/streaming.py40
1 files changed, 40 insertions, 0 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 84f01d3d9a..4a7d17ba51 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -30,6 +30,7 @@ from pyspark import since, keyword_only
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import *
+from pyspark.sql.utils import StreamingQueryException
__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
@@ -132,6 +133,45 @@ class StreamingQuery(object):
"""
self._jsq.stop()
+ @since(2.1)
+ def explain(self, extended=False):
+ """Prints the (logical and physical) plans to the console for debugging purpose.
+
+ :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
+
+ >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+ >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
+ >>> sq.explain()
+ == Physical Plan ==
+ ...
+ >>> sq.explain(True)
+ == Parsed Logical Plan ==
+ ...
+ == Analyzed Logical Plan ==
+ ...
+ == Optimized Logical Plan ==
+ ...
+ == Physical Plan ==
+ ...
+ >>> sq.stop()
+ """
+ # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
+ # We should print it in the Python process.
+ print(self._jsq.explainInternal(extended))
+
+ @since(2.1)
+ def exception(self):
+ """
+ :return: the StreamingQueryException if the query was terminated by an exception, or None.
+ """
+ if self._jsq.exception().isDefined():
+ je = self._jsq.exception().get()
+ msg = je.toString().split(': ', 1)[1] # Drop the Java StreamingQueryException type info
+ stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
+ return StreamingQueryException(msg, stackTrace)
+ else:
+ return None
+
class StreamingQueryManager(object):
"""A class to manage all the :class:`StreamingQuery` StreamingQueries active.