diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-05 11:36:11 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-05 11:36:11 -0800 |
commit | 246012859f0ed5248809a2e00e8355fbdaa8beb5 (patch) | |
tree | 85a7d76e1e438b7839c70118d18c3f6c87e5f7b1 /python/pyspark/sql/streaming.py | |
parent | 410b7898661f77e748564aaee6a5ab7747ce34ad (diff) | |
download | spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.tar.gz spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.tar.bz2 spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.zip |
[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
## What changes were proposed in this pull request?
- Add `StreamingQuery.explain` and `StreamingQuery.exception` to the Python API.
- Fix StreamingQueryException to not expose `OffsetSeq`.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16125 from zsxwing/py-streaming-explain.
Diffstat (limited to 'python/pyspark/sql/streaming.py')
-rw-r--r-- | python/pyspark/sql/streaming.py | 40 |
1 files changed, 40 insertions, 0 deletions
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 84f01d3d9a..4a7d17ba51 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -30,6 +30,7 @@ from pyspark import since, keyword_only from pyspark.rdd import ignore_unicode_prefix from pyspark.sql.readwriter import OptionUtils, to_str from pyspark.sql.types import * +from pyspark.sql.utils import StreamingQueryException __all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"] @@ -132,6 +133,45 @@ class StreamingQuery(object): """ self._jsq.stop() + @since(2.1) + def explain(self, extended=False): + """Prints the (logical and physical) plans to the console for debugging purpose. + + :param extended: boolean, default ``False``. If ``False``, prints only the physical plan. + + >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start() + >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans. + >>> sq.explain() + == Physical Plan == + ... + >>> sq.explain(True) + == Parsed Logical Plan == + ... + == Analyzed Logical Plan == + ... + == Optimized Logical Plan == + ... + == Physical Plan == + ... + >>> sq.stop() + """ + # Cannot call `_jsq.explain(...)` because it will print in the JVM process. + # We should print it in the Python process. + print(self._jsq.explainInternal(extended)) + + @since(2.1) + def exception(self): + """ + :return: the StreamingQueryException if the query was terminated by an exception, or None. + """ + if self._jsq.exception().isDefined(): + je = self._jsq.exception().get() + msg = je.toString().split(': ', 1)[1] # Drop the Java StreamingQueryException type info + stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace())) + return StreamingQueryException(msg, stackTrace) + else: + return None + class StreamingQueryManager(object): """A class to manage all the :class:`StreamingQuery` StreamingQueries active. |