diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2016-10-13 13:36:26 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-10-13 13:36:26 -0700 |
commit | 7106866c220c73960c6fe2a70e4911516617e21f (patch) | |
tree | b468a9cd08d1f3ea8ef939f4b1d8e6b33cf76693 /sql/catalyst | |
parent | 08eac356095c7faa2b19d52f2fb0cbc47eb7d1d1 (diff) | |
download | spark-7106866c220c73960c6fe2a70e4911516617e21f.tar.gz spark-7106866c220c73960c6fe2a70e4911516617e21f.tar.bz2 spark-7106866c220c73960c6fe2a70e4911516617e21f.zip |
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming
## What changes were proposed in this pull request?
Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing
Specifically, this PR adds the following public APIs changes.
### New APIs
- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later)
- `StreamingQueryStatus` has the following important fields
- inputRate - Current rate (rows/sec) at which data is being generated by all the sources
- processingRate - Current rate (rows/sec) at which the query is processing data from
all the sources
- ~~outputRate~~ - *Does not work with wholestage codegen*
- latency - Current average latency between the data being available in source and the sink writing the corresponding output
- sourceStatuses: Array[SourceStatus] - Current statuses of the sources
- sinkStatus: SinkStatus - Current status of the sink
- triggerStatus - Low-level detailed status of the last completed/currently active trigger
- latencies - getOffset, getBatch, full trigger, wal writes
- timestamps - trigger start, finish, after getOffset, after getBatch
- numRows - input, output, state total/updated rows for aggregations
- `SourceStatus` has the following important fields
- inputRate - Current rate (rows/sec) at which data is being generated by the source
- processingRate - Current rate (rows/sec) at which the query is processing data from the source
- triggerStatus - Low-level detailed status of the last completed/currently active trigger
- Python API for `StreamingQuery.status()`
### Breaking changes to existing APIs
**Existing direct public facing APIs**
- Deprecated direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus`.
- Branch 2.0 should have it deprecated, master should have it removed.
**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus`
- Earlier StreamingQueryInfo was used only in the advanced listener API, but now it is used in direct public-facing API (StreamingQuery.status)
- Field `queryInfo` in listener events `QueryStarted`, `QueryProgress`, `QueryTerminated` changed have name `queryStatus` and return type `StreamingQueryStatus`.
- Field `offsetDesc` in `SourceStatus` was Option[String], converted it to `String`.
- For `SourceStatus` and `SinkStatus` made constructor private instead of private[sql] to make them more java-safe. Instead added `private[sql] object SourceStatus/SinkStatus.apply()` which are harder to accidentally use in Java.
## How was this patch tested?
Old and new unit tests.
- Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite.
- Source-specific tests for making sure input rows are counted are is source-specific test suites.
- Additional tests to test minor additions in LocalTableScanExec, StateStore, etc.
Metrics also manually tested using Ganglia sink
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #15307 from tdas/SPARK-17731.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 7 |
1 files changed, 7 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 83cb375525..ea8d8fef7b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -165,6 +165,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } /** + * Returns a Seq containing the leaves in this tree. + */ + def collectLeaves(): Seq[BaseType] = { + this.collect { case p if p.children.isEmpty => p } + } + + /** * Finds and returns the first [[TreeNode]] of the tree for which the given partial function * is defined (pre-order), and applies the partial function to it. */ |