author | hyukjinkwon <gurwls223@gmail.com> | 2016-08-11 11:31:52 +0100
---|---|---
committer | Sean Owen <sowen@cloudera.com> | 2016-08-11 11:31:52 +0100
commit | 7186e8c3180b7f38250cf2f2de791472bf5325a5 |
tree | 1d1f430f81102ba889bf93566d7173e508134ec7 /docs |
parent | a45fefd17ec4a499b988a2f9931ce397918d3bef |
[SPARK-16886][EXAMPLES][DOC] Fix some examples to be consistent and fix indentation in documentation
## What changes were proposed in this pull request?
Originally this PR was based on #14491, but I realised that fixing the examples is more sensible than fixing the comments.
This PR fixes three things below:
- Fix two wrong examples in `structured-streaming-programming-guide.md`: in Java, loading via `read.load(..)` without `as` produces a `Dataset<Row>`, not a `Dataset<String>` (see the sketch after this list).
- Fix indentation across `structured-streaming-programming-guide.md`: the convention is 4 spaces for Python and 2 spaces for Scala and Java, but the examples did not follow it consistently.
- Fix `StructuredNetworkWordCountWindowed` and `StructuredNetworkWordCount` in Java and Scala to initially load a `DataFrame` (`Dataset<Row>`), so they are consistent with the comments and examples in `structured-streaming-programming-guide.md` and match the Python version, which loads the input as a `DataFrame` initially.
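
To make the first point concrete, here is a minimal sketch (assuming a socket source on localhost:9999, as in the guide's own examples): the untyped streaming read yields a `Dataset<Row>`, and only the explicit `.as(Encoders.STRING())` conversion produces a `Dataset<String>`.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

// load() without an encoder is untyped: it returns Dataset<Row>, not Dataset<String>
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Converting to a typed Dataset<String> requires an explicit encoder
Dataset<String> typedLines = lines.as(Encoders.STRING());
```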
## How was this patch tested?
N/A
Closes https://github.com/apache/spark/pull/14491
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Ganesh Chand <ganeshchand@Ganeshs-MacBook-Pro-2.local>
Closes #14564 from HyukjinKwon/SPARK-16886.
Diffstat (limited to 'docs')
-rw-r--r-- | docs/structured-streaming-programming-guide.md | 202
1 file changed, 101 insertions(+), 101 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 8c14c3d220..99d50e51e2 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -46,9 +46,9 @@ import java.util.Arrays;
 import java.util.Iterator;
 
 SparkSession spark = SparkSession
-    .builder()
-    .appName("JavaStructuredNetworkWordCount")
-    .getOrCreate();
+  .builder()
+  .appName("JavaStructuredNetworkWordCount")
+  .getOrCreate();
 {% endhighlight %}
 
 </div>
@@ -95,7 +95,7 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
 {% highlight java %}
 // Create DataFrame representing the stream of input lines from connection to localhost:9999
-Dataset<String> lines = spark
+Dataset<Row> lines = spark
   .readStream()
   .format("socket")
   .option("host", "localhost")
@@ -104,14 +104,14 @@ Dataset<String> lines = spark
 
 // Split the lines into words
 Dataset<String> words = lines
-    .as(Encoders.STRING())
-    .flatMap(
-      new FlatMapFunction<String, String>() {
-        @Override
-        public Iterator<String> call(String x) {
-          return Arrays.asList(x.split(" ")).iterator();
-        }
-      }, Encoders.STRING());
+  .as(Encoders.STRING())
+  .flatMap(
+    new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
+      }
+    }, Encoders.STRING());
 
 // Generate running word count
 Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -125,11 +125,11 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
 {% highlight python %}
 # Create DataFrame representing the stream of input lines from connection to localhost:9999
 lines = spark\
-  .readStream\
-  .format('socket')\
-  .option('host', 'localhost')\
-  .option('port', 9999)\
-  .load()
+    .readStream\
+    .format('socket')\
+    .option('host', 'localhost')\
+    .option('port', 9999)\
+    .load()
 
 # Split the lines into words
 words = lines.select(
@@ -434,11 +434,11 @@ val spark: SparkSession = ...
 
 // Read text from socket
 val socketDF = spark
-    .readStream
-    .format("socket")
-    .option("host", "localhost")
-    .option("port", 9999)
-    .load()
+  .readStream
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .load()
 
 socketDF.isStreaming // Returns True for DataFrames that have streaming sources
 
@@ -447,10 +447,10 @@ socketDF.printSchema
 // Read all the csv files written atomically in a directory
 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark
-    .readStream
-    .option("sep", ";")
-    .schema(userSchema)      // Specify schema of the csv files
-    .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
+  .readStream
+  .option("sep", ";")
+  .schema(userSchema)      // Specify schema of the csv files
+  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 </div>
@@ -461,11 +461,11 @@ SparkSession spark = ...
 
 // Read text from socket
 Dataset[Row] socketDF = spark
-    .readStream()
-    .format("socket")
-    .option("host", "localhost")
-    .option("port", 9999)
-    .load();
+  .readStream()
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .load();
 
 socketDF.isStreaming(); // Returns True for DataFrames that have streaming sources
 
@@ -474,10 +474,10 @@ socketDF.printSchema();
 // Read all the csv files written atomically in a directory
 StructType userSchema = new StructType().add("name", "string").add("age", "integer");
 Dataset[Row] csvDF = spark
-    .readStream()
-    .option("sep", ";")
-    .schema(userSchema)      // Specify schema of the csv files
-    .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
+  .readStream()
+  .option("sep", ";")
+  .schema(userSchema)      // Specify schema of the csv files
+  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 </div>
@@ -549,12 +549,12 @@ import org.apache.spark.sql.expressions.javalang.typed;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
 
 public class DeviceData {
-    private String device;
-    private String type;
-    private Double signal;
-    private java.sql.Date time;
-    ...
-    // Getter and setter methods for each field
+  private String device;
+  private String type;
+  private Double signal;
+  private java.sql.Date time;
+  ...
+  // Getter and setter methods for each field
 }
 
 Dataset<Row> df = ...; // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
@@ -828,33 +828,33 @@ val noAggDF = deviceDataDf.select("device").where("signal > 10")
 
 // Print new data to console
 noAggDF
-    .writeStream
-    .format("console")
-    .start()
+  .writeStream
+  .format("console")
+  .start()
 
 // Write new data to Parquet files
 noAggDF
-    .writeStream
-    .parquet("path/to/destination/directory")
-    .start()
+  .writeStream
+  .parquet("path/to/destination/directory")
+  .start()
 
 // ========== DF with aggregation ==========
 val aggDF = df.groupBy(“device”).count()
 
 // Print updated aggregations to console
 aggDF
-    .writeStream
-    .outputMode("complete")
-    .format("console")
-    .start()
+  .writeStream
+  .outputMode("complete")
+  .format("console")
+  .start()
 
 // Have all the aggregates in an in-memory table
 aggDF
-    .writeStream
-    .queryName("aggregates")    // this query name will be the table name
-    .outputMode("complete")
-    .format("memory")
-    .start()
+  .writeStream
+  .queryName("aggregates")    // this query name will be the table name
+  .outputMode("complete")
+  .format("memory")
+  .start()
 
 spark.sql("select * from aggregates").show()   // interactively query in-memory table
 {% endhighlight %}
@@ -868,33 +868,33 @@ Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
 
 // Print new data to console
 noAggDF
-    .writeStream()
-    .format("console")
-    .start();
+  .writeStream()
+  .format("console")
+  .start();
 
 // Write new data to Parquet files
 noAggDF
-    .writeStream()
-    .parquet("path/to/destination/directory")
-    .start();
+  .writeStream()
+  .parquet("path/to/destination/directory")
+  .start();
 
 // ========== DF with aggregation ==========
 Dataset<Row> aggDF = df.groupBy(“device”).count();
 
 // Print updated aggregations to console
 aggDF
-    .writeStream()
-    .outputMode("complete")
-    .format("console")
-    .start();
+  .writeStream()
+  .outputMode("complete")
+  .format("console")
+  .start();
 
 // Have all the aggregates in an in-memory table
 aggDF
-    .writeStream()
-    .queryName("aggregates")    // this query name will be the table name
-    .outputMode("complete")
-    .format("memory")
-    .start();
+  .writeStream()
+  .queryName("aggregates")    // this query name will be the table name
+  .outputMode("complete")
+  .format("memory")
+  .start();
 
 spark.sql("select * from aggregates").show();   // interactively query in-memory table
 {% endhighlight %}
@@ -908,33 +908,33 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
 
 # Print new data to console
 noAggDF\
-  .writeStream()\
-  .format("console")\
-  .start()
+    .writeStream()\
+    .format("console")\
+    .start()
 
 # Write new data to Parquet files
 noAggDF\
-  .writeStream()\
-  .parquet("path/to/destination/directory")\
-  .start()
+    .writeStream()\
+    .parquet("path/to/destination/directory")\
+    .start()
 
 # ========== DF with aggregation ==========
 aggDF = df.groupBy(“device”).count()
 
 # Print updated aggregations to console
 aggDF\
-  .writeStream()\
-  .outputMode("complete")\
-  .format("console")\
-  .start()
+    .writeStream()\
+    .outputMode("complete")\
+    .format("console")\
+    .start()
 
 # Have all the aggregates in an in memory table. The query name will be the table name
 aggDF\
-  .writeStream()\
-  .queryName("aggregates")\
-  .outputMode("complete")\
-  .format("memory")\
-  .start()
+    .writeStream()\
+    .queryName("aggregates")\
+    .outputMode("complete")\
+    .format("memory")\
+    .start()
 
 spark.sql("select * from aggregates").show()   # interactively query in-memory table
 {% endhighlight %}
@@ -1093,11 +1093,11 @@ In case of a failure or intentional shutdown, you can recover the previous progr
 {% highlight scala %}
 aggDF
-    .writeStream
-    .outputMode("complete")
-    .option(“checkpointLocation”, “path/to/HDFS/dir”)
-    .format("memory")
-    .start()
+  .writeStream
+  .outputMode("complete")
+  .option(“checkpointLocation”, “path/to/HDFS/dir”)
+  .format("memory")
+  .start()
 {% endhighlight %}
 
 </div>
@@ -1105,11 +1105,11 @@ aggDF
 {% highlight java %}
 aggDF
-    .writeStream()
-    .outputMode("complete")
-    .option(“checkpointLocation”, “path/to/HDFS/dir”)
-    .format("memory")
-    .start();
+  .writeStream()
+  .outputMode("complete")
+  .option(“checkpointLocation”, “path/to/HDFS/dir”)
+  .format("memory")
+  .start();
 {% endhighlight %}
 
 </div>
@@ -1117,11 +1117,11 @@ aggDF
 {% highlight python %}
 aggDF\
-  .writeStream()\
-  .outputMode("complete")\
-  .option(“checkpointLocation”, “path/to/HDFS/dir”)\
-  .format("memory")\
-  .start()
+    .writeStream()\
+    .outputMode("complete")\
+    .option(“checkpointLocation”, “path/to/HDFS/dir”)\
+    .format("memory")\
+    .start()
 {% endhighlight %}
 
 </div>
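
For reference, with the hunks above applied, the guide's Java word count reads end-to-end roughly as below. This is a sketch that continues from the `lines` Dataset shown earlier; the console sink, the `StreamingQuery` handle, and the `awaitTermination()` call are taken from the surrounding guide rather than from the hunks shown here.

```java
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// Split the lines into words; the explicit encoder yields a typed Dataset<String>
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap(
    new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING());

// Generate running word count; groupBy().count() returns an untyped Dataset<Row>
Dataset<Row> wordCounts = words.groupBy("value").count();

// Start the query, printing the complete set of counts to the console on each trigger
StreamingQuery query = wordCounts
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
```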