aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-10-sql
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-10-13 13:36:26 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-10-13 13:36:26 -0700
commit7106866c220c73960c6fe2a70e4911516617e21f (patch)
treeb468a9cd08d1f3ea8ef939f4b1d8e6b33cf76693 /external/kafka-0-10-sql
parent08eac356095c7faa2b19d52f2fb0cbc47eb7d1d1 (diff)
downloadspark-7106866c220c73960c6fe2a70e4911516617e21f.tar.gz
spark-7106866c220c73960c6fe2a70e4911516617e21f.tar.bz2
spark-7106866c220c73960c6fe2a70e4911516617e21f.zip
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming
## What changes were proposed in this pull request? Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics. https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing Specifically, this PR adds the following public APIs changes. ### New APIs - `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later) - `StreamingQueryStatus` has the following important fields - inputRate - Current rate (rows/sec) at which data is being generated by all the sources - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources - ~~outputRate~~ - *Does not work with wholestage codegen* - latency - Current average latency between the data being available in source and the sink writing the corresponding output - sourceStatuses: Array[SourceStatus] - Current statuses of the sources - sinkStatus: SinkStatus - Current status of the sink - triggerStatus - Low-level detailed status of the last completed/currently active trigger - latencies - getOffset, getBatch, full trigger, wal writes - timestamps - trigger start, finish, after getOffset, after getBatch - numRows - input, output, state total/updated rows for aggregations - `SourceStatus` has the following important fields - inputRate - Current rate (rows/sec) at which data is being generated by the source - processingRate - Current rate (rows/sec) at which the query is processing data from the source - triggerStatus - Low-level detailed status of the last completed/currently active trigger - Python API for `StreamingQuery.status()` ### Breaking changes to existing APIs **Existing direct public facing APIs** - Deprecated direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus`. - Branch 2.0 should have it deprecated, master should have it removed. **Existing advanced listener APIs** - `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus` - Earlier StreamingQueryInfo was used only in the advanced listener API, but now it is used in direct public-facing API (StreamingQuery.status) - Field `queryInfo` in listener events `QueryStarted`, `QueryProgress`, `QueryTerminated` changed have name `queryStatus` and return type `StreamingQueryStatus`. - Field `offsetDesc` in `SourceStatus` was Option[String], converted it to `String`. - For `SourceStatus` and `SinkStatus` made constructor private instead of private[sql] to make them more java-safe. Instead added `private[sql] object SourceStatus/SinkStatus.apply()` which are harder to accidentally use in Java. ## How was this patch tested? Old and new unit tests. - Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite. - New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite. - New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite. - Source-specific tests for making sure input rows are counted are is source-specific test suites. - Additional tests to test minor additions in LocalTableScanExec, StateStore, etc. Metrics also manually tested using Ganglia sink Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15307 from tdas/SPARK-17731.
Diffstat (limited to 'external/kafka-0-10-sql')
-rw-r--r--external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala27
1 files changed, 27 insertions, 0 deletions
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index c640b93b0a..8b5296ea13 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -264,6 +264,33 @@ class KafkaSourceSuite extends KafkaSourceTest {
testUnsupportedConfig("kafka.auto.offset.reset", "latest")
}
+ test("input row metrics") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+ testUtils.sendMessages(topic, Array("-1"))
+ require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+ val kafka = spark
+ .readStream
+ .format("kafka")
+ .option("subscribe", topic)
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+
+ val mapped = kafka.map(kv => kv._2.toInt + 1)
+ testStream(mapped)(
+ makeSureGetOffsetCalled,
+ AddKafkaData(Set(topic), 1, 2, 3),
+ CheckAnswer(2, 3, 4),
+ AssertOnLastQueryStatus { status =>
+ assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
+ assert(status.sourceStatuses(0).processingRate > 0.0)
+ }
+ )
+ }
+
private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
private def testFromLatestOffsets(topic: String, options: (String, String)*): Unit = {