author     Shixiong Zhu <shixiong@databricks.com>        2016-12-05 11:36:11 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2016-12-05 11:36:11 -0800
commit     246012859f0ed5248809a2e00e8355fbdaa8beb5 (patch)
tree       85a7d76e1e438b7839c70118d18c3f6c87e5f7b1
parent     410b7898661f77e748564aaee6a5ab7747ce34ad (diff)
[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
## What changes were proposed in this pull request?

- Add StreamingQuery.explain and exception to Python.
- Fix StreamingQueryException to not expose `OffsetSeq`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16125 from zsxwing/py-streaming-explain.
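For context, a minimal sketch of how the two new Python methods are meant to be used; the session setup, input path, and query name below are illustrative placeholders, not part of this patch.

```python
# Hypothetical usage sketch of the new StreamingQuery.explain / exception API.
# The app name, input path, and query name are placeholders for illustration.
from pyspark.sql import SparkSession
from pyspark.sql.utils import StreamingQueryException

spark = SparkSession.builder.appName("streaming-explain-demo").getOrCreate()

sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")
sq = sdf.writeStream.format("memory").queryName("demo_query").start()
try:
    sq.processAllAvailable()      # wait so the runtime plans are available
    sq.explain()                  # prints only the physical plan
    sq.explain(extended=True)     # parsed, analyzed, optimized, and physical plans
    err = sq.exception()          # None unless the query terminated with an error
    if err is not None:
        print(err.desc)           # message with the Java exception type stripped
finally:
    sq.stop()
```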
-rw-r--r--  project/MimaExcludes.scala                                                              |  9
-rw-r--r--  python/pyspark/sql/streaming.py                                                         | 40
-rw-r--r--  python/pyspark/sql/tests.py                                                             | 29
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala   | 42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala                  |  7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala       | 10
8 files changed, 119 insertions(+), 25 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7fed8cb008..f3e5a21d77 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -106,7 +106,14 @@ object MimaExcludes {
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
// [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
- ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables")
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),
+
+ // [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
)
}
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 84f01d3d9a..4a7d17ba51 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -30,6 +30,7 @@ from pyspark import since, keyword_only
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import *
+from pyspark.sql.utils import StreamingQueryException
__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
@@ -132,6 +133,45 @@ class StreamingQuery(object):
"""
self._jsq.stop()
+ @since(2.1)
+ def explain(self, extended=False):
+ """Prints the (logical and physical) plans to the console for debugging purpose.
+
+ :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
+
+ >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+ >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
+ >>> sq.explain()
+ == Physical Plan ==
+ ...
+ >>> sq.explain(True)
+ == Parsed Logical Plan ==
+ ...
+ == Analyzed Logical Plan ==
+ ...
+ == Optimized Logical Plan ==
+ ...
+ == Physical Plan ==
+ ...
+ >>> sq.stop()
+ """
+ # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
+ # We should print it in the Python process.
+ print(self._jsq.explainInternal(extended))
+
+ @since(2.1)
+ def exception(self):
+ """
+ :return: the StreamingQueryException if the query was terminated by an exception, or None.
+ """
+ if self._jsq.exception().isDefined():
+ je = self._jsq.exception().get()
+ msg = je.toString().split(': ', 1)[1] # Drop the Java StreamingQueryException type info
+ stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
+ return StreamingQueryException(msg, stackTrace)
+ else:
+ return None
+
class StreamingQueryManager(object):
"""A class to manage all the :class:`StreamingQuery` StreamingQueries active.
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 0aff9cebe9..9f34414f64 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1137,6 +1137,35 @@ class SQLTests(ReusedPySparkTestCase):
q.stop()
shutil.rmtree(tmpPath)
+ def test_stream_exception(self):
+ sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+ sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+ try:
+ sq.processAllAvailable()
+ self.assertEqual(sq.exception(), None)
+ finally:
+ sq.stop()
+
+ from pyspark.sql.functions import col, udf
+ from pyspark.sql.utils import StreamingQueryException
+ bad_udf = udf(lambda x: 1 / 0)
+ sq = sdf.select(bad_udf(col("value")))\
+ .writeStream\
+ .format('memory')\
+ .queryName('this_query')\
+ .start()
+ try:
+ # Process some data to fail the query
+ sq.processAllAvailable()
+ self.fail("bad udf should fail the query")
+ except StreamingQueryException as e:
+ # This is expected
+ self.assertTrue("ZeroDivisionError" in e.desc)
+ finally:
+ sq.stop()
+ self.assertTrue(type(sq.exception()) is StreamingQueryException)
+ self.assertTrue("ZeroDivisionError" in sq.exception().desc)
+
def test_query_manager_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for q in self.spark._wrapped.streams.active:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8804c647a7..6b1c01ab2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -93,7 +93,7 @@ class StreamExecution(
* once, since the field's value may change at any time.
*/
@volatile
- protected var availableOffsets = new StreamProgress
+ var availableOffsets = new StreamProgress
/** The current batchId or -1 if execution has not yet been initialized. */
protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@ class StreamExecution(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
- Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
+ committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
+ availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
logError(s"Query $name terminated with error", e)
updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 13f11ba1c9..a96150aa89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
* :: Experimental ::
* Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
* that caused the failure.
- * @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset Ending offset (if known) of the range of data in exception occurred
+ * @param startOffset Starting offset in JSON of the range of data in which the exception occurred
+ * @param endOffset Ending offset in JSON of the range of data in which the exception occurred
* @since 2.0.0
*/
@Experimental
-class StreamingQueryException private[sql](
- @transient val query: StreamingQuery,
+class StreamingQueryException private(
+ causeString: String,
val message: String,
val cause: Throwable,
- val startOffset: Option[OffsetSeq] = None,
- val endOffset: Option[OffsetSeq] = None)
+ val startOffset: String,
+ val endOffset: String)
extends Exception(message, cause) {
+ private[sql] def this(
+ query: StreamingQuery,
+ message: String,
+ cause: Throwable,
+ startOffset: String,
+ endOffset: String) {
+ this(
+ // scalastyle:off
+ s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
+ |
+ |${query.asInstanceOf[StreamExecution].toDebugString}
+ """.stripMargin,
+ // scalastyle:on
+ message,
+ cause,
+ startOffset,
+ endOffset)
+ }
+
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis
- override def toString(): String = {
- val causeStr =
- s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
- s"""
- |$causeStr
- |
- |${query.asInstanceOf[StreamExecution].toDebugString}
- """.stripMargin
- }
+ override def toString(): String = causeString
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 4c8247458f..fb5bad0123 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long) {
+
+ /** The compact JSON representation of this progress. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this progress. */
+ def prettyJson: String = pretty(render(jsonValue))
+
private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a2629f7f68..4332265129 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
eventually("microbatch thread not stopped after termination with failure") {
assert(!currentStream.microBatchThread.isAlive)
}
- verify(thrownException.query.eq(currentStream),
- s"incorrect query reference in exception")
verify(currentStream.exception === Some(thrownException),
s"incorrect exception returned by query.exception()")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 56abe1201c..f7fc19494d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
TestAwaitTermination(ExpectException[SparkException]),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
- AssertOnQuery(
- q => q.exception.get.startOffset.get.offsets ===
- q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
- "incorrect start offset on exception")
+ AssertOnQuery(q => {
+ q.exception.get.startOffset ===
+ q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+ q.exception.get.endOffset ===
+ q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+ }, "incorrect start offset or end offset on exception")
)
}