From bc09a2b8c3b03a207a6e20627f2c5ec23c1efe8c Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Tue, 29 Nov 2016 23:08:56 -0800
Subject: [SPARK-18516][STRUCTURED STREAMING] Follow up PR to add
 StreamingQuery.status to Python

## What changes were proposed in this pull request?

- Add StreamingQueryStatus.json
- Make it not a case class (to avoid unnecessarily exposing the implicitly
  generated companion object StreamingQueryStatus, consistent with
  StreamingQueryProgress)
- Add StreamingQuery.status to Python
- Fix post-termination status

## How was this patch tested?

New unit tests

Author: Tathagata Das

Closes #16075 from tdas/SPARK-18516-1.
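Editor's note, before the diff: a minimal, hedged sketch of the user-facing surface this change settles on. It is illustrative only and not part of the patch; the local session, query name, and memory sink are assumptions, while MemoryStream is the same internal test helper the suites below use.

```scala
// Sketch only (not part of this patch): the StreamingQueryStatus surface after SPARK-18516.
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

object StatusSurfaceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("status-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[Int]          // internal test source, as used in the suites below
    val query = input.toDS().writeStream
      .queryName("status_sketch")          // hypothetical query name
      .format("memory")
      .start()

    input.addData(1, 2, 3)
    query.processAllAvailable()

    println(query.status.message)          // e.g. "Waiting for next trigger"
    println(query.status.json)             // compact JSON, added by this PR
    println(query.status.prettyJson)       // indented JSON; toString returns the same

    query.stop()
    println(query.status.message)          // "Stopped" -- the post-termination fix in this PR
    spark.stop()
  }
}
```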
---
 .../sql/execution/streaming/ProgressReporter.scala |   5 +-
 .../sql/execution/streaming/StreamExecution.scala  |   4 +
 .../spark/sql/streaming/StreamingQueryStatus.scala |  38 ++++++-
 .../org/apache/spark/sql/streaming/progress.scala  |   9 +-
 .../streaming/StreamingQueryListenerSuite.scala    |  29 ++---
 .../streaming/StreamingQueryProgressSuite.scala    |  98 -----------------
 .../StreamingQueryStatusAndProgressSuite.scala     | 120 +++++++++++++++++++++
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  49 ++++++---
 8 files changed, 206 insertions(+), 146 deletions(-)
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index b7b6e1988e..ba77e7c7bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -70,11 +70,12 @@ trait ProgressReporter extends Logging {
   private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
 
   @volatile
-  protected var currentStatus: StreamingQueryStatus =
-    StreamingQueryStatus(
+  protected var currentStatus: StreamingQueryStatus = {
+    new StreamingQueryStatus(
       message = "Initializing StreamExecution",
       isDataAvailable = false,
       isTriggerActive = false)
+  }
 
   /** Returns the current status of the query. */
   def status: StreamingQueryStatus = currentStatus
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e4f31af35f..6d0e269d34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -238,8 +238,10 @@ class StreamExecution(
             updateStatusMessage("Waiting for next trigger")
             isTerminated
           })
+      updateStatusMessage("Stopped")
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by stop()
+        updateStatusMessage("Stopped")
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
           this,
@@ -247,6 +249,7 @@
           e,
           Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
         logError(s"Query $name terminated with error", e)
+        updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them
         if (!NonFatal(e)) {
@@ -254,6 +257,7 @@
       }
     } finally {
       state = TERMINATED
+      currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
 
       // Update metrics and status
       sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 4c1a7ce6a0..44befa0d2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.streaming
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 /**
  * Reports information about the instantaneous status of a streaming query.
  *
@@ -27,7 +32,32 @@ package org.apache.spark.sql.streaming
  *
  * @since 2.1.0
  */
-case class StreamingQueryStatus protected[sql](
-    message: String,
-    isDataAvailable: Boolean,
-    isTriggerActive: Boolean)
+class StreamingQueryStatus protected[sql](
+    val message: String,
+    val isDataAvailable: Boolean,
+    val isTriggerActive: Boolean) {
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def copy(
+      message: String = this.message,
+      isDataAvailable: Boolean = this.isDataAvailable,
+      isTriggerActive: Boolean = this.isTriggerActive): StreamingQueryStatus = {
+    new StreamingQueryStatus(
+      message = message,
+      isDataAvailable = isDataAvailable,
+      isTriggerActive = isTriggerActive)
+  }
+
+  private[sql] def jsonValue: JValue = {
+    ("message" -> JString(message.toString)) ~
+    ("isDataAvailable" -> JBool(isDataAvailable)) ~
+    ("isTriggerActive" -> JBool(isTriggerActive))
+  }
+}
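Editor's aside, not part of the patch: the json/prettyJson/jsonValue trio above follows the same json4s pattern already used by StreamingQueryProgress. For readers unfamiliar with the DSL, a self-contained sketch of that pattern; the Health class and its fields are hypothetical:

```scala
import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Hypothetical class, mirroring the jsonValue/json/prettyJson pattern above.
class Health(val state: String, val ok: Boolean) {
  private def jsonValue: JValue =
    ("state" -> state) ~ ("ok" -> ok)  // the DSL assembles a JObject field by field

  def json: String = compact(render(jsonValue))       // {"state":"active","ok":true}
  def prettyJson: String = pretty(render(jsonValue))  // indented, multi-line form
  override def toString: String = prettyJson
}

object HealthDemo {
  def main(args: Array[String]): Unit = {
    val h = new Health("active", ok = true)
    println(h.json)
    println(h.prettyJson)
  }
}
```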
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 7129fa4d15..4c8247458f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -23,7 +23,6 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.jute.compiler.JLong
 import org.json4s._
 import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
@@ -85,10 +84,10 @@ class StreamingQueryProgress private[sql](
   /** The aggregate (across all sources) rate at which Spark is processing data. */
   def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
 
-  /** The compact JSON representation of this status. */
+  /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
 
-  /** The pretty (i.e. indented) JSON representation of this status. */
+  /** The pretty (i.e. indented) JSON representation of this progress. */
   def prettyJson: String = pretty(render(jsonValue))
 
   override def toString: String = prettyJson
@@ -179,10 +178,10 @@ class SourceProgress protected[sql](
 class SinkProgress protected[sql](
     val description: String) {
 
-  /** The compact JSON representation of this status. */
+  /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
 
-  /** The pretty (i.e. indented) JSON representation of this status. */
+  /** The pretty (i.e. indented) JSON representation of this progress. */
   def prettyJson: String = pretty(render(jsonValue))
 
   override def toString: String = prettyJson
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index c68f953b10..08b93e7d0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -106,6 +106,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         assert(listener.terminationEvent !== null)
         assert(listener.terminationEvent.id === query.id)
         assert(listener.terminationEvent.exception.nonEmpty)
+        // Make sure that the exception message reported through listener
+        // contains the actual exception and relevant stack trace
+        assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+        assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+        assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
         listener.checkAsyncErrors()
         true
       }
@@ -159,28 +164,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  testQuietly("exception should be reported in QueryTerminated") {
-    val listener = new EventCollector
-    withListenerAdded(listener) {
-      val input = MemoryStream[Int]
-      testStream(input.toDS.map(_ / 0))(
-        StartStream(),
-        AddData(input, 1),
-        ExpectFailure[SparkException](),
-        Assert {
-          spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-          assert(listener.terminationEvent !== null)
-          assert(listener.terminationEvent.exception.nonEmpty)
-          // Make sure that the exception message reported through listener
-          // contains the actual exception and relevant stack trace
-          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
-          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
-        }
-      )
-    }
-  }
-
   test("QueryStartedEvent serialization") {
     val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
     val json = JsonProtocol.sparkEventToJson(queryStarted)
@@ -190,7 +173,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("QueryProgressEvent serialization") {
     val event = new StreamingQueryListener.QueryProgressEvent(
-      StreamingQueryProgressSuite.testProgress)
+      StreamingQueryStatusAndProgressSuite.testProgress)
     val json = JsonProtocol.sparkEventToJson(event)
     val newEvent = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
deleted file mode 100644
index 45d29f6b35..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.util.UUID
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
-
-
-class StreamingQueryProgressSuite extends SparkFunSuite {
-
-  test("prettyJson") {
-    val json = testProgress.prettyJson
-    assert(json ===
-      s"""
-        |{
-        |  "id" : "${testProgress.id.toString}",
-        |  "name" : "name",
-        |  "timestamp" : 1,
-        |  "numInputRows" : 678,
-        |  "inputRowsPerSecond" : 10.0,
-        |  "durationMs" : {
-        |    "total" : 0
-        |  },
-        |  "currentWatermark" : 3,
-        |  "stateOperators" : [ {
-        |    "numRowsTotal" : 0,
-        |    "numRowsUpdated" : 1
-        |  } ],
-        |  "sources" : [ {
-        |    "description" : "source",
-        |    "startOffset" : 123,
-        |    "endOffset" : 456,
-        |    "numInputRows" : 678,
-        |    "inputRowsPerSecond" : 10.0
-        |  } ],
-        |  "sink" : {
-        |    "description" : "sink"
-        |  }
-        |}
-      """.stripMargin.trim)
-    assert(compact(parse(json)) === testProgress.json)
-
-  }
-
-  test("json") {
-    assert(compact(parse(testProgress.json)) === testProgress.json)
-  }
-
-  test("toString") {
-    assert(testProgress.toString === testProgress.prettyJson)
-  }
-}
-
-object StreamingQueryProgressSuite {
-  val testProgress = new StreamingQueryProgress(
-    id = UUID.randomUUID(),
-    name = "name",
-    timestamp = 1L,
-    batchId = 2L,
-    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
-    currentWatermark = 3L,
-    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
-    sources = Array(
-      new SourceProgress(
-        description = "source",
-        startOffset = "123",
-        endOffset = "456",
-        numInputRows = 678,
-        inputRowsPerSecond = 10.0,
-        processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
-      )
-    ),
-    sink = new SinkProgress("sink")
-  )
-}
-
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
new file mode 100644
index 0000000000..4da712fa0f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
+
+
+class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
+
+  test("StreamingQueryProgress - prettyJson") {
+    val json = testProgress.prettyJson
+    assert(json ===
+      s"""
+        |{
+        |  "id" : "${testProgress.id.toString}",
+        |  "name" : "name",
+        |  "timestamp" : 1,
+        |  "numInputRows" : 678,
+        |  "inputRowsPerSecond" : 10.0,
+        |  "durationMs" : {
+        |    "total" : 0
+        |  },
+        |  "currentWatermark" : 3,
+        |  "stateOperators" : [ {
+        |    "numRowsTotal" : 0,
+        |    "numRowsUpdated" : 1
+        |  } ],
+        |  "sources" : [ {
+        |    "description" : "source",
+        |    "startOffset" : 123,
+        |    "endOffset" : 456,
+        |    "numInputRows" : 678,
+        |    "inputRowsPerSecond" : 10.0
+        |  } ],
+        |  "sink" : {
+        |    "description" : "sink"
+        |  }
+        |}
+      """.stripMargin.trim)
+    assert(compact(parse(json)) === testProgress.json)
+
+  }
+
+  test("StreamingQueryProgress - json") {
+    assert(compact(parse(testProgress.json)) === testProgress.json)
+  }
+
+  test("StreamingQueryProgress - toString") {
+    assert(testProgress.toString === testProgress.prettyJson)
+  }
+
+  test("StreamingQueryStatus - prettyJson") {
+    val json = testStatus.prettyJson
+    assert(json ===
+      """
+        |{
+        |  "message" : "active",
+        |  "isDataAvailable" : true,
+        |  "isTriggerActive" : false
+        |}
+      """.stripMargin.trim)
+  }
+
+  test("StreamingQueryStatus - json") {
+    assert(compact(parse(testStatus.json)) === testStatus.json)
+  }
+
+  test("StreamingQueryStatus - toString") {
+    assert(testStatus.toString === testStatus.prettyJson)
+  }
+}
+
+object StreamingQueryStatusAndProgressSuite {
+  val testProgress = new StreamingQueryProgress(
+    id = UUID.randomUUID(),
+    name = "name",
+    timestamp = 1L,
+    batchId = 2L,
+    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+    currentWatermark = 3L,
+    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+    sources = Array(
+      new SourceProgress(
+        description = "source",
+        startOffset = "123",
+        endOffset = "456",
+        numInputRows = 678,
+        inputRowsPerSecond = 10.0,
+        processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json
+      )
+    ),
+    sink = new SinkProgress("sink")
+  )
+
+  val testStatus = new StreamingQueryStatus("active", true, false)
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4f3b4a2d75..56abe1201c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -77,7 +77,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     q2.stop()
   }
 
-  testQuietly("lifecycle states and awaitTermination") {
+  testQuietly("isActive, exception, and awaitTermination") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map { 6 / _}
 
@@ -110,7 +110,7 @@
     )
   }
 
-  testQuietly("query statuses and progresses") {
+  testQuietly("status, lastProgress, and recentProgresses") {
     import StreamingQuerySuite._
     clock = new StreamManualClock
 
@@ -133,10 +133,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     }
 
     // This is to make sure that the query waits for the manual clock to reach 600 the first time there is data
-    val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+    val mapped = inputData.toDS().as[Long].map { x =>
       clock.waitTillTime(1100)
-      x
-    }
+      10 / x
+    }.agg(count("*")).as[Long]
 
     case class AssertStreamExecThreadToWaitForClock()
      extends AssertOnQuery(q => {
@@ -151,25 +151,26 @@
       true
     }, "")
 
+    var lastProgressBeforeStop: StreamingQueryProgress = null
+
     testStream(mapped, OutputMode.Complete)(
       StartStream(ProcessingTime(100), triggerClock = clock),
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
-      // TODO: test status.message before trigger has started
-      // AssertOnQuery(_.lastProgress === null)  // there is an empty trigger as soon as started
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while offset is being fetched
+      // Test status and progress while offset is being fetched
       AddData(inputData, 1, 2),
       AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === true),
-      AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+      AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch is being fetched
+      // Test status and progress while batch is being fetched
       AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === true),
@@ -177,14 +178,14 @@
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch is being processed
+      // Test status and progress while batch is being processed
       AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch processing has completed
+      // Test status and progress while batch processing has completed
       AdvanceManualClock(500), // time = 1100 to unblock job
       AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
       CheckAnswer(2),
@@ -237,12 +238,32 @@
       true
     },
 
-      // Test status after data is not available for a trigger
+      // Test status and progress after data is not available for a trigger
       AdvanceManualClock(100), // allow another trigger
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
-      AssertOnQuery(_.status.message === "Waiting for next trigger")
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
+
+      // Test status and progress after query stopped
+      AssertOnQuery { query =>
+        lastProgressBeforeStop = query.lastProgress
+        true
+      },
+      StopStream,
+      AssertOnQuery(_.lastProgress.json === lastProgressBeforeStop.json),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Stopped"),
+
+      // Test status and progress after query terminated with error
+      StartStream(ProcessingTime(100), triggerClock = clock),
+      AddData(inputData, 0),
+      AdvanceManualClock(100),
+      ExpectFailure[SparkException](),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message.startsWith("Terminated with exception"))
     )
   }
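Closing editor's note, not part of the patch: the terminal states asserted at the end of the suite above can also be observed without the StreamTest harness. A hedged sketch, assuming a local session; MemoryStream is the same internal helper the suites use, and the query name is made up:

```scala
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

object TerminalStatusSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("terminal-status").getOrCreate()
    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    val input = MemoryStream[Int]
    // 10 / x mirrors the division the suite uses to force an ArithmeticException on 0.
    val query = input.toDS().map(10 / _).writeStream
      .queryName("terminal_status")   // hypothetical query name
      .format("memory")
      .start()

    input.addData(0)
    // processAllAvailable rethrows the streaming error once the query dies.
    try query.processAllAvailable() catch { case _: Exception => () }

    val s = query.status
    assert(!s.isDataAvailable && !s.isTriggerActive)           // cleared in the finally block
    assert(s.message.startsWith("Terminated with exception"))  // set before termination

    spark.stop()
  }
}
```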