author     hyukjinkwon <gurwls223@gmail.com>          2016-08-11 11:31:52 +0100
committer  Sean Owen <sowen@cloudera.com>             2016-08-11 11:31:52 +0100
commit     7186e8c3180b7f38250cf2f2de791472bf5325a5 (patch)
tree       1d1f430f81102ba889bf93566d7173e508134ec7 /examples
parent     a45fefd17ec4a499b988a2f9931ce397918d3bef (diff)
[SPARK-16886][EXAMPLES][DOC] Fix inconsistent examples and indentation in documentation
## What changes were proposed in this pull request?

Originally this PR was based on #14491, but I realised that fixing the examples is more sensible than fixing the comments.

This PR fixes three things:

- Fix two wrong examples in `structured-streaming-programming-guide.md`. Loading via `read.load(..)` without `as` yields a `Dataset<Row>`, not a `Dataset<String>`, in Java.
- Fix indentation across `structured-streaming-programming-guide.md`. Python uses 4 spaces and Scala and Java use 2 spaces, but the examples are inconsistent about this.
- Fix `StructuredNetworkWordCountWindowed` and `StructuredNetworkWordCount` in Java and Scala to initially load a `DataFrame`/`Dataset<Row>`, so they are consistent with the comments and examples in `structured-streaming-programming-guide.md` and match the Python example (which initially loads the input as a `DataFrame`).

## How was this patch tested?

N/A

Closes https://github.com/apache/spark/pull/14491

Author: hyukjinkwon <gurwls223@gmail.com>
Author: Ganesh Chand <ganeshchand@Ganeshs-MacBook-Pro-2.local>

Closes #14564 from HyukjinKwon/SPARK-16886.
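To make the first point concrete: in the Java API, `load()` returns an untyped `Dataset<Row>` (a `DataFrame`), and a typed `Dataset<String>` only comes from an explicit `as(Encoders.STRING())`. Below is a minimal sketch of the corrected pattern; the app name and the host/port literals are illustrative, not part of this patch.

```java
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class TypedLoadSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession
      .builder()
      .appName("TypedLoadSketch")  // illustrative app name
      .getOrCreate();

    // load() alone gives an untyped Dataset<Row>, i.e. a DataFrame ...
    Dataset<Row> lines = spark
      .readStream()
      .format("socket")
      .option("host", "localhost")  // illustrative host/port
      .option("port", 9999)
      .load();

    // ... and the typed view is introduced explicitly, right where it is needed.
    Dataset<String> words = lines.as(Encoders.STRING())
      .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String x) {
          return Arrays.asList(x.split(" ")).iterator();
        }
      }, Encoders.STRING());

    words.groupBy("value").count();  // running word count, as in the examples
  }
}
```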
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java          |  6
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java  | 30
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala            |  4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala    |  4
4 files changed, 23 insertions(+), 21 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 346d2182c7..c913ee0658 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -53,15 +53,15 @@ public final class JavaStructuredNetworkWordCount {
       .getOrCreate();
 
     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<String> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as(Encoders.STRING());
+      .load();
 
     // Split the lines into words
-    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+    Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterator<String> call(String x) {
         return Arrays.asList(x.split(" ")).iterator();
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 557d36cff3..172d053c29 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -75,28 +75,30 @@ public final class JavaStructuredNetworkWordCountWindowed {
       .getOrCreate();
 
     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<Tuple2<String, Timestamp>> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+      .load();
 
     // Split the lines into words, retaining timestamps
-    Dataset<Row> words = lines.flatMap(
-      new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
-        @Override
-        public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
-          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
-          for (String word : t._1.split(" ")) {
-            result.add(new Tuple2<>(word, t._2));
+    Dataset<Row> words = lines
+      .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
+      .flatMap(
+        new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+          @Override
+          public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+            for (String word : t._1.split(" ")) {
+              result.add(new Tuple2<>(word, t._2));
+            }
+            return result.iterator();
           }
-          return result.iterator();
-        }
-      },
-      Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
-    ).toDF("word", "timestamp");
+        },
+        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+      ).toDF("word", "timestamp");
 
     // Group the data by window and word and compute the count of each group
     Dataset<Row> windowedCounts = words.groupBy(
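The hunk stops at the opening of the aggregation. The rest of that statement is outside this diff, but for context, the grouping in the full example combines an event-time window with the word column. A sketch under the assumption that `words` has the "word" and "timestamp" columns produced above; the duration strings are illustrative:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

public final class WindowedCountSketch {
  // Count words per sliding event-time window; assumes `words` has the
  // "word" and "timestamp" columns built by the flatMap above.
  static Dataset<Row> windowedCounts(Dataset<Row> words) {
    return words.groupBy(
      functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
      words.col("word")
    ).count();
  }
}
```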
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index 364bff227b..f0756c4e18 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -56,10 +56,10 @@ object StructuredNetworkWordCount {
.format("socket")
.option("host", host)
.option("port", port)
- .load().as[String]
+ .load()
// Split the lines into words
- val words = lines.flatMap(_.split(" "))
+ val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
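The Scala hunk ends at the running count. Also outside this diff, the examples then materialize the aggregate with a streaming query; a minimal Java sketch of that step (the console sink and complete output mode match the examples, while the wrapper method is illustrative):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

public final class StartQuerySketch {
  // Emit the full, updated table of counts to the console on every trigger.
  static void run(Dataset<Row> wordCounts) throws Exception {
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .start();

    query.awaitTermination();
  }
}
```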
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
index 333b0a9d24..b4dad21dd7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -78,10 +78,10 @@ object StructuredNetworkWordCountWindowed {
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
- .load().as[(String, Timestamp)]
+ .load()
// Split the lines into words, retaining timestamps
- val words = lines.flatMap(line =>
+ val words = lines.as[(String, Timestamp)].flatMap(line =>
line._1.split(" ").map(word => (word, line._2))
).toDF("word", "timestamp")