aboutsummaryrefslogtreecommitdiff
path: root/project
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-10-13 13:36:26 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-10-13 13:36:26 -0700
commit7106866c220c73960c6fe2a70e4911516617e21f (patch)
treeb468a9cd08d1f3ea8ef939f4b1d8e6b33cf76693 /project
parent08eac356095c7faa2b19d52f2fb0cbc47eb7d1d1 (diff)
downloadspark-7106866c220c73960c6fe2a70e4911516617e21f.tar.gz
spark-7106866c220c73960c6fe2a70e4911516617e21f.tar.bz2
spark-7106866c220c73960c6fe2a70e4911516617e21f.zip
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming
## What changes were proposed in this pull request? Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics. https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing Specifically, this PR adds the following public APIs changes. ### New APIs - `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later) - `StreamingQueryStatus` has the following important fields - inputRate - Current rate (rows/sec) at which data is being generated by all the sources - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources - ~~outputRate~~ - *Does not work with wholestage codegen* - latency - Current average latency between the data being available in source and the sink writing the corresponding output - sourceStatuses: Array[SourceStatus] - Current statuses of the sources - sinkStatus: SinkStatus - Current status of the sink - triggerStatus - Low-level detailed status of the last completed/currently active trigger - latencies - getOffset, getBatch, full trigger, wal writes - timestamps - trigger start, finish, after getOffset, after getBatch - numRows - input, output, state total/updated rows for aggregations - `SourceStatus` has the following important fields - inputRate - Current rate (rows/sec) at which data is being generated by the source - processingRate - Current rate (rows/sec) at which the query is processing data from the source - triggerStatus - Low-level detailed status of the last completed/currently active trigger - Python API for `StreamingQuery.status()` ### Breaking changes to existing APIs **Existing direct public facing APIs** - Deprecated direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus`. - Branch 2.0 should have it deprecated, master should have it removed. **Existing advanced listener APIs** - `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus` - Earlier StreamingQueryInfo was used only in the advanced listener API, but now it is used in direct public-facing API (StreamingQuery.status) - Field `queryInfo` in listener events `QueryStarted`, `QueryProgress`, `QueryTerminated` changed have name `queryStatus` and return type `StreamingQueryStatus`. - Field `offsetDesc` in `SourceStatus` was Option[String], converted it to `String`. - For `SourceStatus` and `SinkStatus` made constructor private instead of private[sql] to make them more java-safe. Instead added `private[sql] object SourceStatus/SinkStatus.apply()` which are harder to accidentally use in Java. ## How was this patch tested? Old and new unit tests. - Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite. - New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite. - New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite. - Source-specific tests for making sure input rows are counted are is source-specific test suites. - Additional tests to test minor additions in LocalTableScanExec, StateStore, etc. Metrics also manually tested using Ganglia sink Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15307 from tdas/SPARK-17731.
Diffstat (limited to 'project')
-rw-r--r--project/MimaExcludes.scala13
1 files changed, 13 insertions, 0 deletions
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ae72d37a0b..1349af4219 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -56,6 +56,19 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
+
+ // [SPARK-17731][SQL][Streaming] Metrics for structured streaming
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceStatus.this"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.SourceStatus.offsetDesc"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SinkStatus.this"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryInfo"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.queryInfo"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.this"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgress.queryInfo"),
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
+
// [SPARK-17338][SQL] add global temp view
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"),