author     Tathagata Das <tathagata.das1565@gmail.com>  2015-03-11 11:19:51 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-03-11 11:19:51 -0700
commit     51a79a770a8356bd0ed244af5ca7f1c44c9437d2 (patch)
tree       aadbebaf509b00015a59b36521511cd0d0d61080 /examples/src/main/scala
parent     55c4831d68c8326380086b5540244f984ea9ec27 (diff)
[SPARK-6274][Streaming][Examples] Added streaming + SQL examples.
Added Scala, Java and Python streaming examples showing DataFrame and SQL operations within streaming.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #4975 from tdas/streaming-sql-examples and squashes the following commits:

705cba1 [Tathagata Das] Fixed python lint error
75a3fad [Tathagata Das] Fixed python lint error
5fbf789 [Tathagata Das] Removed empty lines at the end
874b943 [Tathagata Das] Added examples streaming + sql examples.
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala | 101
1 file changed, 101 insertions(+), 0 deletions(-)
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
new file mode 100644
index 0000000000..5a6b9216a3
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
+import org.apache.spark.util.IntParam
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.storage.StorageLevel
+
+/**
+ * Use DataFrames and SQL to count words in UTF8 encoded, '\n' delimited text received from the
+ * network every second.
+ *
+ * Usage: SqlNetworkWordCount <hostname> <port>
+ * <hostname> and <port> describe the TCP server that Spark Streaming connects to in order
+ * to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999`
+ */
+
+object SqlNetworkWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
+ System.exit(1)
+ }
+
+ StreamingExamples.setStreamingLogLevels()
+
+ // Create the context with a 2 second batch interval
+ val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
+ val ssc = new StreamingContext(sparkConf, Seconds(2))
+
+ // Create a socket stream on target ip:port and count the
+ // words in the input stream of \n delimited text (e.g. generated by 'nc').
+ // Note that the storage level here has no replication, which is fine only when
+ // running locally; in a distributed setting, replication is needed for fault tolerance.
+ val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
+ val words = lines.flatMap(_.split(" "))
+
+ // Convert the RDDs of the words DStream to DataFrames and run a SQL query
+ words.foreachRDD((rdd: RDD[String], time: Time) => {
+ // Get the singleton instance of SQLContext
+ val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
+ import sqlContext.implicits._
+
+ // Convert RDD[String] to RDD[case class] to DataFrame
+ val wordsDataFrame = rdd.map(w => Record(w)).toDF()
+
+ // Register as table
+ wordsDataFrame.registerTempTable("words")
+
+ // Do word count on table using SQL and print it
+ val wordCountsDataFrame =
+ sqlContext.sql("select word, count(*) as total from words group by word")
+ println(s"========= $time =========")
+ wordCountsDataFrame.show()
+ })
+
+ ssc.start()
+ ssc.awaitTermination()
+ }
+}
+
+
+/** Case class for converting RDD to DataFrame */
+case class Record(word: String)
+
+
+/** Lazily instantiated singleton instance of SQLContext */
+object SQLContextSingleton {
+
+ @transient private var instance: SQLContext = _
+
+ def getInstance(sparkContext: SparkContext): SQLContext = {
+ if (instance == null) {
+ instance = new SQLContext(sparkContext)
+ }
+ instance
+ }
+}
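
For reference, a local session with this example might look roughly as follows. The timestamps, the words typed into netcat, and therefore the counts are illustrative only, and the exact table layout printed by show() varies across Spark versions.

Terminal 1 (the data source):

    $ nc -lk 9999
    hello world
    hello spark

Terminal 2 (the example, printing one result per 2 second batch):

    $ bin/run-example org.apache.spark.examples.streaming.SqlNetworkWordCount localhost 9999
    ========= 1426100000000 ms =========
    +-----+-----+
    | word|total|
    +-----+-----+
    |hello|    2|
    |world|    1|
    |spark|    1|
    +-----+-----+

A note on the design: the function passed to foreachRDD executes on the driver, so SQLContextSingleton.getInstance can lazily create one shared SQLContext there and return it on every batch; this also allows the context to be re-created after the driver recovers from a checkpoint, and the @transient marker is presumably defensive, keeping the reference out of any serialized closures.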