aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/java/org
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-03-11 11:19:51 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-03-11 11:19:51 -0700
commit51a79a770a8356bd0ed244af5ca7f1c44c9437d2 (patch)
treeaadbebaf509b00015a59b36521511cd0d0d61080 /examples/src/main/java/org
parent55c4831d68c8326380086b5540244f984ea9ec27 (diff)
downloadspark-51a79a770a8356bd0ed244af5ca7f1c44c9437d2.tar.gz
spark-51a79a770a8356bd0ed244af5ca7f1c44c9437d2.tar.bz2
spark-51a79a770a8356bd0ed244af5ca7f1c44c9437d2.zip
[SPARK-6274][Streaming][Examples] Added examples streaming + sql examples.
Added Scala, Java and Python streaming examples showing DataFrame and SQL operations within streaming. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #4975 from tdas/streaming-sql-examples and squashes the following commits: 705cba1 [Tathagata Das] Fixed python lint error 75a3fad [Tathagata Das] Fixed python lint error 5fbf789 [Tathagata Das] Removed empty lines at the end 874b943 [Tathagata Das] Added examples streaming + sql examples.
Diffstat (limited to 'examples/src/main/java/org')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java31
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java122
2 files changed, 153 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java
new file mode 100644
index 0000000000..e63697a79f
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+/** Java Bean class to be used with the example JavaSqlNetworkWordCount. */
+public class JavaRecord implements java.io.Serializable {
+ private String word;
+
+ public String getWord() {
+ return word;
+ }
+
+ public void setWord(String word) {
+ this.word = word;
+ }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
new file mode 100644
index 0000000000..46562ddbbc
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.api.java.StorageLevels;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
+ * network every second.
+ *
+ * Usage: JavaSqlNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999`
+ */
+
+public final class JavaSqlNetworkWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ // Create the context with a 1 second batch size
+ SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
+ JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+
+ // Create a JavaReceiverInputDStream on target ip:port and count the
+ // words in input stream of \n delimited text (eg. generated by 'nc')
+ // Note that no duplication in storage level only for running locally.
+ // Replication necessary in distributed scenario for fault tolerance.
+ JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
+ args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+
+ // Convert RDDs of the words DStream to DataFrame and run SQL query
+ words.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd, Time time) {
+ SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+ // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
+ JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
+ public JavaRecord call(String word) {
+ JavaRecord record = new JavaRecord();
+ record.setWord(word);
+ return record;
+ }
+ });
+ DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRecord.class);
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words");
+
+ // Do word count on table using SQL and print it
+ DataFrame wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word");
+ System.out.println("========= " + time + "=========");
+ wordCountsDataFrame.show();
+ return null;
+ }
+ });
+
+ ssc.start();
+ ssc.awaitTermination();
+ }
+}
+
+/** Lazily instantiated singleton instance of SQLContext */
+class JavaSQLContextSingleton {
+ static private transient SQLContext instance = null;
+ static public SQLContext getInstance(SparkContext sparkContext) {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext);
+ }
+ return instance;
+ }
+}