path: root/docs/structured-streaming-programming-guide.md
author     Liwei Lin <lwlin7@gmail.com>    2017-01-02 14:40:06 +0000
committer  Sean Owen <sowen@cloudera.com>  2017-01-02 14:40:06 +0000
commit     808b84e2de3537a47dc1c5f426a89dd4ec6bf0c5 (patch)
tree       c6f8a6b5970d42801981f29edced2656ab5f18dd /docs/structured-streaming-programming-guide.md
parent     ba4881268e0c3b9dcf99b3077e6394259c705d55 (diff)
[SPARK-19041][SS] Fix code snippet compilation issues in Structured Streaming Programming Guide
## What changes were proposed in this pull request?

Currently some code snippets in the programming guide just do not compile. We should fix them.

## How was this patch tested?

```
SKIP_API=1 jekyll build
```

## Screenshot from part of the change:

![snip20161231_37](https://cloud.githubusercontent.com/assets/15843379/21576864/cc52fcd8-cf7b-11e6-8bd6-f935d9ff4a6b.png)

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16442 from lw-lin/ss-pro-guide-.
Diffstat (limited to 'docs/structured-streaming-programming-guide.md')
-rw-r--r--  docs/structured-streaming-programming-guide.md  87
1 file changed, 51 insertions(+), 36 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 3b7d0c4003..799f636505 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -537,9 +537,9 @@ Most of the common operations on DataFrame/Dataset are supported for streaming.
<div data-lang="scala" markdown="1">
{% highlight scala %}
-case class DeviceData(device: String, type: String, signal: Double, time: DateTime)
+case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
-val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string }
+val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
@@ -547,11 +547,11 @@ df.select("device").where("signal > 10") // using untyped APIs
ds.filter(_.signal > 10).map(_.device) // using typed APIs
// Running count of the number of updates for each device type
-df.groupBy("type").count() // using untyped API
+df.groupBy("deviceType").count() // using untyped API
// Running average signal for each device type
-import org.apache.spark.sql.expressions.scalalang.typed._
-ds.groupByKey(_.type).agg(typed.avg(_.signal)) // using typed API
+import org.apache.spark.sql.expressions.scalalang.typed
+ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
{% endhighlight %}
</div>
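
The corrected Scala snippet above compiles because `type` is a reserved word in Scala (hence the rename to `deviceType`) and because `typed.avg` needs the `typed` object itself imported rather than its members. A minimal sketch of the same pattern, assuming a SparkSession `spark` and a streaming DataFrame `df` with the matching schema are already in scope, and using `java.sql.Timestamp` in place of the guide's `DateTime`:

```scala
import java.sql.Timestamp

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.expressions.scalalang.typed

import spark.implicits._   // assumes `spark` is an existing SparkSession

// `type` is a Scala keyword, so the field is called deviceType
case class DeviceData(device: String, deviceType: String, signal: Double, time: Timestamp)

val ds: Dataset[DeviceData] = df.as[DeviceData]      // streaming Dataset of DeviceData

df.groupBy("deviceType").count()                     // running count per device type (untyped API)
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // running average signal (typed API)
```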
@@ -565,7 +565,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
public class DeviceData {
private String device;
- private String type;
+ private String deviceType;
private Double signal;
private java.sql.Date time;
...
@@ -590,13 +590,13 @@ ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
}, Encoders.STRING());
// Running count of the number of updates for each device type
-df.groupBy("type").count(); // using untyped API
+df.groupBy("deviceType").count(); // using untyped API
// Running average signal for each device type
ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
@Override
public String call(DeviceData value) throws Exception {
- return value.getType();
+ return value.getDeviceType();
}
}, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
@Override
@@ -611,13 +611,13 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
<div data-lang="python" markdown="1">
{% highlight python %}
-df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
+df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
# Select the devices which have signal more than 10
df.select("device").where("signal > 10")
# Running count of the number of updates for each device type
-df.groupBy("type").count()
+df.groupBy("deviceType").count()
{% endhighlight %}
</div>
</div>
@@ -973,7 +973,7 @@ Here is a table of all the sinks, and the corresponding settings.
<tr>
<td><b>File Sink</b></td>
<td>Append</td>
- <td><pre>writeStream<br/> .format("parquet")<br/> .start()</pre></td>
+ <td><pre>writeStream<br/> .format("parquet")<br/> .option(<br/> "checkpointLocation",<br/> "path/to/checkpoint/dir")<br/> .option(<br/> "path",<br/> "path/to/destination/dir")<br/> .start()</pre></td>
<td>Yes</td>
<td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
</tr>
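
In Scala, the expanded file-sink entry above corresponds to a call like the following sketch, assuming `df` is a streaming DataFrame and both paths are placeholders:

```scala
// Both paths are placeholders; the checkpoint location is required for the file sink.
val query = df.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()
```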
@@ -1026,7 +1026,9 @@ noAggDF
// Write new data to Parquet files
noAggDF
.writeStream
- .parquet("path/to/destination/directory")
+ .format("parquet")
+ .option("checkpointLocation", "path/to/checkpoint/dir")
+ .option("path", "path/to/destination/dir")
.start()
// ========== DF with aggregation ==========
@@ -1066,7 +1068,9 @@ noAggDF
// Write new data to Parquet files
noAggDF
.writeStream()
- .parquet("path/to/destination/directory")
+ .format("parquet")
+ .option("checkpointLocation", "path/to/checkpoint/dir")
+ .option("path", "path/to/destination/dir")
.start();
// ========== DF with aggregation ==========
@@ -1106,7 +1110,9 @@ noAggDF \
# Write new data to Parquet files
noAggDF \
.writeStream() \
- .parquet("path/to/destination/directory") \
+ .format("parquet") \
+ .option("checkpointLocation", "path/to/checkpoint/dir") \
+ .option("path", "path/to/destination/dir") \
.start()
# ========== DF with aggregation ==========
@@ -1120,11 +1126,11 @@ aggDF \
.start()
# Have all the aggregates in an in memory table. The query name will be the table name
-aggDF\
- .writeStream()\
- .queryName("aggregates")\
- .outputMode("complete")\
- .format("memory")\
+aggDF \
+ .writeStream() \
+ .queryName("aggregates") \
+ .outputMode("complete") \
+ .format("memory") \
.start()
spark.sql("select * from aggregates").show() # interactively query in-memory table
@@ -1159,7 +1165,9 @@ The `StreamingQuery` object created when a query is started can be used to monit
{% highlight scala %}
val query = df.writeStream.format("console").start() // get the query object
-query.id // get the unique identifier of the running query
+query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId // get the unique id of this run of the query, which will be generated at every start/restart
query.name // get the name of the auto-generated or user-specified name
@@ -1169,11 +1177,11 @@ query.stop() // stop the query
query.awaitTermination() // block until query is terminated, with stop() or with error
-query.exception() // the exception if the query has been terminated with error
+query.exception // the exception if the query has been terminated with error
-query.sourceStatus() // progress information about data has been read from the input sources
+query.recentProgress // an array of the most recent progress updates for this query
-query.sinkStatus() // progress information about data written to the output sink
+query.lastProgress // the most recent progress update of this streaming query
{% endhighlight %}
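
Taken together, the Scala monitoring snippet now relies on the progress API instead of the removed sourceStatus/sinkStatus calls, and on parameterless vals such as `exception`. A sketch of the resulting usage, assuming `df` is a streaming DataFrame:

```scala
// Assumes df is a streaming DataFrame.
val query = df.writeStream.format("console").start()

query.id              // identifier that persists across restarts from checkpoint data
query.runId           // identifier of this run, regenerated at every start/restart
query.name            // auto-generated or user-specified query name
query.explain()       // print a detailed plan of the query
query.recentProgress  // array of the most recent progress updates
query.lastProgress    // the most recent progress update
query.exception       // Some(error) if the query terminated with an error, else None
query.stop()          // stop the query
```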
@@ -1183,21 +1191,23 @@ query.sinkStatus() // progress information about data written to the output si
{% highlight java %}
StreamingQuery query = df.writeStream().format("console").start(); // get the query object
-query.id(); // get the unique identifier of the running query
+query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart
query.name(); // get the name of the auto-generated or user-specified name
query.explain(); // print detailed explanations of the query
-query.stop(); // stop the query
+query.stop(); // stop the query
query.awaitTermination(); // block until query is terminated, with stop() or with error
-query.exception(); // the exception if the query has been terminated with error
+query.exception(); // the exception if the query has been terminated with error
-query.sourceStatus(); // progress information about data has been read from the input sources
+query.recentProgress(); // an array of the most recent progress updates for this query
-query.sinkStatus(); // progress information about data written to the output sink
+query.lastProgress(); // the most recent progress update of this streaming query
{% endhighlight %}
@@ -1207,7 +1217,9 @@ query.sinkStatus(); // progress information about data written to the output s
{% highlight python %}
query = df.writeStream().format("console").start() # get the query object
-query.id() # get the unique identifier of the running query
+query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId() # get the unique id of this run of the query, which will be generated at every start/restart
query.name() # get the name of the auto-generated or user-specified name
@@ -1217,11 +1229,11 @@ query.stop() # stop the query
query.awaitTermination() # block until query is terminated, with stop() or with error
-query.exception() # the exception if the query has been terminated with error
+query.exception() # the exception if the query has been terminated with error
-query.sourceStatus() # progress information about data has been read from the input sources
+query.recentProgress() # an array of the most recent progress updates for this query
-query.sinkStatus() # progress information about data written to the output sink
+query.lastProgress() # the most recent progress update of this streaming query
{% endhighlight %}
@@ -1491,14 +1503,17 @@ spark.streams.addListener(new StreamingQueryListener() {
{% highlight java %}
SparkSession spark = ...
-spark.streams.addListener(new StreamingQueryListener() {
- @Overrides void onQueryStarted(QueryStartedEvent queryStarted) {
+spark.streams().addListener(new StreamingQueryListener() {
+ @Override
+ public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
- @Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
+ @Override
+ public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
- @Overrides void onQueryProgress(QueryProgressEvent queryProgress) {
+ @Override
+ public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());
}
});
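
For comparison with the fixed Java listener above, the equivalent Scala registration (already present earlier in the guide) is roughly the following sketch; the application name passed to the builder is only illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// The app name is illustrative; any existing SparkSession works.
val spark = SparkSession.builder.appName("listener-example").getOrCreate()

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println("Query started: " + event.id)
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println("Query made progress: " + event.progress)
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println("Query terminated: " + event.id)
})
```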