diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-11-29 17:24:17 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-11-29 17:24:17 -0800 |
commit | c3d08e2f29baeebe09bf4c059ace4336af9116b5 (patch) | |
tree | bc677be4760fdd9dfabcb5bb990c576fd5c65e12 /external | |
parent | 9a02f6821265ff67ba3f7b095cd1afaebd25a898 (diff) | |
download | spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.tar.gz spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.tar.bz2 spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.zip |
[SPARK-18516][SQL] Split state and progress in streaming
This PR separates the status of a `StreamingQuery` into two separate APIs:
- `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available.
- `recentProgress` - an array of statistics about the most recent microbatches that have executed.
A recent progress contains the following information:
```
{
"id" : "2be8670a-fce1-4859-a530-748f29553bb6",
"name" : "query-29",
"timestamp" : 1479705392724,
"inputRowsPerSecond" : 230.76923076923077,
"processedRowsPerSecond" : 10.869565217391303,
"durationMs" : {
"triggerExecution" : 276,
"queryPlanning" : 3,
"getBatch" : 5,
"getOffset" : 3,
"addBatch" : 234,
"walCommit" : 30
},
"currentWatermark" : 0,
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-14]]",
"startOffset" : {
"topic-14" : {
"2" : 0,
"4" : 1,
"1" : 0,
"3" : 0,
"0" : 0
}
},
"endOffset" : {
"topic-14" : {
"2" : 1,
"4" : 2,
"1" : 0,
"3" : 0,
"0" : 1
}
},
"numRecords" : 3,
"inputRowsPerSecond" : 230.76923076923077,
"processedRowsPerSecond" : 10.869565217391303
} ]
}
```
Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>
Closes #15954 from marmbrus/queryProgress.
Diffstat (limited to 'external')
-rw-r--r-- | external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index e1af14f95d..2d6ccb22dd 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest { val mapped = kafka.map(kv => kv._2.toInt + 1) testStream(mapped)( + StartStream(trigger = ProcessingTime(1)), makeSureGetOffsetCalled, AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), - AssertOnLastQueryStatus { status => - assert(status.triggerDetails.get("numRows.input.total").toInt > 0) - assert(status.sourceStatuses(0).processingRate > 0.0) + AssertOnQuery { query => + val recordsRead = query.recentProgresses.map(_.numInputRows).sum + recordsRead == 3 } ) } |