author    Tathagata Das <tathagata.das1565@gmail.com>  2016-11-16 11:03:10 -0800
committer Michael Armbrust <michael@databricks.com>    2016-11-16 11:03:10 -0800
commit    bb6cdfd9a6a6b6c91aada7c3174436146045ed1e (patch)
tree      c96d62195c806ed39051cadc6149c11491a4a615 /docs
parent    0048ce7ce64b02cbb6a1c4a2963a0b1b9541047e (diff)
[SPARK-18461][DOCS][STRUCTUREDSTREAMING] Added more information about monitoring streaming queries
## What changes were proposed in this pull request?

<img width="941" alt="screen shot 2016-11-15 at 6 27 32 pm" src="https://cloud.githubusercontent.com/assets/663212/20332521/4190b858-ab61-11e6-93a6-4bdc05105ed9.png">
<img width="940" alt="screen shot 2016-11-15 at 6 27 45 pm" src="https://cloud.githubusercontent.com/assets/663212/20332525/44a0d01e-ab61-11e6-8668-47f925490d4f.png">

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15897 from tdas/SPARK-18461.
Diffstat (limited to 'docs')
-rw-r--r--  docs/structured-streaming-programming-guide.md | 182
1 file changed, 179 insertions(+), 3 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index d2545584ae..77b66b3b3a 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1087,9 +1087,185 @@ spark.streams().awaitAnyTermination() # block until any one of them terminates
</div>
</div>
-Finally, for asynchronous monitoring of streaming queries, you can create and attach a `StreamingQueryListener`
-([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs),
-which will give you regular callback-based updates when queries are started and terminated.
+
+## Monitoring Streaming Queries
+There are two ways you can monitor active streaming queries. You can directly poll the current status
+of an active query using `streamingQuery.status`, which will return a `StreamingQueryStatus` object
+([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryStatus) docs)
+containing all the details of the query, such as the current input and processing rates, average latency,
+and details of the currently active trigger.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val query: StreamingQuery = ...
+
+println(query.status)
+
+/* Will print the current status of the query
+
+Status of query 'queryName'
+ Query id: 1
+ Status timestamp: 123
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Latency: 345.0 ms
+ Trigger details:
+ batchId: 5
+ isDataPresentInTrigger: true
+ isTriggerActive: true
+ latency.getBatch.total: 20
+ latency.getOffset.total: 10
+ numRows.input.total: 100
+ Source statuses [1 source]:
+ Source 1 - MySource1
+ Available offset: 0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ Sink status - MySink
+ Committed offsets: [1, -]
+*/
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+StreamingQuery query = ...
+
+System.out.println(query.status());
+
+/* Will print the current status of the query
+
+Status of query 'queryName'
+ Query id: 1
+ Status timestamp: 123
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Latency: 345.0 ms
+ Trigger details:
+ batchId: 5
+ isDataPresentInTrigger: true
+ isTriggerActive: true
+ latency.getBatch.total: 20
+ latency.getOffset.total: 10
+ numRows.input.total: 100
+ Source statuses [1 source]:
+ Source 1 - MySource1
+ Available offset: 0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ Sink status - MySink
+ Committed offsets: [1, -]
+*/
+{% endhighlight %}
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+query = ...  # a StreamingQuery
+
+print(query.status)
+
+'''
+Will print the current status of the query
+
+Status of query 'queryName'
+ Query id: 1
+ Status timestamp: 123
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Latency: 345.0 ms
+ Trigger details:
+ batchId: 5
+ isDataPresentInTrigger: true
+ isTriggerActive: true
+ latency.getBatch.total: 20
+ latency.getOffset.total: 10
+ numRows.input.total: 100
+ Source statuses [1 source]:
+ Source 1 - MySource1
+ Available offset: 0
+ Input rate: 15.5 rows/sec
+ Processing rate: 23.5 rows/sec
+ Trigger details:
+ numRows.input.source: 100
+ latency.getOffset.source: 10
+ latency.getBatch.source: 20
+ Sink status - MySink
+ Committed offsets: [1, -]
+'''
+{% endhighlight %}
+
+</div>
+</div>
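+
+Beyond printing the whole status, you can read individual metrics off the returned
+`StreamingQueryStatus` object. The following is a minimal Scala sketch; it assumes the metrics
+shown in the output above are exposed as accessors named `name`, `inputRate` and
+`processingRate` (both rates in rows/sec).
+
+{% highlight scala %}
+// Hypothetical health check: flag a query whose processing rate
+// has fallen below its input rate, i.e. the query is falling behind.
+val status = query.status
+if (status.processingRate < status.inputRate) {
+  println(s"Query '${status.name}' is falling behind: " +
+    s"ingesting ${status.inputRate} rows/sec but processing only ${status.processingRate} rows/sec")
+}
+{% endhighlight %}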
+
+
+You can also asynchronously monitor all queries associated with a
+`SparkSession` by attaching a `StreamingQueryListener`
+([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs).
+Once you attach your custom `StreamingQueryListener` object with
+`sparkSession.streams.addListener()`, you will get callbacks when a query is started and
+terminated, and when progress is made in an active query. Here is an example:
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val spark: SparkSession = ...
+
+spark.streams.addListener(new StreamingQueryListener() {
+
+ override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+ println("Query started: " + queryTerminated.queryStatus.name)
+ }
+ override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+ println("Query terminated: " + queryTerminated.queryStatus.name)
+ }
+ override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
+ println("Query made progress: " + queryProgress.queryStatus)
+ }
+})
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+SparkSession spark = ...
+
+spark.streams().addListener(new StreamingQueryListener() {
+
+  @Override
+  public void onQueryStarted(QueryStartedEvent queryStarted) {
+    System.out.println("Query started: " + queryStarted.queryStatus().name());
+  }
+  @Override
+  public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
+    System.out.println("Query terminated: " + queryTerminated.queryStatus().name());
+  }
+  @Override
+  public void onQueryProgress(QueryProgressEvent queryProgress) {
+    System.out.println("Query made progress: " + queryProgress.queryStatus());
+  }
+});
+{% endhighlight %}
+
+</div>
+<div data-lang="python" markdown="1">
+{% highlight bash %}
+Not available in Python.
+{% endhighlight %}
+
+</div>
+</div>
## Recovering from Failures with Checkpointing
In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. As of Spark 2.0, this checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
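
For example, here is a minimal sketch of setting the checkpoint location through the
`DataStreamWriter` option when starting a query; `aggDF` stands in for any streaming
DataFrame from the examples above, and the directory path is illustrative:

{% highlight scala %}
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")  // progress and state saved here
  .format("memory")
  .start()
{% endhighlight %}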