Diffstat (limited to 'examples')
 examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java         |  6 +++---
 examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java | 30 ++++++++++++++++--------------
 examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala           |  4 ++--
 examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala   |  4 ++--
 4 files changed, 23 insertions(+), 21 deletions(-)
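
In all four examples the .as(...) conversion moves off the load() chain: lines stays an untyped DataFrame (Dataset<Row>), and the typed view is created only at the flatMap call that actually needs it.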
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 346d2182c7..c913ee0658 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -53,15 +53,15 @@ public final class JavaStructuredNetworkWordCount {
       .getOrCreate();

     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<String> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as(Encoders.STRING());
+      .load();

     // Split the lines into words
-    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+    Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterator<String> call(String x) {
         return Arrays.asList(x.split(" ")).iterator();
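
For context: after this change, load() returns an untyped streaming DataFrame and the encoder is applied only where the typed API is needed. A minimal self-contained sketch of the resulting pattern, assuming the Spark 2.x structured streaming API (class name, host, and port are illustrative, not from this diff); the lambda with an explicit FlatMapFunction cast is an equivalent alternative to the diff's anonymous class:

import java.util.Arrays;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class NetworkWordCountSketch {  // hypothetical class name
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession
      .builder()
      .appName("NetworkWordCountSketch")
      .getOrCreate();

    // load() now yields a Dataset<Row> (a DataFrame); no encoder on the read path
    Dataset<Row> lines = spark
      .readStream()
      .format("socket")
      .option("host", "localhost")  // illustrative
      .option("port", 9999)         // illustrative
      .load();

    // The typed conversion happens at the point of use. The cast selects the
    // Java FlatMapFunction overload of flatMap over the Scala Function1 one.
    Dataset<String> words = lines
      .as(Encoders.STRING())
      .flatMap(
        (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
        Encoders.STRING());

    Dataset<Row> wordCounts = words.groupBy("value").count();
  }
}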
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 557d36cff3..172d053c29 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -75,28 +75,30 @@ public final class JavaStructuredNetworkWordCountWindowed {
       .getOrCreate();

     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<Tuple2<String, Timestamp>> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+      .load();

     // Split the lines into words, retaining timestamps
-    Dataset<Row> words = lines.flatMap(
-        new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
-          @Override
-          public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
-            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
-            for (String word : t._1.split(" ")) {
-              result.add(new Tuple2<>(word, t._2));
+    Dataset<Row> words = lines
+        .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
+        .flatMap(
+          new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+            @Override
+            public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+              List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+              for (String word : t._1.split(" ")) {
+                result.add(new Tuple2<>(word, t._2));
+              }
+              return result.iterator();
             }
-            return result.iterator();
-          }
-        },
-        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
-    ).toDF("word", "timestamp");
+          },
+          Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+        ).toDF("word", "timestamp");

     // Group the data by window and word and compute the count of each group
     Dataset<Row> windowedCounts = words.groupBy(
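
The windowed variant follows the same shape, with a tuple encoder because includeTimestamp makes the socket source emit (value, timestamp) pairs. A sketch of the reworked section, again assuming Spark 2.x; the lambda replaces the diff's anonymous class, and the window/slide durations are illustrative placeholders, not values from this diff:

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import scala.Tuple2;

public final class WindowedWordCountSketch {  // hypothetical class name
  // lines: the untyped streaming DataFrame produced by load() above
  static Dataset<Row> windowedCounts(Dataset<Row> lines) {
    // Typed view of the (value, timestamp) rows, split into (word, timestamp) pairs
    Dataset<Row> words = lines
        .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
        .flatMap(
          (FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>) t -> {
            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
            for (String word : t._1.split(" ")) {
              result.add(new Tuple2<>(word, t._2));
            }
            return result.iterator();
          },
          Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
        .toDF("word", "timestamp");

    // Count words per sliding window; "10 minutes"/"5 minutes" are placeholders
    return words.groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word")
    ).count();
  }
}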
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index 364bff227b..f0756c4e18 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -56,10 +56,10 @@ object StructuredNetworkWordCount {
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as[String]
+      .load()

     // Split the lines into words
-    val words = lines.flatMap(_.split(" "))
+    val words = lines.as[String].flatMap(_.split(" "))

     // Generate running word count
     val wordCounts = words.groupBy("value").count()
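
The Scala files make the same move with less ceremony: the encoder that Java supplies explicitly through Encoders.STRING() is resolved implicitly in Scala (via the spark.implicits._ import these examples rely on), so only the position of as[String] changes.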
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
index 333b0a9d24..b4dad21dd7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -78,10 +78,10 @@ object StructuredNetworkWordCountWindowed {
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as[(String, Timestamp)]
+      .load()

     // Split the lines into words, retaining timestamps
-    val words = lines.flatMap(line =>
+    val words = lines.as[(String, Timestamp)].flatMap(line =>
       line._1.split(" ").map(word => (word, line._2))
     ).toDF("word", "timestamp")
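
Nothing downstream of the word-splitting step changes in any of the four examples: the running counts still feed the streaming query defined later in each file (a console sink in these examples). For completeness, a minimal sketch of how such a query is started in the Java API, assuming Spark 2.x (the wrapper method is illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

public final class QueryStartSketch {  // hypothetical class name
  static void run(Dataset<Row> wordCounts) throws Exception {
    // Start a streaming query that prints the running counts to the console;
    // "complete" mode re-emits the full aggregate on every trigger.
    StreamingQuery query = wordCounts.writeStream()
        .outputMode("complete")
        .format("console")
        .start();

    query.awaitTermination();  // block until the query stops or fails
  }
}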