 python/pyspark/sql/streaming.py                                                           |  6 +++---
 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala      |  8 ++++----
 sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala         |  4 ++--
 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala |  8 ++++----
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala  |  4 ++--
 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala    | 22 ++++++++++++++++--
 6 files changed, 35 insertions(+), 17 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index f326f16232..0e4589be97 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -212,12 +212,12 @@ class StreamingQueryStatus(object):
Processing rate 23.5 rows/sec
Latency: 345.0 ms
Trigger details:
+ batchId: 5
isDataPresentInTrigger: true
isTriggerActive: true
latency.getBatch.total: 20
latency.getOffset.total: 10
numRows.input.total: 100
- triggerId: 5
Source statuses [1 source]:
Source 1 - MySource1
Available offset: 0
@@ -341,8 +341,8 @@ class StreamingQueryStatus(object):
If no trigger is currently active, then it will have details of the last completed trigger.
>>> sqs.triggerDetails
- {u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
- u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
+ {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
+ u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
u'isDataPresentInTrigger': u'true'}
"""
return self._jsqs.triggerDetails()
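
After this change, the key to read from the trigger-details map is "batchId" rather than "triggerId". A minimal Scala sketch of the consumer side, assuming an active query named "query"; triggerDetails is a java.util.Map[String, String], so the Java accessors apply:

    // Minimal sketch, assuming `query` is a started StreamingQuery.
    val details = query.status.triggerDetails        // java.util.Map[String, String]
    if (details.containsKey("batchId")) {
      val batchId = details.get("batchId").toLong    // e.g. 5; "triggerId" no longer exists
    }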
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
index 5645554a58..942e6ed894 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
@@ -78,13 +78,13 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
// =========== Setter methods ===========
- def reportTriggerStarted(triggerId: Long): Unit = synchronized {
+ def reportTriggerStarted(batchId: Long): Unit = synchronized {
numInputRows.clear()
triggerDetails.clear()
sourceTriggerDetails.values.foreach(_.clear())
- reportTriggerDetail(TRIGGER_ID, triggerId)
- sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId))
+ reportTriggerDetail(BATCH_ID, batchId)
+ sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId))
reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
currentTriggerStartTimestamp = triggerClock.getTimeMillis()
reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
@@ -217,7 +217,7 @@ object StreamMetrics extends Logging {
}
- val TRIGGER_ID = "triggerId"
+ val BATCH_ID = "batchId"
val IS_TRIGGER_ACTIVE = "isTriggerActive"
val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
val STATUS_MESSAGE = "statusMessage"
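
The renamed BATCH_ID constant is written into the per-trigger map as soon as a trigger starts, so every downstream consumer sees the new key. A rough lifecycle sketch, assuming "sm" is the StreamMetrics instance of a query and batch 5 is starting:

    // Sketch: reportTriggerStarted clears per-trigger state and records the new key.
    sm.reportTriggerStarted(5L)
    // Detail values are stored as strings.
    assert(sm.currentTriggerDetails()(StreamMetrics.BATCH_ID) == "5")
    assert(sm.currentTriggerDetails()(StreamMetrics.IS_TRIGGER_ACTIVE) == "true")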
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 99c7729d02..ba732ff7fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -102,7 +102,7 @@ class StreamingQueryStatus private(
("inputRate" -> JDouble(inputRate)) ~
("processingRate" -> JDouble(processingRate)) ~
("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
- ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
+ ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
("sinkStatus" -> sinkStatus.jsonValue)
}
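
The trailing ~ added above is a genuine bug fix riding along with the rename: without it, Scala ends the statement after the triggerDetails field, silently discards the object built so far, and returns only the final sourceStatuses ~ sinkStatus pair. That is why the expected JSON in StreamingQueryStatusSuite below gains the name, id, timestamp, rate, latency, and triggerDetails fields. A minimal json4s sketch of the pitfall (field names are illustrative):

    import org.json4s.JsonDSL._
    import org.json4s.JsonAST.JObject

    def buggy: JObject = {
      ("a" -> 1) ~
      ("b" -> 2)      // statement ends here: this JObject is built and discarded
      ("c" -> 3) ~    // parsed as a new statement; only this object is returned
      ("d" -> 4)
    }
    // buggy == ("c" -> 3) ~ ("d" -> 4); the "a" and "b" fields are silently lost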
@@ -151,7 +151,7 @@ private[sql] object StreamingQueryStatus {
desc = "MySink",
offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
triggerDetails = Map(
- TRIGGER_ID -> "5",
+ BATCH_ID -> "5",
IS_TRIGGER_ACTIVE -> "true",
IS_DATA_PRESENT_IN_TRIGGER -> "true",
GET_OFFSET_LATENCY -> "10",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
index 938423db64..38c4ece439 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
@@ -50,10 +50,10 @@ class StreamMetricsSuite extends SparkFunSuite {
assert(sm.currentSourceProcessingRate(source) === 0.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails() ===
- Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
+ Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
START_TIMESTAMP -> "0", "key" -> "value"))
assert(sm.currentSourceTriggerDetails(source) ===
- Map(TRIGGER_ID -> "1", "key2" -> "value2"))
+ Map(BATCH_ID -> "1", "key2" -> "value2"))
// Finishing the trigger should calculate the rates, except input rate which needs
// to have another trigger interval
@@ -66,11 +66,11 @@ class StreamMetricsSuite extends SparkFunSuite {
assert(sm.currentSourceProcessingRate(source) === 100.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails() ===
- Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
+ Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
NUM_INPUT_ROWS -> "100", "key" -> "value"))
assert(sm.currentSourceTriggerDetails(source) ===
- Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
+ Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
// After another trigger starts, the rates and latencies should not change until
// new rows are reported
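
To guard against the old key resurfacing, a hypothetical extra assertion in the same style as this suite (currentTriggerDetails() returns a Scala Map[String, String]):

    assert(sm.currentTriggerDetails().contains(StreamMetrics.BATCH_ID))
    assert(!sm.currentTriggerDetails().contains("triggerId"))    // old key must be gone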
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index cebb32a0a5..98f3bec708 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -84,7 +84,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AssertOnLastQueryStatus { status: StreamingQueryStatus =>
// Check the correctness of the trigger info of the last completed batch reported by
// onQueryProgress
- assert(status.triggerDetails.containsKey("triggerId"))
+ assert(status.triggerDetails.containsKey("batchId"))
assert(status.triggerDetails.get("isTriggerActive") === "false")
assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
@@ -104,7 +104,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
assert(status.sourceStatuses.length === 1)
- assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
+ assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
index 6af19fb0c2..50a7d92ede 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
@@ -48,12 +48,12 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
| Processing rate 23.5 rows/sec
| Latency: 345.0 ms
| Trigger details:
+ | batchId: 5
| isDataPresentInTrigger: true
| isTriggerActive: true
| latency.getBatch.total: 20
| latency.getOffset.total: 10
| numRows.input.total: 100
- | triggerId: 5
| Source statuses [1 source]:
| Source 1 - MySource1
| Available offset: 0
@@ -72,7 +72,11 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
test("json") {
assert(StreamingQueryStatus.testStatus.json ===
"""
- |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
+ |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
+ |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
+ |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
+ |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
+ |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
|"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
|"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
|"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
@@ -84,6 +88,20 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
StreamingQueryStatus.testStatus.prettyJson ===
"""
|{
+ | "name" : "query",
+ | "id" : 1,
+ | "timestamp" : 123,
+ | "inputRate" : 15.5,
+ | "processingRate" : 23.5,
+ | "latency" : 345.0,
+ | "triggerDetails" : {
+ | "latency.getBatch.total" : "20",
+ | "numRows.input.total" : "100",
+ | "isTriggerActive" : "true",
+ | "batchId" : "5",
+ | "latency.getOffset.total" : "10",
+ | "isDataPresentInTrigger" : "true"
+ | },
| "sourceStatuses" : [ {
| "description" : "MySource1",
| "offsetDesc" : "0",