aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-03-11 11:19:51 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-03-11 11:19:51 -0700
commit51a79a770a8356bd0ed244af5ca7f1c44c9437d2 (patch)
treeaadbebaf509b00015a59b36521511cd0d0d61080
parent55c4831d68c8326380086b5540244f984ea9ec27 (diff)
downloadspark-51a79a770a8356bd0ed244af5ca7f1c44c9437d2.tar.gz
spark-51a79a770a8356bd0ed244af5ca7f1c44c9437d2.tar.bz2
spark-51a79a770a8356bd0ed244af5ca7f1c44c9437d2.zip
[SPARK-6274][Streaming][Examples] Added examples streaming + sql examples.
Added Scala, Java and Python streaming examples showing DataFrame and SQL operations within streaming. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #4975 from tdas/streaming-sql-examples and squashes the following commits: 705cba1 [Tathagata Das] Fixed python lint error 75a3fad [Tathagata Das] Fixed python lint error 5fbf789 [Tathagata Das] Removed empty lines at the end 874b943 [Tathagata Das] Added examples streaming + sql examples.
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java31
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java122
-rw-r--r--examples/src/main/python/streaming/sql_network_wordcount.py82
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala101
4 files changed, 336 insertions, 0 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java
new file mode 100644
index 0000000000..e63697a79f
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecord.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+/** Java Bean class to be used with the example JavaSqlNetworkWordCount. */
+public class JavaRecord implements java.io.Serializable {
+ private String word;
+
+ public String getWord() {
+ return word;
+ }
+
+ public void setWord(String word) {
+ this.word = word;
+ }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
new file mode 100644
index 0000000000..46562ddbbc
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.api.java.StorageLevels;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+/**
+ * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
+ * network every second.
+ *
+ * Usage: JavaSqlNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaSqlNetworkWordCount localhost 9999`
+ */
+
+public final class JavaSqlNetworkWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
+ System.exit(1);
+ }
+
+ StreamingExamples.setStreamingLogLevels();
+
+ // Create the context with a 1 second batch size
+ SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount");
+ JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+
+ // Create a JavaReceiverInputDStream on target ip:port and count the
+ // words in input stream of \n delimited text (eg. generated by 'nc')
+ // Note that no duplication in storage level only for running locally.
+ // Replication necessary in distributed scenario for fault tolerance.
+ JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
+ args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(SPACE.split(x));
+ }
+ });
+
+ // Convert RDDs of the words DStream to DataFrame and run SQL query
+ words.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
+ @Override
+ public Void call(JavaRDD<String> rdd, Time time) {
+ SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
+
+ // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
+ JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
+ public JavaRecord call(String word) {
+ JavaRecord record = new JavaRecord();
+ record.setWord(word);
+ return record;
+ }
+ });
+ DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRecord.class);
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words");
+
+ // Do word count on table using SQL and print it
+ DataFrame wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word");
+ System.out.println("========= " + time + "=========");
+ wordCountsDataFrame.show();
+ return null;
+ }
+ });
+
+ ssc.start();
+ ssc.awaitTermination();
+ }
+}
+
+/** Lazily instantiated singleton instance of SQLContext */
+class JavaSQLContextSingleton {
+ static private transient SQLContext instance = null;
+ static public SQLContext getInstance(SparkContext sparkContext) {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext);
+ }
+ return instance;
+ }
+}
diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py
new file mode 100644
index 0000000000..f89bc562d8
--- /dev/null
+++ b/examples/src/main/python/streaming/sql_network_wordcount.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
+ network every second.
+
+ Usage: sql_network_wordcount.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+ `$ nc -lk 9999`
+ and then run the example
+ `$ bin/spark-submit examples/src/main/python/streaming/sql_network_wordcount.py localhost 9999`
+"""
+
+import os
+import sys
+
+from pyspark import SparkContext
+from pyspark.streaming import StreamingContext
+from pyspark.sql import SQLContext, Row
+
+
+def getSqlContextInstance(sparkContext):
+ if ('sqlContextSingletonInstance' not in globals()):
+ globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
+ return globals()['sqlContextSingletonInstance']
+
+
+if __name__ == "__main__":
+ if len(sys.argv) != 3:
+ print >> sys.stderr, "Usage: sql_network_wordcount.py <hostname> <port> "
+ exit(-1)
+ host, port = sys.argv[1:]
+ sc = SparkContext(appName="PythonSqlNetworkWordCount")
+ ssc = StreamingContext(sc, 1)
+
+ # Create a socket stream on target ip:port and count the
+ # words in input stream of \n delimited text (eg. generated by 'nc')
+ lines = ssc.socketTextStream(host, int(port))
+ words = lines.flatMap(lambda line: line.split(" "))
+
+ # Convert RDDs of the words DStream to DataFrame and run SQL query
+ def process(time, rdd):
+ print "========= %s =========" % str(time)
+
+ try:
+ # Get the singleton instance of SQLContext
+ sqlContext = getSqlContextInstance(rdd.context)
+
+ # Convert RDD[String] to RDD[Row] to DataFrame
+ rowRdd = rdd.map(lambda w: Row(word=w))
+ wordsDataFrame = sqlContext.createDataFrame(rowRdd)
+
+ # Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ # Do word count on table using SQL and print it
+ wordCountsDataFrame = \
+ sqlContext.sql("select word, count(*) as total from words group by word")
+ wordCountsDataFrame.show()
+ except:
+ pass
+
+ words.foreachRDD(process)
+ ssc.start()
+ ssc.awaitTermination()
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
new file mode 100644
index 0000000000..5a6b9216a3
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
+import org.apache.spark.util.IntParam
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
+ * network every second.
+ *
+ * Usage: SqlNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
+ */
+
+object SqlNetworkWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: NetworkWordCount <hostname> <port>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Create the context with a 2 second batch size
+ val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
+
+ // Create a socket stream on target ip:port and count the
+ // words in input stream of \n delimited text (eg. generated by 'nc')
+ // Note that no duplication in storage level only for running locally.
+ // Replication necessary in distributed scenario for fault tolerance.
+ val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
+ val words = lines.flatMap(_.split(" "))
+
+ // Convert RDDs of the words DStream to DataFrame and run SQL query
+ words.foreachRDD((rdd: RDD[String], time: Time) => {
+ // Get the singleton instance of SQLContext
+ val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+ import sqlContext.implicits._
+
+ // Convert RDD[String] to RDD[case class] to DataFrame
+ val wordsDataFrame = rdd.map(w => Record(w)).toDF()
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ // Do word count on table using SQL and print it
+ val wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word")
+ println(s"========= $time =========")
+ wordCountsDataFrame.show()
+ })
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
+
+/** Case class for converting RDD to DataFrame */
+case class Record(word: String)
+
+
+/** Lazily instantiated singleton instance of SQLContext */
+object SQLContextSingleton {
+
+ @transient private var instance: SQLContext = _
+
+ def getInstance(sparkContext: SparkContext): SQLContext = {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext)
+ }
+ instance
+ }
+}