diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2016-08-11 11:31:52 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-08-11 11:31:52 +0100 |
commit | 7186e8c3180b7f38250cf2f2de791472bf5325a5 (patch) | |
tree | 1d1f430f81102ba889bf93566d7173e508134ec7 /examples/src | |
parent | a45fefd17ec4a499b988a2f9931ce397918d3bef (diff) | |
download | spark-7186e8c3180b7f38250cf2f2de791472bf5325a5.tar.gz spark-7186e8c3180b7f38250cf2f2de791472bf5325a5.tar.bz2 spark-7186e8c3180b7f38250cf2f2de791472bf5325a5.zip |
[SPARK-16886][EXAMPLES][DOC] Fix some examples to be consistent and indentation in documentation
## What changes were proposed in this pull request?
Originally this PR was based on #14491 but I realised that fixing the examples is more sensible than fixing the comments.
This PR fixes three things below:
- Fix two wrong examples in `structured-streaming-programming-guide.md`. Loading via `read.load(..)` without `as` will be `Dataset<Row>` not `Dataset<String>` in Java.
- Fix indentation across `structured-streaming-programming-guide.md`. Python uses 4-space indentation while Scala and Java use 2-space indentation. These are inconsistent across the examples.
- Fix `StructuredNetworkWordCountWindowed` and `StructuredNetworkWordCount` in Java and Scala to initially load `DataFrame` and `Dataset<Row>` to be consistent with the comments and some examples in `structured-streaming-programming-guide.md` and to match Scala and Java to Python one (Python one loads it as `DataFrame` initially).
## How was this patch tested?
N/A
Closes https://github.com/apache/spark/pull/14491
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Ganesh Chand <ganeshchand@Ganeshs-MacBook-Pro-2.local>
Closes #14564 from HyukjinKwon/SPARK-16886.
Diffstat (limited to 'examples/src')
4 files changed, 23 insertions, 21 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java index 346d2182c7..c913ee0658 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java @@ -53,15 +53,15 @@ public final class JavaStructuredNetworkWordCount { .getOrCreate(); // Create DataFrame representing the stream of input lines from connection to host:port - Dataset<String> lines = spark + Dataset<Row> lines = spark .readStream() .format("socket") .option("host", host) .option("port", port) - .load().as(Encoders.STRING()); + .load(); // Split the lines into words - Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String x) { return Arrays.asList(x.split(" ")).iterator(); diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java index 557d36cff3..172d053c29 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java @@ -75,28 +75,30 @@ public final class JavaStructuredNetworkWordCountWindowed { .getOrCreate(); // Create DataFrame representing the stream of input lines from connection to host:port - Dataset<Tuple2<String, Timestamp>> lines = spark + Dataset<Row> lines = spark .readStream() .format("socket") .option("host", host) .option("port", port) .option("includeTimestamp", true) - 
.load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())); + .load(); // Split the lines into words, retaining timestamps - Dataset<Row> words = lines.flatMap( - new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() { - @Override - public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) { - List<Tuple2<String, Timestamp>> result = new ArrayList<>(); - for (String word : t._1.split(" ")) { - result.add(new Tuple2<>(word, t._2)); + Dataset<Row> words = lines + .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) + .flatMap( + new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() { + @Override + public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) { + List<Tuple2<String, Timestamp>> result = new ArrayList<>(); + for (String word : t._1.split(" ")) { + result.add(new Tuple2<>(word, t._2)); + } + return result.iterator(); } - return result.iterator(); - } - }, - Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()) - ).toDF("word", "timestamp"); + }, + Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()) + ).toDF("word", "timestamp"); // Group the data by window and word and compute the count of each group Dataset<Row> windowedCounts = words.groupBy( diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala index 364bff227b..f0756c4e18 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala @@ -56,10 +56,10 @@ object StructuredNetworkWordCount { .format("socket") .option("host", host) .option("port", port) - .load().as[String] + .load() // Split the lines into words - val words = lines.flatMap(_.split(" ")) + val words = lines.as[String].flatMap(_.split(" 
")) // Generate running word count val wordCounts = words.groupBy("value").count() diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala index 333b0a9d24..b4dad21dd7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala @@ -78,10 +78,10 @@ object StructuredNetworkWordCountWindowed { .option("host", host) .option("port", port) .option("includeTimestamp", true) - .load().as[(String, Timestamp)] + .load() // Split the lines into words, retaining timestamps - val words = lines.flatMap(line => + val words = lines.as[(String, Timestamp)].flatMap(line => line._1.split(" ").map(word => (word, line._2)) ).toDF("word", "timestamp") |