aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/java
diff options
context:
space:
mode:
authorJames Thomas <jamesjoethomas@gmail.com>2016-06-28 16:12:48 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-06-28 16:12:48 -0700
commit3554713a163c58ca176ffde87d2c6e4a91bacb50 (patch)
tree3e8e18f14eb0f1c5d00a535b17360414b1cf4fb3 /examples/src/main/java
parent8a977b065418f07d2bf4fe1607a5534c32d04c47 (diff)
downloadspark-3554713a163c58ca176ffde87d2c6e4a91bacb50.tar.gz
spark-3554713a163c58ca176ffde87d2c6e4a91bacb50.tar.bz2
spark-3554713a163c58ca176ffde87d2c6e4a91bacb50.zip
[SPARK-16114][SQL] structured streaming network word count examples
## What changes were proposed in this pull request? Network word count example for structured streaming ## How was this patch tested? Run locally Author: James Thomas <jamesjoethomas@gmail.com> Author: James Thomas <jamesthomas@Jamess-MacBook-Pro.local> Closes #13816 from jjthomas/master.
Diffstat (limited to 'examples/src/main/java')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java82
1 files changed, 82 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
new file mode 100644
index 0000000000..a2cf938954
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.streaming.StreamingQuery;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
+ * Usage: JavaStructuredNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCount
+ * localhost 9999`
+ */
+public final class JavaStructuredNetworkWordCount {
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
+ System.exit(1);
+ }
+
+ String host = args[0];
+ int port = Integer.parseInt(args[1]);
+
+ SparkSession spark = SparkSession
+ .builder()
+ .appName("JavaStructuredNetworkWordCount")
+ .getOrCreate();
+
+ // Create DataFrame representing the stream of input lines from connection to host:port
+ Dataset<String> lines = spark
+ .readStream()
+ .format("socket")
+ .option("host", host)
+ .option("port", port)
+ .load().as(Encoders.STRING());
+
+ // Split the lines into words
+ Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
+ }
+ }, Encoders.STRING());
+
+ // Generate running word count
+ Dataset<Row> wordCounts = words.groupBy("value").count();
+
+ // Start running the query that prints the running counts to the console
+ StreamingQuery query = wordCounts.writeStream()
+ .outputMode("complete")
+ .format("console")
+ .start();
+
+ query.awaitTermination();
+ }
+}