From 808b84e2de3537a47dc1c5f426a89dd4ec6bf0c5 Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Mon, 2 Jan 2017 14:40:06 +0000
Subject: [SPARK-19041][SS] Fix code snippet compilation issues in Structured Streaming Programming Guide

## What changes were proposed in this pull request?

Currently some code snippets in the programming guide just do not compile. We should fix them.

## How was this patch tested?

```
SKIP_API=1 jekyll build
```

## Screenshot from part of the change:

![snip20161231_37](https://cloud.githubusercontent.com/assets/15843379/21576864/cc52fcd8-cf7b-11e6-8bd6-f935d9ff4a6b.png)

Author: Liwei Lin

Closes #16442 from lw-lin/ss-pro-guide-.
---
 docs/structured-streaming-programming-guide.md | 87 +++++++++++++++-----------
 1 file changed, 51 insertions(+), 36 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 3b7d0c4003..799f636505 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -537,9 +537,9 @@ Most of the common operations on DataFrame/Dataset are supported for streaming.
 {% highlight scala %}
-case class DeviceData(device: String, type: String, signal: Double, time: DateTime)
+case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
 
-val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: string }
+val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
 val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data
 
 // Select the devices which have signal more than 10
@@ -547,11 +547,11 @@ df.select("device").where("signal > 10") // using untyped APIs
 ds.filter(_.signal > 10).map(_.device) // using typed APIs
 
 // Running count of the number of updates for each device type
-df.groupBy("type").count() // using untyped API
+df.groupBy("deviceType").count() // using untyped API
 
 // Running average signal for each device type
-import org.apache.spark.sql.expressions.scalalang.typed._
-ds.groupByKey(_.type).agg(typed.avg(_.signal)) // using typed API
+import org.apache.spark.sql.expressions.scalalang.typed
+ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API
 {% endhighlight %}
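For reference, a minimal self-contained Scala sketch of the corrected operations above. The JSON file source, the schema derived from the case class, the `java.sql.Timestamp` type for `time`, and the console sink at the end are assumptions made for the sketch; they are not part of the guide or of this patch.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.expressions.scalalang.typed

// Mirrors the corrected snippet; the concrete type of `time` is an assumption for this sketch.
case class DeviceData(device: String, deviceType: String, signal: Double, time: java.sql.Timestamp)

object DeviceOpsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("DeviceOpsSketch").getOrCreate()
    import spark.implicits._

    // Hypothetical JSON file source; any streaming source producing this schema would do.
    val df: DataFrame = spark.readStream
      .schema(Encoders.product[DeviceData].schema)
      .json("path/to/iot/json/dir")
    val ds: Dataset[DeviceData] = df.as[DeviceData]

    // Untyped API: running count of updates per device type
    val counts = df.groupBy("deviceType").count()

    // Typed API: running average signal per device type
    val avgSignal = ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))

    // Write one of the aggregates to the console sink to see the running results
    counts.writeStream.outputMode("complete").format("console").start().awaitTermination()
  }
}
```

Note that the import is `scalalang.typed` with no trailing `._`, matching the hunk above, so `typed.avg` is referenced through the `typed` object.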
@@ -565,7 +565,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
 public class DeviceData {
   private String device;
-  private String type;
+  private String deviceType;
   private Double signal;
   private java.sql.Date time;
   ...
@@ -590,13 +590,13 @@ ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
 }, Encoders.STRING());
 
 // Running count of the number of updates for each device type
-df.groupBy("type").count(); // using untyped API
+df.groupBy("deviceType").count(); // using untyped API
 
 // Running average signal for each device type
 ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
   @Override
   public String call(DeviceData value) throws Exception {
-    return value.getType();
+    return value.getDeviceType();
   }
 }, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
   @Override
@@ -611,13 +611,13 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
 {% highlight python %}
-df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
+df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
 
 # Select the devices which have signal more than 10
 df.select("device").where("signal > 10")
 
 # Running count of the number of updates for each device type
-df.groupBy("type").count()
+df.groupBy("deviceType").count()
 {% endhighlight %}
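The sink-related hunks that follow replace the bare `.parquet(...)` call with `format("parquet")` plus explicit `checkpointLocation` and `path` options. A minimal Scala sketch of that corrected pattern is given here for reference; the input schema, source directory, and output/checkpoint paths are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object FileSinkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("FileSinkSketch").getOrCreate()

    // Placeholder schema and input directory; any non-aggregated streaming DataFrame would do.
    val schema = new StructType()
      .add("device", "string")
      .add("signal", "double")
    val noAggDF = spark.readStream.schema(schema).json("path/to/input/json/dir")

    // Corrected file-sink pattern: the checkpoint location and output path are passed as options.
    noAggDF.writeStream
      .format("parquet")
      .option("checkpointLocation", "path/to/checkpoint/dir")
      .option("path", "path/to/destination/dir")
      .start()
      .awaitTermination()
  }
}
```

The explicit checkpoint location is what lets the file-sink query recover after a restart, consistent with the fault-tolerance column of the sink table below.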
@@ -973,7 +973,7 @@ Here is a table of all the sinks, and the corresponding settings.
 File Sink
 Append
-writeStream
-  .format("parquet")
-  .start()
+writeStream
+  .format("parquet")
+  .option(
+    "checkpointLocation",
+    "path/to/checkpoint/dir")
+  .option(
+    "path",
+    "path/to/destination/dir")
+  .start()
 Yes
 Supports writes to partitioned tables. Partitioning by time may be useful.
@@ -1026,7 +1026,9 @@ noAggDF
 // Write new data to Parquet files
 noAggDF
   .writeStream
-  .parquet("path/to/destination/directory")
+  .format("parquet")
+  .option("checkpointLocation", "path/to/checkpoint/dir")
+  .option("path", "path/to/destination/dir")
   .start()
 
 // ========== DF with aggregation ==========
@@ -1066,7 +1068,9 @@ noAggDF
 // Write new data to Parquet files
 noAggDF
   .writeStream()
-  .parquet("path/to/destination/directory")
+  .format("parquet")
+  .option("checkpointLocation", "path/to/checkpoint/dir")
+  .option("path", "path/to/destination/dir")
   .start();
 
 // ========== DF with aggregation ==========
@@ -1106,7 +1110,9 @@ noAggDF \
 # Write new data to Parquet files
 noAggDF \
     .writeStream() \
-    .parquet("path/to/destination/directory") \
+    .format("parquet") \
+    .option("checkpointLocation", "path/to/checkpoint/dir") \
+    .option("path", "path/to/destination/dir") \
     .start()
 
 # ========== DF with aggregation ==========
@@ -1120,11 +1126,11 @@ aggDF \
     .start()
 
 # Have all the aggregates in an in memory table. The query name will be the table name
-aggDF\
-    .writeStream()\
-    .queryName("aggregates")\
-    .outputMode("complete")\
-    .format("memory")\
+aggDF \
+    .writeStream() \
+    .queryName("aggregates") \
+    .outputMode("complete") \
+    .format("memory") \
     .start()
 
 spark.sql("select * from aggregates").show() # interactively query in-memory table
@@ -1159,7 +1165,9 @@ The `StreamingQuery` object created when a query is started can be used to monit
 {% highlight scala %}
 val query = df.writeStream.format("console").start() // get the query object
 
-query.id // get the unique identifier of the running query
+query.id // get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId // get the unique id of this run of the query, which will be generated at every start/restart
 
 query.name // get the name of the auto-generated or user-specified name
 
@@ -1169,11 +1177,11 @@ query.stop() // stop the query
 
 query.awaitTermination() // block until query is terminated, with stop() or with error
 
-query.exception() // the exception if the query has been terminated with error
+query.exception // the exception if the query has been terminated with error
 
-query.sourceStatus() // progress information about data has been read from the input sources
+query.recentProgress // an array of the most recent progress updates for this query
 
-query.sinkStatus() // progress information about data written to the output sink
+query.lastProgress // the most recent progress update of this streaming query
 
 {% endhighlight %}
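To complement the Scala monitoring hunk above, a small self-contained sketch that exercises the corrected accessors (`id`, `runId`, `exception`, `recentProgress`, `lastProgress`). The socket source, port, and the sleep are placeholders chosen only so that there is a running query to inspect.

```scala
import org.apache.spark.sql.SparkSession

object MonitoringSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("MonitoringSketch").getOrCreate()

    // Placeholder source: lines from a local socket, as in the guide's quick example.
    val df = spark.readStream.format("socket")
      .option("host", "localhost").option("port", 9999).load()

    val query = df.writeStream.format("console").start() // get the query object

    println(query.id)      // unique identifier, persisted in the checkpoint data across restarts
    println(query.runId)   // unique id of this particular run, regenerated at every start/restart
    println(query.name)    // the auto-generated or user-specified name
    query.explain()        // print detailed explanations of the query

    Thread.sleep(5000)     // let a few micro-batches complete

    println(query.lastProgress)             // most recent progress update, or null if none yet
    query.recentProgress.foreach(println)   // array of the most recent progress updates

    println(query.exception)                // Some(exception) if the query terminated with an error
    query.stop()                            // stop the query
  }
}
```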
@@ -1183,21 +1191,23 @@ query.sinkStatus(); // progress information about data written to the output si
 {% highlight java %}
 StreamingQuery query = df.writeStream().format("console").start(); // get the query object
 
-query.id(); // get the unique identifier of the running query
+query.id(); // get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId(); // get the unique id of this run of the query, which will be generated at every start/restart
 
 query.name(); // get the name of the auto-generated or user-specified name
 
 query.explain(); // print detailed explanations of the query
 
-query.stop(); // stop the query
+query.stop(); // stop the query
 
 query.awaitTermination(); // block until query is terminated, with stop() or with error
 
-query.exception(); // the exception if the query has been terminated with error
+query.exception(); // the exception if the query has been terminated with error
 
-query.sourceStatus(); // progress information about data has been read from the input sources
+query.recentProgress(); // an array of the most recent progress updates for this query
 
-query.sinkStatus(); // progress information about data written to the output sink
+query.lastProgress(); // the most recent progress update of this streaming query
 
 {% endhighlight %}
@@ -1207,7 +1217,9 @@ query.sinkStatus(); // progress information about data written to the output s
 {% highlight python %}
 query = df.writeStream().format("console").start() # get the query object
 
-query.id() # get the unique identifier of the running query
+query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data
+
+query.runId() # get the unique id of this run of the query, which will be generated at every start/restart
 
 query.name() # get the name of the auto-generated or user-specified name
 
@@ -1217,11 +1229,11 @@ query.stop() # stop the query
 
 query.awaitTermination() # block until query is terminated, with stop() or with error
 
-query.exception() # the exception if the query has been terminated with error
+query.exception() # the exception if the query has been terminated with error
 
-query.sourceStatus() # progress information about data has been read from the input sources
+query.recentProgress() # an array of the most recent progress updates for this query
 
-query.sinkStatus() # progress information about data written to the output sink
+query.lastProgress() # the most recent progress update of this streaming query
 
 {% endhighlight %}
@@ -1491,14 +1503,17 @@ spark.streams.addListener(new StreamingQueryListener() {
 {% highlight java %}
 SparkSession spark = ...
 
-spark.streams.addListener(new StreamingQueryListener() {
-  @Overrides void onQueryStarted(QueryStartedEvent queryStarted) {
+spark.streams().addListener(new StreamingQueryListener() {
+  @Override
+  public void onQueryStarted(QueryStartedEvent queryStarted) {
     System.out.println("Query started: " + queryStarted.id());
   }
-  @Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
+  @Override
+  public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
     System.out.println("Query terminated: " + queryTerminated.id());
   }
-  @Overrides void onQueryProgress(QueryProgressEvent queryProgress) {
+  @Override
+  public void onQueryProgress(QueryProgressEvent queryProgress) {
     System.out.println("Query made progress: " + queryProgress.progress());
   }
 });
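For completeness, a sketch of the equivalent listener registration in Scala, mirroring the corrected Java block above; the application name is a placeholder.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ListenerSketch").getOrCreate()

    // Register a listener on the session's StreamingQueryManager, as in the Java example above.
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {
        println("Query started: " + event.id)
      }
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        println("Query made progress: " + event.progress)
      }
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + event.id)
      }
    })
  }
}
```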