diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-12-05 11:36:11 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-12-05 11:36:11 -0800 |
commit | 246012859f0ed5248809a2e00e8355fbdaa8beb5 (patch) | |
tree | 85a7d76e1e438b7839c70118d18c3f6c87e5f7b1 /sql/core | |
parent | 410b7898661f77e748564aaee6a5ab7747ce34ad (diff) | |
download | spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.tar.gz spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.tar.bz2 spark-246012859f0ed5248809a2e00e8355fbdaa8beb5.zip |
[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
## What changes were proposed in this pull request?
- Add StreamingQuery.explain and exception to Python.
- Fix StreamingQueryException to not expose `OffsetSeq`.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16125 from zsxwing/py-streaming-explain.
Diffstat (limited to 'sql/core')
5 files changed, 42 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8804c647a7..6b1c01ab2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -93,7 +93,7 @@ class StreamExecution( * once, since the field's value may change at any time. */ @volatile - protected var availableOffsets = new StreamProgress + var availableOffsets = new StreamProgress /** The current batchId or -1 if execution has not yet been initialized. */ protected var currentBatchId: Long = -1 @@ -263,7 +263,8 @@ class StreamExecution( this, s"Query $name terminated with exception: ${e.getMessage}", e, - Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json))) + committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString, + availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString) logError(s"Query $name terminated with error", e) updateStatusMessage(s"Terminated with exception: ${e.getMessage}") // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala index 13f11ba1c9..a96150aa89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut * :: Experimental :: * Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception * that caused the failure. - * @param query Query that caused the exception * @param message Message of this exception * @param cause Internal cause of this exception - * @param startOffset Starting offset (if known) of the range of data in which exception occurred - * @param endOffset Ending offset (if known) of the range of data in exception occurred + * @param startOffset Starting offset in json of the range of data in which exception occurred + * @param endOffset Ending offset in json of the range of data in exception occurred * @since 2.0.0 */ @Experimental -class StreamingQueryException private[sql]( - @transient val query: StreamingQuery, +class StreamingQueryException private( + causeString: String, val message: String, val cause: Throwable, - val startOffset: Option[OffsetSeq] = None, - val endOffset: Option[OffsetSeq] = None) + val startOffset: String, + val endOffset: String) extends Exception(message, cause) { + private[sql] def this( + query: StreamingQuery, + message: String, + cause: Throwable, + startOffset: String, + endOffset: String) { + this( + // scalastyle:off + s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")} + | + |${query.asInstanceOf[StreamExecution].toDebugString} + """.stripMargin, + // scalastyle:on + message, + cause, + startOffset, + endOffset) + } + /** Time when the exception occurred */ val time: Long = System.currentTimeMillis - override def toString(): String = { - val causeStr = - s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}" - s""" - |$causeStr - | - |${query.asInstanceOf[StreamExecution].toDebugString} - """.stripMargin - } + override def toString(): String = causeString } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 4c8247458f..fb5bad0123 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental class StateOperatorProgress private[sql]( val numRowsTotal: Long, val numRowsUpdated: Long) { + + /** The compact JSON representation of this progress. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this progress. */ + def prettyJson: String = pretty(render(jsonValue)) + private[sql] def jsonValue: JValue = { ("numRowsTotal" -> JInt(numRowsTotal)) ~ ("numRowsUpdated" -> JInt(numRowsUpdated)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index a2629f7f68..4332265129 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { eventually("microbatch thread not stopped after termination with failure") { assert(!currentStream.microBatchThread.isAlive) } - verify(thrownException.query.eq(currentStream), - s"incorrect query reference in exception") verify(currentStream.exception === Some(thrownException), s"incorrect exception returned by query.exception()") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 56abe1201c..f7fc19494d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { TestAwaitTermination(ExpectException[SparkException]), TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000), TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), - AssertOnQuery( - q => q.exception.get.startOffset.get.offsets === - q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets, - "incorrect start offset on exception") + AssertOnQuery(q => { + q.exception.get.startOffset === + q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString && + q.exception.get.endOffset === + q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString + }, "incorrect start offset or end offset on exception") ) } |