author     hyukjinkwon <gurwls223@gmail.com>  2016-08-11 11:31:52 +0100
committer  Sean Owen <sowen@cloudera.com>     2016-08-11 11:31:52 +0100
commit     7186e8c3180b7f38250cf2f2de791472bf5325a5 (patch)
tree       1d1f430f81102ba889bf93566d7173e508134ec7 /docs
parent     a45fefd17ec4a499b988a2f9931ce397918d3bef (diff)
[SPARK-16886][EXAMPLES][DOC] Fix some examples to be consistent and indentation in documentation
## What changes were proposed in this pull request?

Originally this PR was based on #14491, but I realised that fixing the examples is more sensible than fixing the comments. This PR fixes three things:

- Fix two wrong examples in `structured-streaming-programming-guide.md`: loading via `read.load(..)` without `as` yields a `Dataset<Row>`, not a `Dataset<String>`, in Java.
- Fix indentation across `structured-streaming-programming-guide.md`: Python uses 4 spaces while Scala and Java use 2, and the examples were inconsistent.
- Fix `StructuredNetworkWordCountWindowed` and `StructuredNetworkWordCount` in Java and Scala to initially load a `DataFrame` / `Dataset<Row>`, to be consistent with the comments and some examples in `structured-streaming-programming-guide.md`, and to match the Scala and Java versions to the Python one (the Python version loads the input as a `DataFrame` initially).

## How was this patch tested?

N/A

Closes https://github.com/apache/spark/pull/14491

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Ganesh Chand <ganeshchand@Ganeshs-MacBook-Pro-2.local>

Closes #14564 from HyukjinKwon/SPARK-16886.
Diffstat (limited to 'docs')
-rw-r--r--  docs/structured-streaming-programming-guide.md | 202
1 file changed, 101 insertions(+), 101 deletions(-)
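For context on the first fix: in Java, the streaming `load()` call returns an untyped `Dataset<Row>` (a DataFrame), and an explicit `.as(Encoders.STRING())` is needed before treating the lines as strings. Below is a minimal sketch of the corrected word-count pattern, assembled from the snippets in the guide; it is illustrative only, not the full `JavaStructuredNetworkWordCount` example.

{% highlight java %}
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

// load() returns an untyped streaming DataFrame, i.e. Dataset<Row>
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// An explicit .as(Encoders.STRING()) is needed to obtain a Dataset<String>
// before splitting each line into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap(
    new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING());

// Generate the running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
{% endhighlight %}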
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 8c14c3d220..99d50e51e2 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -46,9 +46,9 @@ import java.util.Arrays;
import java.util.Iterator;
SparkSession spark = SparkSession
- .builder()
- .appName("JavaStructuredNetworkWordCount")
- .getOrCreate();
+ .builder()
+ .appName("JavaStructuredNetworkWordCount")
+ .getOrCreate();
{% endhighlight %}
</div>
@@ -95,7 +95,7 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
{% highlight java %}
// Create DataFrame representing the stream of input lines from connection to localhost:9999
-Dataset<String> lines = spark
+Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
@@ -104,14 +104,14 @@ Dataset<String> lines = spark
// Split the lines into words
Dataset<String> words = lines
- .as(Encoders.STRING())
- .flatMap(
- new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- }, Encoders.STRING());
+ .as(Encoders.STRING())
+ .flatMap(
+ new FlatMapFunction<String, String>() {
+ @Override
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
+ }
+ }, Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -125,11 +125,11 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
{% highlight python %}
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
- .readStream\
- .format('socket')\
- .option('host', 'localhost')\
- .option('port', 9999)\
- .load()
+ .readStream\
+ .format('socket')\
+ .option('host', 'localhost')\
+ .option('port', 9999)\
+ .load()
# Split the lines into words
words = lines.select(
@@ -434,11 +434,11 @@ val spark: SparkSession = ...
// Read text from socket
val socketDF = spark
- .readStream
- .format("socket")
- .option("host", "localhost")
- .option("port", 9999)
- .load()
+ .readStream
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", 9999)
+ .load()
socketDF.isStreaming // Returns True for DataFrames that have streaming sources
@@ -447,10 +447,10 @@ socketDF.printSchema
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
- .readStream
- .option("sep", ";")
- .schema(userSchema) // Specify schema of the csv files
- .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
+ .readStream
+ .option("sep", ";")
+ .schema(userSchema) // Specify schema of the csv files
+ .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
@@ -461,11 +461,11 @@ SparkSession spark = ...
// Read text from socket
Dataset<Row> socketDF = spark
- .readStream()
- .format("socket")
- .option("host", "localhost")
- .option("port", 9999)
- .load();
+ .readStream()
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", 9999)
+ .load();
socketDF.isStreaming(); // Returns True for DataFrames that have streaming sources
@@ -474,10 +474,10 @@ socketDF.printSchema();
// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
- .readStream()
- .option("sep", ";")
- .schema(userSchema) // Specify schema of the csv files
- .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
+ .readStream()
+ .option("sep", ";")
+ .schema(userSchema) // Specify schema of the csv files
+ .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
@@ -549,12 +549,12 @@ import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
public class DeviceData {
- private String device;
- private String type;
- private Double signal;
- private java.sql.Date time;
- ...
- // Getter and setter methods for each field
+ private String device;
+ private String type;
+ private Double signal;
+ private java.sql.Date time;
+ ...
+ // Getter and setter methods for each field
}
Dataset<Row> df = ...; // streaming DataFrame with IoT device data, with schema { device: string, type: string, signal: double, time: DateType }
@@ -828,33 +828,33 @@ val noAggDF = deviceDataDf.select("device").where("signal > 10")
// Print new data to console
noAggDF
- .writeStream
- .format("console")
- .start()
+ .writeStream
+ .format("console")
+ .start()
// Write new data to Parquet files
noAggDF
- .writeStream
- .parquet("path/to/destination/directory")
- .start()
+ .writeStream
+ .parquet("path/to/destination/directory")
+ .start()
// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()
// Print updated aggregations to console
aggDF
- .writeStream
- .outputMode("complete")
- .format("console")
- .start()
+ .writeStream
+ .outputMode("complete")
+ .format("console")
+ .start()
// Have all the aggregates in an in-memory table
aggDF
- .writeStream
- .queryName("aggregates") // this query name will be the table name
- .outputMode("complete")
- .format("memory")
- .start()
+ .writeStream
+ .queryName("aggregates") // this query name will be the table name
+ .outputMode("complete")
+ .format("memory")
+ .start()
spark.sql("select * from aggregates").show() // interactively query in-memory table
{% endhighlight %}
@@ -868,33 +868,33 @@ Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
// Print new data to console
noAggDF
- .writeStream()
- .format("console")
- .start();
+ .writeStream()
+ .format("console")
+ .start();
// Write new data to Parquet files
noAggDF
- .writeStream()
- .parquet("path/to/destination/directory")
- .start();
+ .writeStream()
+ .parquet("path/to/destination/directory")
+ .start();
// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();
// Print updated aggregations to console
aggDF
- .writeStream()
- .outputMode("complete")
- .format("console")
- .start();
+ .writeStream()
+ .outputMode("complete")
+ .format("console")
+ .start();
// Have all the aggregates in an in-memory table
aggDF
- .writeStream()
- .queryName("aggregates") // this query name will be the table name
- .outputMode("complete")
- .format("memory")
- .start();
+ .writeStream()
+ .queryName("aggregates") // this query name will be the table name
+ .outputMode("complete")
+ .format("memory")
+ .start();
spark.sql("select * from aggregates").show(); // interactively query in-memory table
{% endhighlight %}
@@ -908,33 +908,33 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
noAggDF\
- .writeStream\
- .format("console")\
- .start()
+ .writeStream\
+ .format("console")\
+ .start()
# Write new data to Parquet files
noAggDF\
- .writeStream\
- .parquet("path/to/destination/directory")\
- .start()
+ .writeStream\
+ .parquet("path/to/destination/directory")\
+ .start()
# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF\
- .writeStream\
- .outputMode("complete")\
- .format("console")\
- .start()
+ .writeStream\
+ .outputMode("complete")\
+ .format("console")\
+ .start()
# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF\
- .writeStream\
- .queryName("aggregates")\
- .outputMode("complete")\
- .format("memory")\
- .start()
+ .writeStream\
+ .queryName("aggregates")\
+ .outputMode("complete")\
+ .format("memory")\
+ .start()
spark.sql("select * from aggregates").show() # interactively query in-memory table
{% endhighlight %}
@@ -1093,11 +1093,11 @@ In case of a failure or intentional shutdown, you can recover the previous progr
{% highlight scala %}
aggDF
- .writeStream
- .outputMode("complete")
- .option("checkpointLocation", "path/to/HDFS/dir")
- .format("memory")
- .start()
+ .writeStream
+ .outputMode("complete")
+ .option("checkpointLocation", "path/to/HDFS/dir")
+ .format("memory")
+ .start()
{% endhighlight %}
</div>
@@ -1105,11 +1105,11 @@ aggDF
{% highlight java %}
aggDF
- .writeStream()
- .outputMode("complete")
- .option("checkpointLocation", "path/to/HDFS/dir")
- .format("memory")
- .start();
+ .writeStream()
+ .outputMode("complete")
+ .option("checkpointLocation", "path/to/HDFS/dir")
+ .format("memory")
+ .start();
{% endhighlight %}
</div>
@@ -1117,11 +1117,11 @@ aggDF
{% highlight python %}
aggDF\
- .writeStream\
- .outputMode("complete")\
- .option("checkpointLocation", "path/to/HDFS/dir")\
- .format("memory")\
- .start()
+ .writeStream\
+ .outputMode("complete")\
+ .option("checkpointLocation", "path/to/HDFS/dir")\
+ .format("memory")\
+ .start()
{% endhighlight %}
</div>