author     Tathagata Das <tathagata.das1565@gmail.com>  2016-11-16 10:00:59 -0800
committer  Shixiong Zhu <shixiong@databricks.com>       2016-11-16 10:00:59 -0800
commit     0048ce7ce64b02cbb6a1c4a2963a0b1b9541047e (patch)
tree       e267944042ecd6cc22a9078532baee2051c4a660 /sql
parent     608ecc512b759514c75a1b475582f237ed569f10 (diff)
[SPARK-18459][SPARK-18460][STRUCTUREDSTREAMING] Rename triggerId to batchId and add triggerDetails to json in StreamingQueryStatus
## What changes were proposed in this pull request?
SPARK-18459: triggerId looks like a number that should increase with every trigger, whether or not the trigger contains any data. In reality, triggerId increases only when a trigger has a batch of data to process, so it is better to rename it to batchId.
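On the consumer side, the renamed key shows up in the triggerDetails map of StreamingQueryStatus. Below is a minimal sketch of a listener reading the new key, assuming the 2.1.0-SNAPSHOT listener API at the time of this patch (BatchIdLogger is a hypothetical name, not part of the patch):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener: logs the batch id reported in triggerDetails.
// Assumes QueryProgressEvent exposes a StreamingQueryStatus via queryStatus,
// as the updated StreamingQueryListenerSuite in this patch does.
class BatchIdLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val details = event.queryStatus.triggerDetails  // java.util.Map[String, String]
    // After this patch the key is "batchId" (formerly "triggerId"); it advances
    // only when a trigger actually processed a batch of data.
    println(s"batchId=${details.get("batchId")}, " +
      s"isTriggerActive=${details.get("isTriggerActive")}")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
```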
SPARK-18460: triggerDetails was missing from the JSON representation of StreamingQueryStatus. The cause was a missing ~ after the triggerDetails field in jsonValue, which made Scala treat everything up to and including that field as a separate, discarded statement. Fixed by chaining the field into the rest of the JSON object.
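The failure mode is easy to reproduce outside Spark with the same json4s DSL that jsonValue is written against. A standalone sketch (MissingTildeDemo and its field names are illustrative, not from the patch): without a trailing ~, Scala's semicolon inference splits the chain into two statements, and only the last statement becomes the expression's value, so the earlier fields are silently dropped.

```scala
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object MissingTildeDemo extends App {
  // Buggy shape, as in SPARK-18460: no ~ after "triggerDetails", so the first
  // two fields form a statement whose value is discarded; the block's value is
  // only the last chain, and "triggerDetails" never reaches the output.
  val buggy: JValue = {
    ("inputRate" -> 15.5) ~
    ("triggerDetails" -> ("batchId" -> "5"))
    ("sinkStatus" -> "MySink") ~
    ("name" -> "query")
  }

  // Fixed shape: every field is chained with ~ into a single JObject.
  val fixed: JValue =
    ("inputRate" -> 15.5) ~
    ("triggerDetails" -> ("batchId" -> "5")) ~
    ("sinkStatus" -> "MySink") ~
    ("name" -> "query")

  println(compact(render(buggy)))  // {"sinkStatus":"MySink","name":"query"}
  println(compact(render(fixed)))  // also includes "inputRate" and "triggerDetails"
}
```

Notably, the buggy shape compiles without error, which is why the omission went unnoticed until the JSON output was inspected.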
## How was this patch tested?
Updated existing unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #15895 from tdas/SPARK-18459.
Diffstat (limited to 'sql')
5 files changed, 32 insertions, 14 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
index 5645554a58..942e6ed894 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
@@ -78,13 +78,13 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
 
   // =========== Setter methods ===========
 
-  def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+  def reportTriggerStarted(batchId: Long): Unit = synchronized {
     numInputRows.clear()
     triggerDetails.clear()
     sourceTriggerDetails.values.foreach(_.clear())
 
-    reportTriggerDetail(TRIGGER_ID, triggerId)
-    sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId))
+    reportTriggerDetail(BATCH_ID, batchId)
+    sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId))
     reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
     currentTriggerStartTimestamp = triggerClock.getTimeMillis()
     reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
@@ -217,7 +217,7 @@ object StreamMetrics extends Logging {
   }
 
-  val TRIGGER_ID = "triggerId"
+  val BATCH_ID = "batchId"
   val IS_TRIGGER_ACTIVE = "isTriggerActive"
   val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
   val STATUS_MESSAGE = "statusMessage"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 99c7729d02..ba732ff7fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -102,7 +102,7 @@ class StreamingQueryStatus private(
     ("inputRate" -> JDouble(inputRate)) ~
     ("processingRate" -> JDouble(processingRate)) ~
     ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
-    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
+    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
     ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
     ("sinkStatus" -> sinkStatus.jsonValue)
   }
@@ -151,7 +151,7 @@ private[sql] object StreamingQueryStatus {
       desc = "MySink",
       offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
     triggerDetails = Map(
-      TRIGGER_ID -> "5",
+      BATCH_ID -> "5",
       IS_TRIGGER_ACTIVE -> "true",
       IS_DATA_PRESENT_IN_TRIGGER -> "true",
       GET_OFFSET_LATENCY -> "10",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
index 938423db64..38c4ece439 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
@@ -50,10 +50,10 @@ class StreamMetricsSuite extends SparkFunSuite {
     assert(sm.currentSourceProcessingRate(source) === 0.0)
     assert(sm.currentLatency() === None)
     assert(sm.currentTriggerDetails() ===
-      Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
+      Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
         START_TIMESTAMP -> "0", "key" -> "value"))
     assert(sm.currentSourceTriggerDetails(source) ===
-      Map(TRIGGER_ID -> "1", "key2" -> "value2"))
+      Map(BATCH_ID -> "1", "key2" -> "value2"))
 
     // Finishing the trigger should calculate the rates, except input rate which needs
     // to have another trigger interval
@@ -66,11 +66,11 @@ class StreamMetricsSuite extends SparkFunSuite {
     assert(sm.currentSourceProcessingRate(source) === 100.0)
     assert(sm.currentLatency() === None)
     assert(sm.currentTriggerDetails() ===
-      Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
+      Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
         START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
         NUM_INPUT_ROWS -> "100", "key" -> "value"))
     assert(sm.currentSourceTriggerDetails(source) ===
-      Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
+      Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
 
     // After another trigger starts, the rates and latencies should not change until
     // new rows are reported
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index cebb32a0a5..98f3bec708 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -84,7 +84,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       AssertOnLastQueryStatus { status: StreamingQueryStatus =>
         // Check the correctness of the trigger info of the last completed batch reported by
         // onQueryProgress
-        assert(status.triggerDetails.containsKey("triggerId"))
+        assert(status.triggerDetails.containsKey("batchId"))
         assert(status.triggerDetails.get("isTriggerActive") === "false")
         assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
 
@@ -104,7 +104,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
 
         assert(status.sourceStatuses.length === 1)
-        assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
+        assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
         assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
index 6af19fb0c2..50a7d92ede 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
@@ -48,12 +48,12 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
         |    Processing rate 23.5 rows/sec
         |    Latency: 345.0 ms
         |    Trigger details:
+        |        batchId: 5
         |        isDataPresentInTrigger: true
         |        isTriggerActive: true
         |        latency.getBatch.total: 20
         |        latency.getOffset.total: 10
         |        numRows.input.total: 100
-        |        triggerId: 5
         |    Source statuses [1 source]:
         |        Source 1 - MySource1
         |            Available offset: 0
@@ -72,7 +72,11 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
   test("json") {
     assert(StreamingQueryStatus.testStatus.json ===
       """
-        |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
+        |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
+        |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
+        |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
+        |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
+        |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
         |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
         |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
         |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
@@ -84,6 +88,20 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
       StreamingQueryStatus.testStatus.prettyJson ===
         """
           |{
+          |  "name" : "query",
+          |  "id" : 1,
+          |  "timestamp" : 123,
+          |  "inputRate" : 15.5,
+          |  "processingRate" : 23.5,
+          |  "latency" : 345.0,
+          |  "triggerDetails" : {
+          |    "latency.getBatch.total" : "20",
+          |    "numRows.input.total" : "100",
+          |    "isTriggerActive" : "true",
+          |    "batchId" : "5",
+          |    "latency.getOffset.total" : "10",
+          |    "isDataPresentInTrigger" : "true"
+          |  },
           |  "sourceStatuses" : [ {
           |    "description" : "MySource1",
           |    "offsetDesc" : "0",
```