aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-11-29 17:24:17 -0800
committerMichael Armbrust <michael@databricks.com>2016-11-29 17:24:17 -0800
commitc3d08e2f29baeebe09bf4c059ace4336af9116b5 (patch)
treebc677be4760fdd9dfabcb5bb990c576fd5c65e12 /external
parent9a02f6821265ff67ba3f7b095cd1afaebd25a898 (diff)
downloadspark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.tar.gz
spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.tar.bz2
spark-c3d08e2f29baeebe09bf4c059ace4336af9116b5.zip
[SPARK-18516][SQL] Split state and progress in streaming
This PR separates the status of a `StreamingQuery` into two separate APIs: - `status` - describes the status of a `StreamingQuery` at this moment, including what phase of processing is currently happening and if data is available. - `recentProgress` - an array of statistics about the most recent microbatches that have executed. A recent progress contains the following information: ``` { "id" : "2be8670a-fce1-4859-a530-748f29553bb6", "name" : "query-29", "timestamp" : 1479705392724, "inputRowsPerSecond" : 230.76923076923077, "processedRowsPerSecond" : 10.869565217391303, "durationMs" : { "triggerExecution" : 276, "queryPlanning" : 3, "getBatch" : 5, "getOffset" : 3, "addBatch" : 234, "walCommit" : 30 }, "currentWatermark" : 0, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[topic-14]]", "startOffset" : { "topic-14" : { "2" : 0, "4" : 1, "1" : 0, "3" : 0, "0" : 0 } }, "endOffset" : { "topic-14" : { "2" : 1, "4" : 2, "1" : 0, "3" : 0, "0" : 1 } }, "numRecords" : 3, "inputRowsPerSecond" : 230.76923076923077, "processedRowsPerSecond" : 10.869565217391303 } ] } ``` Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique with in the JVM to a `UUID` that is globally unique. Author: Tathagata Das <tathagata.das1565@gmail.com> Author: Michael Armbrust <michael@databricks.com> Closes #15954 from marmbrus/queryProgress.
Diffstat (limited to 'external')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala7
1 files changed, 4 insertions, 3 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index e1af14f95d..2d6ccb22dd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
val mapped = kafka.map(kv => kv._2.toInt + 1)
testStream(mapped)(
+ StartStream(trigger = ProcessingTime(1)),
makeSureGetOffsetCalled,
AddKafkaData(Set(topic), 1, 2, 3),
CheckAnswer(2, 3, 4),
- AssertOnLastQueryStatus { status =>
- assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
- assert(status.sourceStatuses(0).processingRate > 0.0)
+ AssertOnQuery { query =>
+ val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+ recordsRead == 3
}
)
}