diff options
Diffstat (limited to 'examples')
4 files changed, 23 insertions, 21 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 346d2182c7..c913ee0658 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -53,15 +53,15 @@ public final class JavaStructuredNetworkWordCount {
       .getOrCreate();
 
     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<String> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as(Encoders.STRING());
+      .load();
 
     // Split the lines into words
-    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+    Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterator<String> call(String x) {
         return Arrays.asList(x.split(" ")).iterator();
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 557d36cff3..172d053c29 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -75,28 +75,30 @@ public final class JavaStructuredNetworkWordCountWindowed {
       .getOrCreate();
 
     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<Tuple2<String, Timestamp>> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+      .load();
 
     // Split the lines into words, retaining timestamps
-    Dataset<Row> words = lines.flatMap(
-      new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
-        @Override
-        public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
-          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
-          for (String word : t._1.split(" ")) {
-            result.add(new Tuple2<>(word, t._2));
+    Dataset<Row> words = lines
+      .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
+      .flatMap(
+        new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+          @Override
+          public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+            for (String word : t._1.split(" ")) {
+              result.add(new Tuple2<>(word, t._2));
+            }
+            return result.iterator();
          }
-          return result.iterator();
-        }
-      },
-      Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
-    ).toDF("word", "timestamp");
+        },
+        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+      ).toDF("word", "timestamp");
 
     // Group the data by window and word and compute the count of each group
     Dataset<Row> windowedCounts = words.groupBy(
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index 364bff227b..f0756c4e18 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -56,10 +56,10 @@ object StructuredNetworkWordCount {
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as[String]
+      .load()
 
     // Split the lines into words
-    val words = lines.flatMap(_.split(" "))
+    val words = lines.as[String].flatMap(_.split(" "))
 
     // Generate running word count
     val wordCounts = words.groupBy("value").count()
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
index 333b0a9d24..b4dad21dd7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -78,10 +78,10 @@ object StructuredNetworkWordCountWindowed {
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as[(String, Timestamp)]
+      .load()
 
     // Split the lines into words, retaining timestamps
-    val words = lines.flatMap(line =>
+    val words = lines.as[(String, Timestamp)].flatMap(line =>
       line._1.split(" ").map(word => (word, line._2))
     ).toDF("word", "timestamp")