author     Shixiong Zhu <shixiong@databricks.com>        2016-12-05 11:36:11 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2016-12-05 11:36:11 -0800
commit     246012859f0ed5248809a2e00e8355fbdaa8beb5 (patch)
tree       85a7d76e1e438b7839c70118d18c3f6c87e5f7b1 /sql/core
parent     410b7898661f77e748564aaee6a5ab7747ce34ad (diff)
[SPARK-18694][SS] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
## What changes were proposed in this pull request?

- Add StreamingQuery.explain and exception to Python.
- Fix StreamingQueryException to not expose `OffsetSeq`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16125 from zsxwing/py-streaming-explain.
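As background for the diffs below: after this patch, the offsets carried by StreamingQueryException are plain JSON strings instead of OffsetSeq values. A minimal sketch of how a caller might read them once the change is in; the method name and the println reporting are illustrative only, not part of the commit:

import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

def awaitAndReport(query: StreamingQuery): Unit = {
  try {
    query.awaitTermination()
  } catch {
    case e: StreamingQueryException =>
      // startOffset / endOffset are now JSON strings rather than Option[OffsetSeq]
      println(s"Query failed between offsets ${e.startOffset} and ${e.endOffset}")
      println(s"Cause: ${e.cause}")
  }
}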
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala    5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala     42
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala                     7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala                   2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala         10
5 files changed, 42 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8804c647a7..6b1c01ab2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -93,7 +93,7 @@ class StreamExecution(
* once, since the field's value may change at any time.
*/
@volatile
- protected var availableOffsets = new StreamProgress
+ var availableOffsets = new StreamProgress
/** The current batchId or -1 if execution has not yet been initialized. */
protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@ class StreamExecution(
this,
s"Query $name terminated with exception: ${e.getMessage}",
e,
- Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
+ committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
+ availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
logError(s"Query $name terminated with error", e)
updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 13f11ba1c9..a96150aa89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
* :: Experimental ::
* Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
* that caused the failure.
- * @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset Ending offset (if known) of the range of data in exception occurred
+ * @param startOffset Starting offset in json of the range of data in which exception occurred
+ * @param endOffset Ending offset in json of the range of data in exception occurred
* @since 2.0.0
*/
@Experimental
-class StreamingQueryException private[sql](
- @transient val query: StreamingQuery,
+class StreamingQueryException private(
+ causeString: String,
val message: String,
val cause: Throwable,
- val startOffset: Option[OffsetSeq] = None,
- val endOffset: Option[OffsetSeq] = None)
+ val startOffset: String,
+ val endOffset: String)
extends Exception(message, cause) {
+ private[sql] def this(
+ query: StreamingQuery,
+ message: String,
+ cause: Throwable,
+ startOffset: String,
+ endOffset: String) {
+ this(
+ // scalastyle:off
+ s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
+ |
+ |${query.asInstanceOf[StreamExecution].toDebugString}
+ """.stripMargin,
+ // scalastyle:on
+ message,
+ cause,
+ startOffset,
+ endOffset)
+ }
+
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis
- override def toString(): String = {
- val causeStr =
- s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
- s"""
- |$causeStr
- |
- |${query.asInstanceOf[StreamExecution].toDebugString}
- """.stripMargin
- }
+ override def toString(): String = causeString
}
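Since toString now returns the pre-rendered causeString and the public surface no longer references the originating query, a failure report can be built from the exception alone (useful, for example, when surfacing the error to the new Python API). A small sketch; formatFailure is a hypothetical helper, not part of the commit:

import org.apache.spark.sql.streaming.StreamingQueryException

// Hypothetical helper: the rendered cause plus query debug string, the JSON
// offsets, and the failure timestamp all live on the exception itself.
def formatFailure(e: StreamingQueryException): String =
  s"""${e.toString}
     |startOffset: ${e.startOffset}
     |endOffset:   ${e.endOffset}
     |time:        ${e.time}""".stripMargin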
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 4c8247458f..fb5bad0123 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long) {
+
+ /** The compact JSON representation of this progress. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this progress. */
+ def prettyJson: String = pretty(render(jsonValue))
+
private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a2629f7f68..4332265129 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
eventually("microbatch thread not stopped after termination with failure") {
assert(!currentStream.microBatchThread.isAlive)
}
- verify(thrownException.query.eq(currentStream),
- s"incorrect query reference in exception")
verify(currentStream.exception === Some(thrownException),
s"incorrect exception returned by query.exception()")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 56abe1201c..f7fc19494d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
TestAwaitTermination(ExpectException[SparkException]),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
- AssertOnQuery(
- q => q.exception.get.startOffset.get.offsets ===
- q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
- "incorrect start offset on exception")
+ AssertOnQuery(q => {
+ q.exception.get.startOffset ===
+ q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+ q.exception.get.endOffset ===
+ q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+ }, "incorrect start offset or end offset on exception")
)
}