aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-11-29 23:08:56 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-11-29 23:08:56 -0800
commitbc09a2b8c3b03a207a6e20627f2c5ec23c1efe8c (patch)
treeda3025cf347f60d962258595f0cafb7d8633c52d /sql/core
parent4c82ca86d979e5526a15666683eef3c79c37dc68 (diff)
downloadspark-bc09a2b8c3b03a207a6e20627f2c5ec23c1efe8c.tar.gz
spark-bc09a2b8c3b03a207a6e20627f2c5ec23c1efe8c.tar.bz2
spark-bc09a2b8c3b03a207a6e20627f2c5ec23c1efe8c.zip
[SPARK-18516][STRUCTURED STREAMING] Follow up PR to add StreamingQuery.status to Python
## What changes were proposed in this pull request? - Add StreamingQueryStatus.json - Make it not case class (to avoid unnecessarily exposing implicit object StreamingQueryStatus, consistent with StreamingQueryProgress) - Add StreamingQuery.status to Python - Fix post-termination status ## How was this patch tested? New unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16075 from tdas/SPARK-18516-1.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala38
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala29
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala)34
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala49
7 files changed, 114 insertions, 54 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index b7b6e1988e..ba77e7c7bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -70,11 +70,12 @@ trait ProgressReporter extends Logging {
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
@volatile
- protected var currentStatus: StreamingQueryStatus =
- StreamingQueryStatus(
+ protected var currentStatus: StreamingQueryStatus = {
+ new StreamingQueryStatus(
message = "Initializing StreamExecution",
isDataAvailable = false,
isTriggerActive = false)
+ }
/** Returns the current status of the query. */
def status: StreamingQueryStatus = currentStatus
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e4f31af35f..6d0e269d34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -238,8 +238,10 @@ class StreamExecution(
updateStatusMessage("Waiting for next trigger")
isTerminated
})
+ updateStatusMessage("Stopped")
} catch {
case _: InterruptedException if state == TERMINATED => // interrupted by stop()
+ updateStatusMessage("Stopped")
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
this,
@@ -247,6 +249,7 @@ class StreamExecution(
e,
Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
logError(s"Query $name terminated with error", e)
+ updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
// handle them
if (!NonFatal(e)) {
@@ -254,6 +257,7 @@ class StreamExecution(
}
} finally {
state = TERMINATED
+ currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
// Update metrics and status
sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 4c1a7ce6a0..44befa0d2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -17,6 +17,11 @@
package org.apache.spark.sql.streaming
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
/**
* Reports information about the instantaneous status of a streaming query.
*
@@ -27,7 +32,32 @@ package org.apache.spark.sql.streaming
*
* @since 2.1.0
*/
-case class StreamingQueryStatus protected[sql](
- message: String,
- isDataAvailable: Boolean,
- isTriggerActive: Boolean)
+class StreamingQueryStatus protected[sql](
+ val message: String,
+ val isDataAvailable: Boolean,
+ val isTriggerActive: Boolean) {
+
+ /** The compact JSON representation of this status. */
+ def json: String = compact(render(jsonValue))
+
+ /** The pretty (i.e. indented) JSON representation of this status. */
+ def prettyJson: String = pretty(render(jsonValue))
+
+ override def toString: String = prettyJson
+
+ private[sql] def copy(
+ message: String = this.message,
+ isDataAvailable: Boolean = this.isDataAvailable,
+ isTriggerActive: Boolean = this.isTriggerActive): StreamingQueryStatus = {
+ new StreamingQueryStatus(
+ message = message,
+ isDataAvailable = isDataAvailable,
+ isTriggerActive = isTriggerActive)
+ }
+
+ private[sql] def jsonValue: JValue = {
+ ("message" -> JString(message.toString)) ~
+ ("isDataAvailable" -> JBool(isDataAvailable)) ~
+ ("isTriggerActive" -> JBool(isTriggerActive))
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 7129fa4d15..4c8247458f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -23,7 +23,6 @@ import java.util.UUID
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
-import org.apache.jute.compiler.JLong
import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
@@ -85,10 +84,10 @@ class StreamingQueryProgress private[sql](
/** The aggregate (across all sources) rate at which Spark is processing data. */
def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
- /** The compact JSON representation of this status. */
+ /** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
- /** The pretty (i.e. indented) JSON representation of this status. */
+ /** The pretty (i.e. indented) JSON representation of this progress. */
def prettyJson: String = pretty(render(jsonValue))
override def toString: String = prettyJson
@@ -179,10 +178,10 @@ class SourceProgress protected[sql](
class SinkProgress protected[sql](
val description: String) {
- /** The compact JSON representation of this status. */
+ /** The compact JSON representation of this progress. */
def json: String = compact(render(jsonValue))
- /** The pretty (i.e. indented) JSON representation of this status. */
+ /** The pretty (i.e. indented) JSON representation of this progress. */
def prettyJson: String = pretty(render(jsonValue))
override def toString: String = prettyJson
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index c68f953b10..08b93e7d0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -106,6 +106,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.id === query.id)
assert(listener.terminationEvent.exception.nonEmpty)
+ // Make sure that the exception message reported through listener
+ // contains the actual exception and relevant stack trace
+ assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+ assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+ assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
listener.checkAsyncErrors()
true
}
@@ -159,28 +164,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}
- testQuietly("exception should be reported in QueryTerminated") {
- val listener = new EventCollector
- withListenerAdded(listener) {
- val input = MemoryStream[Int]
- testStream(input.toDS.map(_ / 0))(
- StartStream(),
- AddData(input, 1),
- ExpectFailure[SparkException](),
- Assert {
- spark.sparkContext.listenerBus.waitUntilEmpty(10000)
- assert(listener.terminationEvent !== null)
- assert(listener.terminationEvent.exception.nonEmpty)
- // Make sure that the exception message reported through listener
- // contains the actual exception and relevant stack trace
- assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
- assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
- assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
- }
- )
- }
- }
-
test("QueryStartedEvent serialization") {
val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
val json = JsonProtocol.sparkEventToJson(queryStarted)
@@ -190,7 +173,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
test("QueryProgressEvent serialization") {
val event = new StreamingQueryListener.QueryProgressEvent(
- StreamingQueryProgressSuite.testProgress)
+ StreamingQueryStatusAndProgressSuite.testProgress)
val json = JsonProtocol.sparkEventToJson(event)
val newEvent = JsonProtocol.sparkEventFromJson(json)
.asInstanceOf[StreamingQueryListener.QueryProgressEvent]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 45d29f6b35..4da712fa0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -25,12 +25,12 @@ import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
+import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
-class StreamingQueryProgressSuite extends SparkFunSuite {
+class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
- test("prettyJson") {
+ test("StreamingQueryProgress - prettyJson") {
val json = testProgress.prettyJson
assert(json ===
s"""
@@ -64,16 +64,36 @@ class StreamingQueryProgressSuite extends SparkFunSuite {
}
- test("json") {
+ test("StreamingQueryProgress - json") {
assert(compact(parse(testProgress.json)) === testProgress.json)
}
- test("toString") {
+ test("StreamingQueryProgress - toString") {
assert(testProgress.toString === testProgress.prettyJson)
}
+
+ test("StreamingQueryStatus - prettyJson") {
+ val json = testStatus.prettyJson
+ assert(json ===
+ """
+ |{
+ | "message" : "active",
+ | "isDataAvailable" : true,
+ | "isTriggerActive" : false
+ |}
+ """.stripMargin.trim)
+ }
+
+ test("StreamingQueryStatus - json") {
+ assert(compact(parse(testStatus.json)) === testStatus.json)
+ }
+
+ test("StreamingQueryStatus - toString") {
+ assert(testStatus.toString === testStatus.prettyJson)
+ }
}
-object StreamingQueryProgressSuite {
+object StreamingQueryStatusAndProgressSuite {
val testProgress = new StreamingQueryProgress(
id = UUID.randomUUID(),
name = "name",
@@ -94,5 +114,7 @@ object StreamingQueryProgressSuite {
),
sink = new SinkProgress("sink")
)
+
+ val testStatus = new StreamingQueryStatus("active", true, false)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4f3b4a2d75..56abe1201c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -77,7 +77,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
q2.stop()
}
- testQuietly("lifecycle states and awaitTermination") {
+ testQuietly("isActive, exception, and awaitTermination") {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map { 6 / _}
@@ -110,7 +110,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
)
}
- testQuietly("query statuses and progresses") {
+ testQuietly("status, lastProgress, and recentProgresses") {
import StreamingQuerySuite._
clock = new StreamManualClock
@@ -133,10 +133,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
}
// This is to make sure thatquery waits for manual clock to be 600 first time there is data
- val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+ val mapped = inputData.toDS().as[Long].map { x =>
clock.waitTillTime(1100)
- x
- }
+ 10 / x
+ }.agg(count("*")).as[Long]
case class AssertStreamExecThreadToWaitForClock()
extends AssertOnQuery(q => {
@@ -151,25 +151,26 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
true
}, "")
+ var lastProgressBeforeStop: StreamingQueryProgress = null
+
testStream(mapped, OutputMode.Complete)(
StartStream(ProcessingTime(100), triggerClock = clock),
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
- // TODO: test status.message before trigger has started
- // AssertOnQuery(_.lastProgress === null) // there is an empty trigger as soon as started
+ AssertOnQuery(_.status.message === "Waiting for next trigger"),
AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
- // Test status while offset is being fetched
+ // Test status and progress while offset is being fetched
AddData(inputData, 1, 2),
AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === true),
- AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+ AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
- // Test status while batch is being fetched
+ // Test status and progress while batch is being fetched
AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === true),
@@ -177,14 +178,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
AssertOnQuery(_.status.message === "Processing new data"),
AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
- // Test status while batch is being processed
+ // Test status and progress while batch is being processed
AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
- // Test status while batch processing has completed
+ // Test status and progress while batch processing has completed
AdvanceManualClock(500), // time = 1100 to unblock job
AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
CheckAnswer(2),
@@ -237,12 +238,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
true
},
- // Test status after data is not available for a trigger
+ // Test status and progress after data is not available for a trigger
AdvanceManualClock(100), // allow another trigger
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
- AssertOnQuery(_.status.message === "Waiting for next trigger")
+ AssertOnQuery(_.status.message === "Waiting for next trigger"),
+
+ // Test status and progress after query stopped
+ AssertOnQuery { query =>
+ lastProgressBeforeStop = query.lastProgress
+ true
+ },
+ StopStream,
+ AssertOnQuery(_.lastProgress.json === lastProgressBeforeStop.json),
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message === "Stopped"),
+
+ // Test status and progress after query terminated with error
+ StartStream(ProcessingTime(100), triggerClock = clock),
+ AddData(inputData, 0),
+ AdvanceManualClock(100),
+ ExpectFailure[SparkException],
+ AssertOnQuery(_.status.isDataAvailable === false),
+ AssertOnQuery(_.status.isTriggerActive === false),
+ AssertOnQuery(_.status.message.startsWith("Terminated with exception"))
)
}