From a000b5c3b0438c17e9973df4832c320210c29c27 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Tue, 6 May 2014 17:27:52 -0700 Subject: SPARK-1637: Clean up examples for 1.0 - [x] Move all of them into subpackages of org.apache.spark.examples (right now some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib) - [x] Move Python examples into examples/src/main/python - [x] Update docs to reflect these changes Author: Sandeep This patch had conflicts when merged, resolved by Committer: Matei Zaharia Closes #571 from techaddict/SPARK-1637 and squashes the following commits: 47ef86c [Sandeep] Changes based on Discussions on PR, removing use of RawTextHelper from examples 8ed2d3f [Sandeep] Docs Updated for changes, Change for java examples 5f96121 [Sandeep] Move Python examples into examples/src/main/python 0a8dd77 [Sandeep] Move all Scala Examples to org.apache.spark.examples (some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib) --- .../apache/spark/examples/sql/RDDRelation.scala | 71 +++++++++ .../spark/examples/sql/hive/HiveFromSpark.scala | 64 ++++++++ .../spark/examples/streaming/ActorWordCount.scala | 177 +++++++++++++++++++++ .../spark/examples/streaming/FlumeEventCount.scala | 65 ++++++++ .../spark/examples/streaming/HdfsWordCount.scala | 55 +++++++ .../spark/examples/streaming/KafkaWordCount.scala | 103 ++++++++++++ .../spark/examples/streaming/MQTTWordCount.scala | 109 +++++++++++++ .../examples/streaming/NetworkWordCount.scala | 61 +++++++ .../spark/examples/streaming/QueueStream.scala | 58 +++++++ .../spark/examples/streaming/RawNetworkGrep.scala | 62 ++++++++ .../streaming/RecoverableNetworkWordCount.scala | 122 ++++++++++++++ .../streaming/StatefulNetworkWordCount.scala | 73 +++++++++ .../examples/streaming/StreamingExamples.scala | 38 +++++ .../examples/streaming/TwitterAlgebirdCMS.scala | 119 ++++++++++++++ .../examples/streaming/TwitterAlgebirdHLL.scala | 96 +++++++++++ .../examples/streaming/TwitterPopularTags.scala | 74 +++++++++ .../spark/examples/streaming/ZeroMQWordCount.scala | 101 ++++++++++++ .../streaming/clickstream/PageViewGenerator.scala | 109 +++++++++++++ .../streaming/clickstream/PageViewStream.scala | 107 +++++++++++++ .../apache/spark/sql/examples/HiveFromSpark.scala | 64 -------- .../apache/spark/sql/examples/RDDRelation.scala | 71 --------- .../spark/streaming/examples/ActorWordCount.scala | 177 --------------------- .../spark/streaming/examples/FlumeEventCount.scala | 65 -------- .../spark/streaming/examples/HdfsWordCount.scala | 55 ------- .../spark/streaming/examples/KafkaWordCount.scala | 104 ------------ .../spark/streaming/examples/MQTTWordCount.scala | 109 ------------- .../streaming/examples/NetworkWordCount.scala | 61 ------- .../spark/streaming/examples/QueueStream.scala | 58 ------- .../spark/streaming/examples/RawNetworkGrep.scala | 66 -------- .../examples/RecoverableNetworkWordCount.scala | 122 -------------- .../examples/StatefulNetworkWordCount.scala | 73 --------- .../streaming/examples/StreamingExamples.scala | 38 ----- .../streaming/examples/TwitterAlgebirdCMS.scala | 119 -------------- .../streaming/examples/TwitterAlgebirdHLL.scala | 96 ----------- .../streaming/examples/TwitterPopularTags.scala | 74 --------- .../spark/streaming/examples/ZeroMQWordCount.scala | 101 ------------ .../examples/clickstream/PageViewGenerator.scala | 109 ------------- .../examples/clickstream/PageViewStream.scala | 107 ------------- 38 files changed, 
1664 insertions(+), 1669 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala delete mode 100644 examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala delete mode 100644 examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala delete mode 100644 
examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala delete mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala (limited to 'examples/src/main/scala') diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala new file mode 100644 index 0000000000..ff9254b044 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.sql + +import org.apache.spark.SparkContext +import org.apache.spark.sql.SQLContext + +// One method for defining the schema of an RDD is to make a case class with the desired column +// names and types. +case class Record(key: Int, value: String) + +object RDDRelation { + def main(args: Array[String]) { + val sc = new SparkContext("local", "RDDRelation") + val sqlContext = new SQLContext(sc) + + // Importing the SQL context gives access to all the SQL functions and implicit conversions. + import sqlContext._ + + val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))) + // Any RDD containing case classes can be registered as a table. The schema of the table is + // automatically inferred using scala reflection. + rdd.registerAsTable("records") + + // Once tables have been registered, you can run SQL queries over them. + println("Result of SELECT *:") + sql("SELECT * FROM records").collect().foreach(println) + + // Aggregation queries are also supported. + val count = sql("SELECT COUNT(*) FROM records").collect().head.getInt(0) + println(s"COUNT(*): $count") + + // The results of SQL queries are themselves RDDs and support all normal RDD functions. The + // items in the RDD are of type Row, which allows you to access each column by ordinal. + val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10") + + println("Result of RDD.map:") + rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect.foreach(println) + + // Queries can also be written using a LINQ-like Scala DSL. + rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println) + + // Write out an RDD as a parquet file. + rdd.saveAsParquetFile("pair.parquet") + + // Read in parquet file. Parquet files are self-describing so the schmema is preserved. + val parquetFile = sqlContext.parquetFile("pair.parquet") + + // Queries can be run using the DSL on parequet files just like the original RDD. 
+ parquetFile.where('key === 1).select('value as 'a).collect().foreach(println) + + // These files can also be registered as tables. + parquetFile.registerAsTable("parquetFile") + sql("SELECT * FROM parquetFile").collect().foreach(println) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala new file mode 100644 index 0000000000..66ce93a26e --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.sql.hive + +import org.apache.spark.SparkContext +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.LocalHiveContext + +object HiveFromSpark { + case class Record(key: Int, value: String) + + def main(args: Array[String]) { + val sc = new SparkContext("local", "HiveFromSpark") + + // A local hive context creates an instance of the Hive Metastore in process, storing the + // the warehouse data in the current directory. This location can be overridden by + // specifying a second parameter to the constructor. + val hiveContext = new LocalHiveContext(sc) + import hiveContext._ + + hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") + hql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src") + + // Queries are expressed in HiveQL + println("Result of 'SELECT *': ") + hql("SELECT * FROM src").collect.foreach(println) + + // Aggregation queries are also supported. + val count = hql("SELECT COUNT(*) FROM src").collect().head.getInt(0) + println(s"COUNT(*): $count") + + // The results of SQL queries are themselves RDDs and support all normal RDD functions. The + // items in the RDD are of type Row, which allows you to access each column by ordinal. + val rddFromSql = hql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") + + println("Result of RDD.map:") + val rddAsStrings = rddFromSql.map { + case Row(key: Int, value: String) => s"Key: $key, Value: $value" + } + + // You can also register RDDs as temporary tables within a HiveContext. + val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))) + rdd.registerAsTable("records") + + // Queries can then join RDD data with data stored in Hive. 
+ println("Result of SELECT *:") + hql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala new file mode 100644 index 0000000000..84cf43df0f --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import scala.collection.mutable.LinkedList +import scala.reflect.ClassTag +import scala.util.Random + +import akka.actor.{Actor, ActorRef, Props, actorRef2Scala} + +import org.apache.spark.{SparkConf, SecurityManager} +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.util.AkkaUtils +import org.apache.spark.streaming.receiver.ActorHelper + +case class SubscribeReceiver(receiverActor: ActorRef) +case class UnsubscribeReceiver(receiverActor: ActorRef) + +/** + * Sends the random content to every receiver subscribed with 1/2 + * second delay. + */ +class FeederActor extends Actor { + + val rand = new Random() + var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]() + + val strings: Array[String] = Array("words ", "may ", "count ") + + def makeMessage(): String = { + val x = rand.nextInt(3) + strings(x) + strings(2 - x) + } + + /* + * A thread to generate random messages + */ + new Thread() { + override def run() { + while (true) { + Thread.sleep(500) + receivers.foreach(_ ! makeMessage) + } + } + }.start() + + def receive: Receive = { + + case SubscribeReceiver(receiverActor: ActorRef) => + println("received subscribe from %s".format(receiverActor.toString)) + receivers = LinkedList(receiverActor) ++ receivers + + case UnsubscribeReceiver(receiverActor: ActorRef) => + println("received unsubscribe from %s".format(receiverActor.toString)) + receivers = receivers.dropWhile(x => x eq receiverActor) + + } +} + +/** + * A sample actor as receiver, is also simplest. This receiver actor + * goes and subscribe to a typical publisher/feeder actor and receives + * data. + * + * @see [[org.apache.spark.examples.streaming.FeederActor]] + */ +class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) +extends Actor with ActorHelper { + + lazy private val remotePublisher = context.actorSelection(urlOfPublisher) + + override def preStart = remotePublisher ! SubscribeReceiver(context.self) + + def receive = { + case msg => store(msg.asInstanceOf[T]) + } + + override def postStop() = remotePublisher ! 
UnsubscribeReceiver(context.self) + +} + +/** + * A sample feeder actor + * + * Usage: FeederActor + * and describe the AkkaSystem that Spark Sample feeder would start on. + */ +object FeederActor { + + def main(args: Array[String]) { + if(args.length < 2){ + System.err.println( + "Usage: FeederActor \n" + ) + System.exit(1) + } + val Seq(host, port) = args.toSeq + + val conf = new SparkConf + val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, + securityManager = new SecurityManager(conf))._1 + val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") + + println("Feeder started as:" + feeder) + + actorSystem.awaitTermination() + } +} + +/** + * A sample word count program demonstrating the use of plugging in + * Actor as Receiver + * Usage: ActorWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * and describe the AkkaSystem that Spark Sample feeder is running on. + * + * To run this example locally, you may run Feeder Actor as + * `$ ./bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999` + * and then run the example + * `./bin/run-example org.apache.spark.examples.streaming.ActorWordCount local[2] 127.0.1.1 9999` + */ +object ActorWordCount { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: ActorWordCount " + + "In local mode, should be 'local[n]' with n > 1") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Seq(master, host, port) = args.toSeq + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + /* + * Following is the use of actorStream to plug in custom actor as receiver + * + * An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e type of data received and InputDstream + * should be same. + * + * For example: Both actorStream and SampleActorReceiver are parameterized + * to same type to ensure type safety. + */ + + val lines = ssc.actorStream[String]( + Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( + host, port.toInt))), "SampleReceiver") + + // compute wordcount + lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala new file mode 100644 index 0000000000..5b2a1035fc --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server on at the request host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. + * + * Usage: FlumeEventCount + * + * is a Spark master URL + * is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * is the port the Flume receiver will listen on. + */ +object FlumeEventCount { + def main(args: Array[String]) { + if (args.length != 3) { + System.err.println( + "Usage: FlumeEventCount ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(master, host, IntParam(port)) = args + + val batchInterval = Milliseconds(2000) + // Create the context and set the batch size + val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval, + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + // Create a flume stream + val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2) + + // Print out the count of events received from this server in each batch + stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala new file mode 100644 index 0000000000..b440956ba3 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ + +/** + * Counts words in new text files created in the given directory + * Usage: HdfsWordCount + * is the Spark master URL. + * is the directory that Spark Streaming will use to find and read new text files. + * + * To run this on your local machine on directory `localdir`, run this example + * `$ ./bin/run-example org.apache.spark.examples.streaming.HdfsWordCount local[2] localdir` + * Then create a text file in `localdir` and the words in the file will get counted. 
+ */ +object HdfsWordCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println("Usage: HdfsWordCount ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + // Create the context + val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + // Create the FileInputDStream on the directory and use the + // stream to count words in new files created + val lines = ssc.textFileStream(args(1)) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala new file mode 100644 index 0000000000..c3aae5af05 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import java.util.Properties + +import kafka.producer._ + +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.kafka._ + +// scalastyle:off +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: KafkaWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. 
+ * is a list of one or more zookeeper servers that make quorum + * is the name of kafka consumer group + * is a list of one or more kafka topics to consume from + * is the number of threads the kafka consumer should use + * + * Example: + * `./bin/run-example org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1` + */ +// scalastyle:on +object KafkaWordCount { + def main(args: Array[String]) { + if (args.length < 5) { + System.err.println("Usage: KafkaWordCount ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(master, zkQuorum, group, topics, numThreads) = args + + val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + ssc.checkpoint("checkpoint") + + val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap + val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1L)) + .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) + wordCounts.print() + + ssc.start() + ssc.awaitTermination() + } +} + +// Produces some random words between 1 and 100. +object KafkaWordCountProducer { + + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: KafkaWordCountProducer " + + " ") + System.exit(1) + } + + val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args + + // Zookeper connection properties + val props = new Properties() + props.put("metadata.broker.list", brokers) + props.put("serializer.class", "kafka.serializer.StringEncoder") + + val config = new ProducerConfig(props) + val producer = new Producer[String, String](config) + + // Send some messages + while(true) { + val messages = (1 to messagesPerSec.toInt).map { messageNum => + val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) + .mkString(" ") + + new KeyedMessage[String, String](topic, str) + }.toArray + + producer.send(messages: _*) + Thread.sleep(100) + } + } + +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala new file mode 100644 index 0000000000..47bf1e5a06 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic} +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.mqtt._ + +/** + * A simple Mqtt publisher for demonstration purposes, repeatedly publishes + * Space separated String Message "hello mqtt demo for spark streaming" + */ +object MQTTPublisher { + + var client: MqttClient = _ + + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println("Usage: MQTTPublisher ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Seq(brokerUrl, topic) = args.toSeq + + try { + var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp") + client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) + } catch { + case e: MqttException => println("Exception Caught: " + e) + } + + client.connect() + + val msgtopic: MqttTopic = client.getTopic(topic) + val msg: String = "hello mqtt demo for spark streaming" + + while (true) { + val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8")) + msgtopic.publish(message) + println("Published data. topic: " + msgtopic.getName() + " Message: " + message) + } + client.disconnect() + } +} + +// scalastyle:off +/** + * A sample wordcount with MqttStream stream + * + * To work with Mqtt, Mqtt Message broker/server required. + * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker + * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` + * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ + * Example Java code for Mqtt Publisher and Subscriber can be found here + * https://bitbucket.org/mkjinesh/mqttclient + * Usage: MQTTWordCount + * In local mode, should be 'local[n]' with n > 1 + * and describe where Mqtt publisher is running. 
+ * + * To run this example locally, you may run publisher as + * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` + * and run the example as + * `$ ./bin/run-example org.apache.spark.examples.streaming.MQTTWordCount local[2] tcp://localhost:1883 foo` + */ +// scalastyle:on +object MQTTWordCount { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: MQTTWordCount " + + " In local mode, should be 'local[n]' with n > 1") + System.exit(1) + } + + val Seq(master, brokerUrl, topic) = args.toSeq + + val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), + StreamingContext.jarOfClass(this.getClass).toSeq) + val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) + + val words = lines.flatMap(x => x.toString.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala new file mode 100644 index 0000000000..acfe9a4da3 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.storage.StorageLevel + +// scalastyle:off +/** + * Counts words in text encoded with UTF8 received from the network every second. + * + * Usage: NetworkWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * and describe the TCP server that Spark Streaming would connect to receive data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount local[2] localhost 9999` + */ +// scalastyle:on +object NetworkWordCount { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: NetworkWordCount \n" + + "In local mode, should be 'local[n]' with n > 1") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + // Create the context with a 1 second batch size + val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited text (eg. 
generated by 'nc') + val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala new file mode 100644 index 0000000000..f92f72f2de --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/QueueStream.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import scala.collection.mutable.SynchronizedQueue + +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ + +object QueueStream { + + def main(args: Array[String]) { + if (args.length < 1) { + System.err.println("Usage: QueueStream ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + // Create the context + val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + // Create the queue through which RDDs can be pushed to + // a QueueInputDStream + val rddQueue = new SynchronizedQueue[RDD[Int]]() + + // Create the QueueInputDStream and use it do some processing + val inputStream = ssc.queueStream(rddQueue) + val mappedStream = inputStream.map(x => (x % 10, 1)) + val reducedStream = mappedStream.reduceByKey(_ + _) + reducedStream.print() + ssc.start() + + // Create and push some RDDs into + for (i <- 1 to 30) { + rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10) + Thread.sleep(1000) + } + ssc.stop() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala new file mode 100644 index 0000000000..1b0319a046 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RawNetworkGrep.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.util.IntParam + +/** + * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited + * lines have the word 'the' in them. This is useful for benchmarking purposes. This + * will only work with spark.streaming.util.RawTextSender running on all worker nodes + * and with Spark using Kryo serialization (set Java property "spark.serializer" to + * "org.apache.spark.serializer.KryoSerializer"). + * Usage: RawNetworkGrep + * is the Spark master URL + * is the number rawNetworkStreams, which should be same as number + * of work nodes in the cluster + * is "localhost". + * is the port on which RawTextSender is running in the worker nodes. + * is the Spark Streaming batch duration in milliseconds. + */ + +object RawNetworkGrep { + def main(args: Array[String]) { + if (args.length != 5) { + System.err.println("Usage: RawNetworkGrep ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args + + // Create the context + val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + val rawStreams = (1 to numStreams).map(_ => + ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray + val union = ssc.union(rawStreams) + union.filter(_.contains("the")).count().foreachRDD(r => + println("Grep count: " + r.collect().mkString)) + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala new file mode 100644 index 0000000000..b0bc31cc66 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.streaming.{Time, Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.util.IntParam +import java.io.File +import org.apache.spark.rdd.RDD +import com.google.common.io.Files +import java.nio.charset.Charset + +/** + * Counts words in text encoded with UTF8 received from the network every second. + * + * Usage: NetworkWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * and describe the TCP server that Spark Streaming would connect to receive + * data. directory to HDFS-compatible file system which checkpoint data + * file to which the word counts will be appended + * + * In local mode, should be 'local[n]' with n > 1 + * and must be absolute paths + * + * + * To run this on your local machine, you need to first run a Netcat server + * + * `$ nc -lk 9999` + * + * and run the example as + * + * `$ ./run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ + * local[2] localhost 9999 ~/checkpoint/ ~/out` + * + * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create + * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if + * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from + * the checkpoint data. + * + * To run this example in a local standalone cluster with automatic driver recovery, + * + * `$ ./spark-class org.apache.spark.deploy.Client -s launch \ + * \ + * org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ + * localhost 9999 ~/checkpoint ~/out` + * + * would typically be + * /examples/target/scala-XX/spark-examples....jar + * + * Refer to the online documentation for more details. + */ + +object RecoverableNetworkWordCount { + + def createContext(master: String, ip: String, port: Int, outputPath: String) = { + + // If you do not see this printed, that means the StreamingContext has been loaded + // from the new checkpoint + println("Creating new context") + val outputFile = new File(outputPath) + if (outputFile.exists()) outputFile.delete() + + // Create the context with a 1 second batch size + val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited text (eg. generated by 'nc') + val lines = ssc.socketTextStream(ip, port) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { + val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]") + println(counts) + println("Appending to " + outputFile.getAbsolutePath) + Files.append(counts + "\n", outputFile, Charset.defaultCharset()) + }) + ssc + } + + def main(args: Array[String]) { + if (args.length != 5) { + System.err.println("You arguments were " + args.mkString("[", ", ", "]")) + System.err.println( + """ + |Usage: RecoverableNetworkWordCount + | is the Spark master URL. In local mode, should be + | 'local[n]' with n > 1. and describe the TCP server that Spark + | Streaming would connect to receive data. 
directory to + | HDFS-compatible file system which checkpoint data file to which the + | word counts will be appended + | + |In local mode, should be 'local[n]' with n > 1 + |Both and must be absolute paths + """.stripMargin + ) + System.exit(1) + } + val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args + val ssc = StreamingContext.getOrCreate(checkpointDirectory, + () => { + createContext(master, ip, port, outputPath) + }) + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala new file mode 100644 index 0000000000..8001d56c98 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.streaming._ +import org.apache.spark.streaming.StreamingContext._ +// scalastyle:off +/** + * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every + * second. + * Usage: StatefulNetworkWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. + * and describe the TCP server that Spark Streaming would connect to receive + * data. + * + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount local[2] localhost 9999` + */ +// scalastyle:on +object StatefulNetworkWordCount { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: StatefulNetworkWordCount \n" + + "In local mode, should be 'local[n]' with n > 1") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + val currentCount = values.foldLeft(0)(_ + _) + + val previousCount = state.getOrElse(0) + + Some(currentCount + previousCount) + } + + // Create the context with a 1 second batch size + val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", + Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + ssc.checkpoint(".") + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg. 
generated by 'nc') + val lines = ssc.socketTextStream(args(1), args(2).toInt) + val words = lines.flatMap(_.split(" ")) + val wordDstream = words.map(x => (x, 1)) + + // Update the cumulative count using updateStateByKey + // This will give a Dstream made of state (which is the cumulative count of the words) + val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) + stateDstream.print() + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala new file mode 100644 index 0000000000..8396e65d0d --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StreamingExamples.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.Logging + +import org.apache.log4j.{Level, Logger} + +/** Utility functions for Spark Streaming examples. */ +object StreamingExamples extends Logging { + + /** Set reasonable logging levels for streaming if the user has not configured log4j. */ + def setStreamingLogLevels() { + val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements + if (!log4jInitialized) { + // We first log something to initialize Spark's default logging, then we override the + // logging level. + logInfo("Setting log level to [WARN] for streaming example." + + " To override add a custom log4j.properties to the classpath.") + Logger.getRootLogger.setLevel(Level.WARN) + } + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala new file mode 100644 index 0000000000..b12617d881 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming + +import com.twitter.algebird._ + +import org.apache.spark.SparkContext._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.twitter._ +// scalastyle:off +/** + * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute + * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. + *
+ * Note that since Algebird's implementation currently only supports Long inputs, + * the example operates on Long IDs. Once the implementation supports other inputs (such as String), + * the same approach could be used for computing popular topics for example. + *
+ * This blog post has a good overview of the Count-Min Sketch (CMS). The CMS is a data + * structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency + * of any given element, etc), that uses space sub-linear in the number of elements in the + * stream. Once elements are added to the CMS, the estimated count of an element can be computed, + * as well as "heavy-hitters" that occur more than a threshold percentage of the overall total + * count. + *
+ * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the + * reduce operation. + */ +// scalastyle:on +object TwitterAlgebirdCMS { + def main(args: Array[String]) { + if (args.length < 1) { + System.err.println("Usage: TwitterAlgebirdCMS " + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + // CMS parameters + val DELTA = 1E-3 + val EPS = 0.01 + val SEED = 1 + val PERC = 0.001 + // K highest frequency elements to take + val TOPK = 10 + + val (master, filters) = (args.head, args.tail) + + val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) + + val users = stream.map(status => status.getUser.getId) + + val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) + var globalCMS = cms.zero + val mm = new MapMonoid[Long, Int]() + var globalExact = Map[Long, Int]() + + val approxTopUsers = users.mapPartitions(ids => { + ids.map(id => cms.create(id)) + }).reduce(_ ++ _) + + val exactTopUsers = users.map(id => (id, 1)) + .reduceByKey((a, b) => a + b) + + approxTopUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + val partialTopK = partial.heavyHitters.map(id => + (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + globalCMS ++= partial + val globalTopK = globalCMS.heavyHitters.map(id => + (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, + partialTopK.mkString("[", ",", "]"))) + println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, + globalTopK.mkString("[", ",", "]"))) + } + }) + + exactTopUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partialMap = rdd.collect().toMap + val partialTopK = rdd.map( + {case (id, count) => (count, id)}) + .sortByKey(ascending = false).take(TOPK) + globalExact = mm.plus(globalExact.toMap, partialMap) + val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) + println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) + println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) + } + }) + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala new file mode 100644 index 0000000000..22f232c725 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdHLL.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import com.twitter.algebird.HyperLogLogMonoid +import com.twitter.algebird.HyperLogLog._ + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.twitter._ +// scalastyle:off +/** + * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute + * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. + *
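The Algebird examples, like the other Twitter examples in this patch, pass None as the authorization to TwitterUtils.createStream, so twitter4j has to find OAuth credentials through its own configuration. One common approach is to set the twitter4j system properties before the StreamingContext is created. This is a hedged sketch only: the helper object is not part of this patch, the values are placeholders, and the exact property names should be checked against the twitter4j documentation.

// Assumed helper, not part of this patch: supplies twitter4j OAuth credentials via
// system properties so TwitterUtils.createStream(ssc, None, ...) can authenticate.
object TwitterCredentials {
  def configure(consumerKey: String, consumerSecret: String,
                accessToken: String, accessTokenSecret: String) {
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
  }
}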

+ *

+ * This blog post and this blog post have good overviews of HyperLogLog (HLL). HLL is a + * memory-efficient data structure for estimating the cardinality of a data stream, i.e. the + * number of unique elements. + *
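To see the estimate in isolation, here is a minimal standalone sketch of the same calls the streaming example below makes: hll(id) builds a single-element HLL, + merges two instances, and estimatedSize reads the estimate. The object name and the sample IDs are illustrative only.

import com.twitter.algebird.HyperLogLogMonoid
import com.twitter.algebird.HyperLogLog._  // implicit Long => Array[Byte] conversion used by hll(...)

object HLLQuickCheck {
  def main(args: Array[String]) {
    // 12 bits, as in the streaming example: trades accuracy against sketch size.
    val hll = new HyperLogLogMonoid(12)

    // 1000 distinct IDs, each seen twice.
    val ids: Seq[Long] = (1L to 1000L) ++ (1L to 1000L)

    // One single-element HLL per ID, merged with the monoid's +.
    val approx = ids.map(id => hll(id)).reduce(_ + _)
    println("approx distinct: %d (exact: 1000)".format(approx.estimatedSize.toInt))
  }
}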

+ * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the + * reduce operation. + */ +// scalastyle:on +object TwitterAlgebirdHLL { + def main(args: Array[String]) { + if (args.length < 1) { + System.err.println("Usage: TwitterAlgebirdHLL " + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ + val BIT_SIZE = 12 + val (master, filters) = (args.head, args.tail) + + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) + + val users = stream.map(status => status.getUser.getId) + + val hll = new HyperLogLogMonoid(BIT_SIZE) + var globalHll = hll.zero + var userSet: Set[Long] = Set() + + val approxUsers = users.mapPartitions(ids => { + ids.map(id => hll(id)) + }).reduce(_ + _) + + val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) + + approxUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + globalHll += partial + println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) + println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) + } + }) + + exactUsers.foreachRDD(rdd => { + if (rdd.count() != 0) { + val partial = rdd.first() + userSet ++= partial + println("Exact distinct users this batch: %d".format(partial.size)) + println("Exact distinct users overall: %d".format(userSet.size)) + println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1 + ) * 100)) + } + }) + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala new file mode 100644 index 0000000000..5b58e94600 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.streaming.{Seconds, StreamingContext} +import StreamingContext._ +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.twitter._ + +/** + * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter + * stream. The stream is instantiated with credentials and optionally filters supplied by the + * command line arguments. 
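The heart of this example is the sliding-window count followed by a per-window sort. The same pattern, detached from Twitter and applied to a plain socket text stream, looks roughly like the sketch below; the master, host and port values are placeholders, and only the 60 second window is shown.

import org.apache.spark.SparkContext._              // sortByKey on the transformed RDDs
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object SocketPopularWords {
  def main(args: Array[String]) {
    // Hypothetical local setup; the real example builds its stream with TwitterUtils instead.
    val ssc = new StreamingContext("local[2]", "SocketPopularWords", Seconds(2))
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // Count each word over the last 60 seconds, recomputed on every 2 second batch,
    // then flip to (count, word) and sort each window's RDD so take(5) yields the top words.
    val topCounts60 = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (word, count) => (count, word) }
      .transform(_.sortByKey(false))

    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular words in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, word) => println("%s (%s occurrences)".format(word, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}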
+ * + */ +object TwitterPopularTags { + def main(args: Array[String]) { + if (args.length < 1) { + System.err.println("Usage: TwitterPopularTags " + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val (master, filters) = (args.head, args.tail) + + val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val stream = TwitterUtils.createStream(ssc, None, filters) + + val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) + + val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) + .map{case (topic, count) => (count, topic)} + .transform(_.sortByKey(false)) + + val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) + .map{case (topic, count) => (count, topic)} + .transform(_.sortByKey(false)) + + + // Print popular hashtags + topCounts60.foreachRDD(rdd => { + val topList = rdd.take(5) + println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} + }) + + topCounts10.foreachRDD(rdd => { + val topList = rdd.take(5) + println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} + }) + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala new file mode 100644 index 0000000000..de46e5f5b1 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import akka.actor.ActorSystem +import akka.actor.actorRef2Scala +import akka.zeromq._ +import akka.zeromq.Subscribe +import akka.util.ByteString + +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.zeromq._ + +import scala.language.implicitConversions + +/** + * A simple publisher for demonstration purposes, repeatedly publishes random Messages + * every one second. 
+ */ +object SimpleZeroMQPublisher { + + def main(args: Array[String]) = { + if (args.length < 2) { + System.err.println("Usage: SimpleZeroMQPublisher ") + System.exit(1) + } + + val Seq(url, topic) = args.toSeq + val acs: ActorSystem = ActorSystem() + + val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) + implicit def stringToByteString(x: String) = ByteString(x) + val messages: List[ByteString] = List("words ", "may ", "count ") + while (true) { + Thread.sleep(1000) + pubSocket ! ZMQMessage(ByteString(topic) :: messages) + } + acs.awaitTermination() + } +} + +// scalastyle:off +/** + * A sample wordcount with ZeroMQStream stream + * + * To work with zeroMQ, some native libraries have to be installed. + * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide] + * (http://www.zeromq.org/intro:get-the-software) + * + * Usage: ZeroMQWordCount + * In local mode, should be 'local[n]' with n > 1 + * and describe where zeroMq publisher is running. + * + * To run this example locally, you may run publisher as + * `$ ./bin/run-example org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * and run the example as + * `$ ./bin/run-example org.apache.spark.examples.streaming.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` + */ +// scalastyle:on +object ZeroMQWordCount { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: ZeroMQWordCount " + + "In local mode, should be 'local[n]' with n > 1") + System.exit(1) + } + StreamingExamples.setStreamingLogLevels() + val Seq(master, url, topic) = args.toSeq + + // Create the context and set the batch size + val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator + + // For this stream, a zeroMQ publisher should be running. + val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + ssc.start() + ssc.awaitTermination() + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala new file mode 100644 index 0000000000..97e0cb9207 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.examples.streaming.clickstream + +import java.net.ServerSocket +import java.io.PrintWriter +import util.Random + +/** Represents a page view on a website with associated dimension data. */ +class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) + extends Serializable { + override def toString() : String = { + "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID) + } +} + +object PageView extends Serializable { + def fromString(in : String) : PageView = { + val parts = in.split("\t") + new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt) + } +} + +// scalastyle:off +/** Generates streaming events to simulate page views on a website. + * + * This should be used in tandem with PageViewStream.scala. Example: + * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10 + * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + * + * When running this, you may want to set the root logging level to ERROR in + * conf/log4j.properties to reduce the verbosity of the output. + */ +// scalastyle:on +object PageViewGenerator { + val pages = Map("http://foo.com/" -> .7, + "http://foo.com/news" -> 0.2, + "http://foo.com/contact" -> .1) + val httpStatus = Map(200 -> .95, + 404 -> .05) + val userZipCode = Map(94709 -> .5, + 94117 -> .5) + val userID = Map((1 to 100).map(_ -> .01):_*) + + + def pickFromDistribution[T](inputMap : Map[T, Double]) : T = { + val rand = new Random().nextDouble() + var total = 0.0 + for ((item, prob) <- inputMap) { + total = total + prob + if (total > rand) { + return item + } + } + inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0 + } + + def getNextClickEvent() : String = { + val id = pickFromDistribution(userID) + val page = pickFromDistribution(pages) + val status = pickFromDistribution(httpStatus) + val zipCode = pickFromDistribution(userZipCode) + new PageView(page, status, zipCode, id).toString() + } + + def main(args : Array[String]) { + if (args.length != 2) { + System.err.println("Usage: PageViewGenerator ") + System.exit(1) + } + val port = args(0).toInt + val viewsPerSecond = args(1).toFloat + val sleepDelayMs = (1000.0 / viewsPerSecond).toInt + val listener = new ServerSocket(port) + println("Listening on port: " + port) + + while (true) { + val socket = listener.accept() + new Thread() { + override def run = { + println("Got client connected from: " + socket.getInetAddress) + val out = new PrintWriter(socket.getOutputStream(), true) + + while (true) { + Thread.sleep(sleepDelayMs) + out.write(getNextClickEvent()) + out.flush() + } + socket.close() + } + }.start() + } + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala new file mode 100644 index 0000000000..d30ceffbe2 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming.clickstream + +import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.examples.streaming.StreamingExamples +// scalastyle:off +/** Analyses a streaming dataset of web page views. This class demonstrates several types of + * operators available in Spark streaming. + * + * This should be used in tandem with PageViewStream.scala. Example: + * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10 + * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + */ +// scalastyle:on +object PageViewStream { + def main(args: Array[String]) { + if (args.length != 3) { + System.err.println("Usage: PageViewStream ") + System.err.println(" must be one of pageCounts, slidingPageCounts," + + " errorRatePerZipCode, activeUserCount, popularUsersSeen") + System.exit(1) + } + StreamingExamples.setStreamingLogLevels() + val metric = args(0) + val host = args(1) + val port = args(2).toInt + + // Create the context + val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1), + System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + + // Create a NetworkInputDStream on target host:port and convert each line to a PageView + val pageViews = ssc.socketTextStream(host, port) + .flatMap(_.split("\n")) + .map(PageView.fromString(_)) + + // Return a count of views per URL seen in each batch + val pageCounts = pageViews.map(view => view.url).countByValue() + + // Return a sliding window of page views per URL in the last ten seconds + val slidingPageCounts = pageViews.map(view => view.url) + .countByValueAndWindow(Seconds(10), Seconds(2)) + + + // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds + val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2)) + .map(view => ((view.zipCode, view.status))) + .groupByKey() + val errorRatePerZipCode = statusesPerZipCode.map{ + case(zip, statuses) => + val normalCount = statuses.filter(_ == 200).size + val errorCount = statuses.size - normalCount + val errorRatio = errorCount.toFloat / statuses.size + if (errorRatio > 0.05) { + "%s: **%s**".format(zip, errorRatio) + } else { + "%s: %s".format(zip, errorRatio) + } + } + + // Return the number unique users in last 15 seconds + val activeUserCount = pageViews.window(Seconds(15), Seconds(2)) + .map(view => (view.userID, 1)) + .groupByKey() + .count() + .map("Unique active users: " + _) + + // An external dataset we want to join to this stream + val userList = ssc.sparkContext.parallelize( + Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq) + + metric match { + case "pageCounts" => pageCounts.print() + case "slidingPageCounts" => slidingPageCounts.print() + case "errorRatePerZipCode" => errorRatePerZipCode.print() + case "activeUserCount" => activeUserCount.print() + case "popularUsersSeen" => + // Look for users in our existing 
dataset and print it out if we have a match + pageViews.map(view => (view.userID, 1)) + .foreachRDD((rdd, time) => rdd.join(userList) + .map(_._2._2) + .take(10) + .foreach(u => println("Saw user %s at time %s".format(u, time)))) + case _ => println("Invalid metric entered: " + metric) + } + + ssc.start() + } +} diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala b/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala deleted file mode 100644 index 62329bde84..0000000000 --- a/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.examples - -import org.apache.spark.SparkContext -import org.apache.spark.sql._ -import org.apache.spark.sql.hive.LocalHiveContext - -object HiveFromSpark { - case class Record(key: Int, value: String) - - def main(args: Array[String]) { - val sc = new SparkContext("local", "HiveFromSpark") - - // A local hive context creates an instance of the Hive Metastore in process, storing the - // the warehouse data in the current directory. This location can be overridden by - // specifying a second parameter to the constructor. - val hiveContext = new LocalHiveContext(sc) - import hiveContext._ - - hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") - hql("LOAD DATA LOCAL INPATH 'src/main/resources/kv1.txt' INTO TABLE src") - - // Queries are expressed in HiveQL - println("Result of 'SELECT *': ") - hql("SELECT * FROM src").collect.foreach(println) - - // Aggregation queries are also supported. - val count = hql("SELECT COUNT(*) FROM src").collect().head.getInt(0) - println(s"COUNT(*): $count") - - // The results of SQL queries are themselves RDDs and support all normal RDD functions. The - // items in the RDD are of type Row, which allows you to access each column by ordinal. - val rddFromSql = hql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") - - println("Result of RDD.map:") - val rddAsStrings = rddFromSql.map { - case Row(key: Int, value: String) => s"Key: $key, Value: $value" - } - - // You can also register RDDs as temporary tables within a HiveContext. - val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))) - rdd.registerAsTable("records") - - // Queries can then join RDD data with data stored in Hive. 
- println("Result of SELECT *:") - hql("SELECT * FROM records r JOIN src s ON r.key = s.key").collect().foreach(println) - } -} diff --git a/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala deleted file mode 100644 index 8210ad977f..0000000000 --- a/examples/src/main/scala/org/apache/spark/sql/examples/RDDRelation.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.examples - -import org.apache.spark.SparkContext -import org.apache.spark.sql.SQLContext - -// One method for defining the schema of an RDD is to make a case class with the desired column -// names and types. -case class Record(key: Int, value: String) - -object RDDRelation { - def main(args: Array[String]) { - val sc = new SparkContext("local", "RDDRelation") - val sqlContext = new SQLContext(sc) - - // Importing the SQL context gives access to all the SQL functions and implicit conversions. - import sqlContext._ - - val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))) - // Any RDD containing case classes can be registered as a table. The schema of the table is - // automatically inferred using scala reflection. - rdd.registerAsTable("records") - - // Once tables have been registered, you can run SQL queries over them. - println("Result of SELECT *:") - sql("SELECT * FROM records").collect().foreach(println) - - // Aggregation queries are also supported. - val count = sql("SELECT COUNT(*) FROM records").collect().head.getInt(0) - println(s"COUNT(*): $count") - - // The results of SQL queries are themselves RDDs and support all normal RDD functions. The - // items in the RDD are of type Row, which allows you to access each column by ordinal. - val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10") - - println("Result of RDD.map:") - rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect.foreach(println) - - // Queries can also be written using a LINQ-like Scala DSL. - rdd.where('key === 1).orderBy('value.asc).select('key).collect().foreach(println) - - // Write out an RDD as a parquet file. - rdd.saveAsParquetFile("pair.parquet") - - // Read in parquet file. Parquet files are self-describing so the schmema is preserved. - val parquetFile = sqlContext.parquetFile("pair.parquet") - - // Queries can be run using the DSL on parequet files just like the original RDD. - parquetFile.where('key === 1).select('value as 'a).collect().foreach(println) - - // These files can also be registered as tables. 
- parquetFile.registerAsTable("parquetFile") - sql("SELECT * FROM parquetFile").collect().foreach(println) - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala deleted file mode 100644 index c845dd8904..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import scala.collection.mutable.LinkedList -import scala.reflect.ClassTag -import scala.util.Random - -import akka.actor.{Actor, ActorRef, Props, actorRef2Scala} - -import org.apache.spark.{SparkConf, SecurityManager} -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions -import org.apache.spark.util.AkkaUtils -import org.apache.spark.streaming.receiver.ActorHelper - -case class SubscribeReceiver(receiverActor: ActorRef) -case class UnsubscribeReceiver(receiverActor: ActorRef) - -/** - * Sends the random content to every receiver subscribed with 1/2 - * second delay. - */ -class FeederActor extends Actor { - - val rand = new Random() - var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]() - - val strings: Array[String] = Array("words ", "may ", "count ") - - def makeMessage(): String = { - val x = rand.nextInt(3) - strings(x) + strings(2 - x) - } - - /* - * A thread to generate random messages - */ - new Thread() { - override def run() { - while (true) { - Thread.sleep(500) - receivers.foreach(_ ! makeMessage) - } - } - }.start() - - def receive: Receive = { - - case SubscribeReceiver(receiverActor: ActorRef) => - println("received subscribe from %s".format(receiverActor.toString)) - receivers = LinkedList(receiverActor) ++ receivers - - case UnsubscribeReceiver(receiverActor: ActorRef) => - println("received unsubscribe from %s".format(receiverActor.toString)) - receivers = receivers.dropWhile(x => x eq receiverActor) - - } -} - -/** - * A sample actor as receiver, is also simplest. This receiver actor - * goes and subscribe to a typical publisher/feeder actor and receives - * data. - * - * @see [[org.apache.spark.streaming.examples.FeederActor]] - */ -class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) -extends Actor with ActorHelper { - - lazy private val remotePublisher = context.actorSelection(urlOfPublisher) - - override def preStart = remotePublisher ! SubscribeReceiver(context.self) - - def receive = { - case msg => store(msg.asInstanceOf[T]) - } - - override def postStop() = remotePublisher ! 
UnsubscribeReceiver(context.self) - -} - -/** - * A sample feeder actor - * - * Usage: FeederActor - * and describe the AkkaSystem that Spark Sample feeder would start on. - */ -object FeederActor { - - def main(args: Array[String]) { - if(args.length < 2){ - System.err.println( - "Usage: FeederActor \n" - ) - System.exit(1) - } - val Seq(host, port) = args.toSeq - - val conf = new SparkConf - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, - securityManager = new SecurityManager(conf))._1 - val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") - - println("Feeder started as:" + feeder) - - actorSystem.awaitTermination() - } -} - -/** - * A sample word count program demonstrating the use of plugging in - * Actor as Receiver - * Usage: ActorWordCount - * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. - * and describe the AkkaSystem that Spark Sample feeder is running on. - * - * To run this example locally, you may run Feeder Actor as - * `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999` - * and then run the example - * `./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999` - */ -object ActorWordCount { - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println( - "Usage: ActorWordCount " + - "In local mode, should be 'local[n]' with n > 1") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Seq(master, host, port) = args.toSeq - - // Create the context and set the batch size - val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - /* - * Following is the use of actorStream to plug in custom actor as receiver - * - * An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDstream - * should be same. - * - * For example: Both actorStream and SampleActorReceiver are parameterized - * to same type to ensure type safety. - */ - - val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( - host, port.toInt))), "SampleReceiver") - - // compute wordcount - lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() - - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala deleted file mode 100644 index 26b6024534..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ -import org.apache.spark.streaming.flume._ -import org.apache.spark.util.IntParam - -/** - * Produces a count of events received from Flume. - * - * This should be used in conjunction with an AvroSink in Flume. It will start - * an Avro server on at the request host:port address and listen for requests. - * Your Flume AvroSink should be pointed to this address. - * - * Usage: FlumeEventCount - * - * is a Spark master URL - * is the host the Flume receiver will be started on - a receiver - * creates a server and listens for flume events. - * is the port the Flume receiver will listen on. - */ -object FlumeEventCount { - def main(args: Array[String]) { - if (args.length != 3) { - System.err.println( - "Usage: FlumeEventCount ") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(master, host, IntParam(port)) = args - - val batchInterval = Milliseconds(2000) - // Create the context and set the batch size - val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval, - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - // Create a flume stream - val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2) - - // Print out the count of events received from this server in each batch - stream.count().map(cnt => "Received " + cnt + " flume events." ).print() - - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala deleted file mode 100644 index 7f86fc792a..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ - -/** - * Counts words in new text files created in the given directory - * Usage: HdfsWordCount - * is the Spark master URL. - * is the directory that Spark Streaming will use to find and read new text files. - * - * To run this on your local machine on directory `localdir`, run this example - * `$ ./bin/run-example org.apache.spark.streaming.examples.HdfsWordCount local[2] localdir` - * Then create a text file in `localdir` and the words in the file will get counted. 
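One practical detail when feeding a textFileStream such as this one: the file source expects files to show up in the monitored directory as complete files, so the usual recommendation is to write the file somewhere else and then move it into the directory in a single step rather than writing it in place. A small sketch with placeholder paths:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

object DropFileIntoWatchedDir {
  def main(args: Array[String]) {
    val watchedDir = Paths.get("localdir")           // the directory HdfsWordCount watches
    val staging = Paths.get("staging", "words.txt")  // written outside the watched directory
    Files.createDirectories(watchedDir)
    Files.createDirectories(staging.getParent)

    // Write the full contents first...
    Files.write(staging, "some words to count".getBytes(StandardCharsets.UTF_8))

    // ...then move the finished file into the watched directory in one step,
    // so the streaming job never sees a half-written file.
    Files.move(staging, watchedDir.resolve("words.txt"), StandardCopyOption.ATOMIC_MOVE)
  }
}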
- */ -object HdfsWordCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: HdfsWordCount ") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - // Create the context - val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - // Create the FileInputDStream on the directory and use the - // stream to count words in new files created - val lines = ssc.textFileStream(args(1)) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala deleted file mode 100644 index 2aa4f1474a..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import java.util.Properties - -import kafka.producer._ - -import org.apache.spark.streaming._ -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.kafka._ -import org.apache.spark.streaming.util.RawTextHelper._ - -// scalastyle:off -/** - * Consumes messages from one or more topics in Kafka and does wordcount. - * Usage: KafkaWordCount - * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. 
- * is a list of one or more zookeeper servers that make quorum - * is the name of kafka consumer group - * is a list of one or more kafka topics to consume from - * is the number of threads the kafka consumer should use - * - * Example: - * `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1` - */ -// scalastyle:on -object KafkaWordCount { - def main(args: Array[String]) { - if (args.length < 5) { - System.err.println("Usage: KafkaWordCount ") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(master, zkQuorum, group, topics, numThreads) = args - - val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - ssc.checkpoint("checkpoint") - - val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1L)) - .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) - wordCounts.print() - - ssc.start() - ssc.awaitTermination() - } -} - -// Produces some random words between 1 and 100. -object KafkaWordCountProducer { - - def main(args: Array[String]) { - if (args.length < 4) { - System.err.println("Usage: KafkaWordCountProducer " + - " ") - System.exit(1) - } - - val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args - - // Zookeper connection properties - val props = new Properties() - props.put("metadata.broker.list", brokers) - props.put("serializer.class", "kafka.serializer.StringEncoder") - - val config = new ProducerConfig(props) - val producer = new Producer[String, String](config) - - // Send some messages - while(true) { - val messages = (1 to messagesPerSec.toInt).map { messageNum => - val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) - .mkString(" ") - - new KeyedMessage[String, String](topic, str) - }.toArray - - producer.send(messages: _*) - Thread.sleep(100) - } - } - -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala deleted file mode 100644 index 62aef0fb47..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.examples - -import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic} -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.mqtt._ - -/** - * A simple Mqtt publisher for demonstration purposes, repeatedly publishes - * Space separated String Message "hello mqtt demo for spark streaming" - */ -object MQTTPublisher { - - var client: MqttClient = _ - - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println("Usage: MQTTPublisher ") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Seq(brokerUrl, topic) = args.toSeq - - try { - var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp") - client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) - } catch { - case e: MqttException => println("Exception Caught: " + e) - } - - client.connect() - - val msgtopic: MqttTopic = client.getTopic(topic) - val msg: String = "hello mqtt demo for spark streaming" - - while (true) { - val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes("utf-8")) - msgtopic.publish(message) - println("Published data. topic: " + msgtopic.getName() + " Message: " + message) - } - client.disconnect() - } -} - -// scalastyle:off -/** - * A sample wordcount with MqttStream stream - * - * To work with Mqtt, Mqtt Message broker/server required. - * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker - * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` - * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ - * Example Java code for Mqtt Publisher and Subscriber can be found here - * https://bitbucket.org/mkjinesh/mqttclient - * Usage: MQTTWordCount - * In local mode, should be 'local[n]' with n > 1 - * and describe where Mqtt publisher is running. 
- * - * To run this example locally, you may run publisher as - * `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo` - * and run the example as - * `$ ./bin/run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo` - */ -// scalastyle:on -object MQTTWordCount { - - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println( - "Usage: MQTTWordCount " + - " In local mode, should be 'local[n]' with n > 1") - System.exit(1) - } - - val Seq(master, brokerUrl, topic) = args.toSeq - - val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), - StreamingContext.jarOfClass(this.getClass).toSeq) - val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) - - val words = lines.flatMap(x => x.toString.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala deleted file mode 100644 index 272ab11212..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.storage.StorageLevel - -// scalastyle:off -/** - * Counts words in text encoded with UTF8 received from the network every second. - * - * Usage: NetworkWordCount - * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. - * and describe the TCP server that Spark Streaming would connect to receive data. - * - * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` - * and then run the example - * `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999` - */ -// scalastyle:on -object NetworkWordCount { - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: NetworkWordCount \n" + - "In local mode, should be 'local[n]' with n > 1") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited text (eg. 
generated by 'nc') - val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala deleted file mode 100644 index ff2a205ec1..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import scala.collection.mutable.SynchronizedQueue - -import org.apache.spark.rdd.RDD -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ - -object QueueStream { - - def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: QueueStream ") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - // Create the context - val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - // Create the queue through which RDDs can be pushed to - // a QueueInputDStream - val rddQueue = new SynchronizedQueue[RDD[Int]]() - - // Create the QueueInputDStream and use it do some processing - val inputStream = ssc.queueStream(rddQueue) - val mappedStream = inputStream.map(x => (x % 10, 1)) - val reducedStream = mappedStream.reduceByKey(_ + _) - reducedStream.print() - ssc.start() - - // Create and push some RDDs into - for (i <- 1 to 30) { - rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10) - Thread.sleep(1000) - } - ssc.stop() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala deleted file mode 100644 index d915c0c39b..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ -import org.apache.spark.streaming.util.RawTextHelper -import org.apache.spark.util.IntParam - -/** - * Receives text from multiple rawNetworkStreams and counts how many '\n' delimited - * lines have the word 'the' in them. This is useful for benchmarking purposes. This - * will only work with spark.streaming.util.RawTextSender running on all worker nodes - * and with Spark using Kryo serialization (set Java property "spark.serializer" to - * "org.apache.spark.serializer.KryoSerializer"). - * Usage: RawNetworkGrep - * is the Spark master URL - * is the number rawNetworkStreams, which should be same as number - * of work nodes in the cluster - * is "localhost". - * is the port on which RawTextSender is running in the worker nodes. - * is the Spark Streaming batch duration in milliseconds. - */ - -object RawNetworkGrep { - def main(args: Array[String]) { - if (args.length != 5) { - System.err.println("Usage: RawNetworkGrep ") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args - - // Create the context - val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - // Warm up the JVMs on master and slave for JIT compilation to kick in - RawTextHelper.warmUp(ssc.sparkContext) - - val rawStreams = (1 to numStreams).map(_ => - ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray - val union = ssc.union(rawStreams) - union.filter(_.contains("the")).count().foreachRDD(r => - println("Grep count: " + r.collect().mkString)) - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala deleted file mode 100644 index 4aacbb1991..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.examples - -import org.apache.spark.streaming.{Time, Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.util.IntParam -import java.io.File -import org.apache.spark.rdd.RDD -import com.google.common.io.Files -import java.nio.charset.Charset - -/** - * Counts words in text encoded with UTF8 received from the network every second. - * - * Usage: NetworkWordCount - * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. - * and describe the TCP server that Spark Streaming would connect to receive - * data. directory to HDFS-compatible file system which checkpoint data - * file to which the word counts will be appended - * - * In local mode, should be 'local[n]' with n > 1 - * and must be absolute paths - * - * - * To run this on your local machine, you need to first run a Netcat server - * - * `$ nc -lk 9999` - * - * and run the example as - * - * `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \ - * local[2] localhost 9999 ~/checkpoint/ ~/out` - * - * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create - * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if - * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from - * the checkpoint data. - * - * To run this example in a local standalone cluster with automatic driver recovery, - * - * `$ ./spark-class org.apache.spark.deploy.Client -s launch \ - * \ - * org.apache.spark.streaming.examples.RecoverableNetworkWordCount \ - * localhost 9999 ~/checkpoint ~/out` - * - * would typically be - * /examples/target/scala-XX/spark-examples....jar - * - * Refer to the online documentation for more details. - */ - -object RecoverableNetworkWordCount { - - def createContext(master: String, ip: String, port: Int, outputPath: String) = { - - // If you do not see this printed, that means the StreamingContext has been loaded - // from the new checkpoint - println("Creating new context") - val outputFile = new File(outputPath) - if (outputFile.exists()) outputFile.delete() - - // Create the context with a 1 second batch size - val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited text (eg. generated by 'nc') - val lines = ssc.socketTextStream(ip, port) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { - val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]") - println(counts) - println("Appending to " + outputFile.getAbsolutePath) - Files.append(counts + "\n", outputFile, Charset.defaultCharset()) - }) - ssc - } - - def main(args: Array[String]) { - if (args.length != 5) { - System.err.println("You arguments were " + args.mkString("[", ", ", "]")) - System.err.println( - """ - |Usage: RecoverableNetworkWordCount - | is the Spark master URL. In local mode, should be - | 'local[n]' with n > 1. and describe the TCP server that Spark - | Streaming would connect to receive data. 
directory to - | HDFS-compatible file system which checkpoint data file to which the - | word counts will be appended - | - |In local mode, should be 'local[n]' with n > 1 - |Both and must be absolute paths - """.stripMargin - ) - System.exit(1) - } - val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args - val ssc = StreamingContext.getOrCreate(checkpointDirectory, - () => { - createContext(master, ip, port, outputPath) - }) - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala deleted file mode 100644 index ef94c9298d..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import org.apache.spark.streaming._ -import org.apache.spark.streaming.StreamingContext._ -// scalastyle:off -/** - * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every - * second. - * Usage: StatefulNetworkWordCount - * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. - * and describe the TCP server that Spark Streaming would connect to receive - * data. - * - * To run this on your local machine, you need to first run a Netcat server - * `$ nc -lk 9999` - * and then run the example - * `$ ./bin/run-example org.apache.spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999` - */ -// scalastyle:on -object StatefulNetworkWordCount { - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: StatefulNetworkWordCount \n" + - "In local mode, should be 'local[n]' with n > 1") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val updateFunc = (values: Seq[Int], state: Option[Int]) => { - val currentCount = values.foldLeft(0)(_ + _) - - val previousCount = state.getOrElse(0) - - Some(currentCount + previousCount) - } - - // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCumulativeCountUpdateStateByKey", - Seconds(1), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - ssc.checkpoint(".") - - // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. 
generated by 'nc') - val lines = ssc.socketTextStream(args(1), args(2).toInt) - val words = lines.flatMap(_.split(" ")) - val wordDstream = words.map(x => (x, 1)) - - // Update the cumulative count using updateStateByKey - // This will give a Dstream made of state (which is the cumulative count of the words) - val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) - stateDstream.print() - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala deleted file mode 100644 index 99f1502046..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import org.apache.spark.Logging - -import org.apache.log4j.{Level, Logger} - -/** Utility functions for Spark Streaming examples. */ -object StreamingExamples extends Logging { - - /** Set reasonable logging levels for streaming if the user has not configured log4j. */ - def setStreamingLogLevels() { - val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements - if (!log4jInitialized) { - // We first log something to initialize Spark's default logging, then we override the - // logging level. - logInfo("Setting log level to [WARN] for streaming example." + - " To override add a custom log4j.properties to the classpath.") - Logger.getRootLogger.setLevel(Level.WARN) - } - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala deleted file mode 100644 index c38905e8f3..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.examples - -import com.twitter.algebird._ - -import org.apache.spark.SparkContext._ -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.twitter._ -// scalastyle:off -/** - * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute - * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. - *
- * Note that since Algebird's implementation currently only supports Long inputs, - * the example operates on Long IDs. Once the implementation supports other inputs (such as String), - * the same approach could be used for computing popular topics for example. - *

- * The Count-Min Sketch (CMS) is a data structure for approximate frequency estimation in data
- * streams (e.g. top-K elements, the frequency of any given element, etc.) that uses space
- * sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
- * estimated count of an element can be computed, as well as the "heavy hitters" that occur more
- * often than a threshold percentage of the overall total count.
- *

- * Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the - * reduce operation. - */ -// scalastyle:on -object TwitterAlgebirdCMS { - def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterAlgebirdCMS " + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - // CMS parameters - val DELTA = 1E-3 - val EPS = 0.01 - val SEED = 1 - val PERC = 0.001 - // K highest frequency elements to take - val TOPK = 10 - - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) - - val users = stream.map(status => status.getUser.getId) - - val cms = new CountMinSketchMonoid(EPS, DELTA, SEED, PERC) - var globalCMS = cms.zero - val mm = new MapMonoid[Long, Int]() - var globalExact = Map[Long, Int]() - - val approxTopUsers = users.mapPartitions(ids => { - ids.map(id => cms.create(id)) - }).reduce(_ ++ _) - - val exactTopUsers = users.map(id => (id, 1)) - .reduceByKey((a, b) => a + b) - - approxTopUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - val partialTopK = partial.heavyHitters.map(id => - (id, partial.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) - globalCMS ++= partial - val globalTopK = globalCMS.heavyHitters.map(id => - (id, globalCMS.frequency(id).estimate)).toSeq.sortBy(_._2).reverse.slice(0, TOPK) - println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC, - partialTopK.mkString("[", ",", "]"))) - println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC, - globalTopK.mkString("[", ",", "]"))) - } - }) - - exactTopUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partialMap = rdd.collect().toMap - val partialTopK = rdd.map( - {case (id, count) => (count, id)}) - .sortByKey(ascending = false).take(TOPK) - globalExact = mm.plus(globalExact.toMap, partialMap) - val globalTopK = globalExact.toSeq.sortBy(_._2).reverse.slice(0, TOPK) - println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]"))) - println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]"))) - } - }) - - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala deleted file mode 100644 index c067046f90..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import com.twitter.algebird.HyperLogLogMonoid -import com.twitter.algebird.HyperLogLog._ - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.twitter._ -// scalastyle:off -/** - * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute - * a windowed and global estimate of the unique user IDs occurring in a Twitter stream. - *

- *
- * HyperLogLog (HLL) is a memory-efficient data structure for estimating the cardinality of a
- * data stream, i.e. the number of unique elements; several blog posts give good overviews of it.
- *
- * Algebird's implementation is a monoid, so we can succinctly merge two HLL instances in the - * reduce operation. - */ -// scalastyle:on -object TwitterAlgebirdHLL { - def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterAlgebirdHLL " + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - /** Bit size parameter for HyperLogLog, trades off accuracy vs size */ - val BIT_SIZE = 12 - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) - - val users = stream.map(status => status.getUser.getId) - - val hll = new HyperLogLogMonoid(BIT_SIZE) - var globalHll = hll.zero - var userSet: Set[Long] = Set() - - val approxUsers = users.mapPartitions(ids => { - ids.map(id => hll(id)) - }).reduce(_ + _) - - val exactUsers = users.map(id => Set(id)).reduce(_ ++ _) - - approxUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - globalHll += partial - println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt)) - println("Approx distinct users overall: %d".format(globalHll.estimatedSize.toInt)) - } - }) - - exactUsers.foreachRDD(rdd => { - if (rdd.count() != 0) { - val partial = rdd.first() - userSet ++= partial - println("Exact distinct users this batch: %d".format(partial.size)) - println("Exact distinct users overall: %d".format(userSet.size)) - println("Error rate: %2.5f%%".format(((globalHll.estimatedSize / userSet.size.toDouble) - 1 - ) * 100)) - } - }) - - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala deleted file mode 100644 index 2597c81788..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import org.apache.spark.streaming.{Seconds, StreamingContext} -import StreamingContext._ -import org.apache.spark.SparkContext._ -import org.apache.spark.streaming.twitter._ - -/** - * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter - * stream. The stream is instantiated with credentials and optionally filters supplied by the - * command line arguments. 
- * - */ -object TwitterPopularTags { - def main(args: Array[String]) { - if (args.length < 1) { - System.err.println("Usage: TwitterPopularTags " + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val (master, filters) = (args.head, args.tail) - - val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - val stream = TwitterUtils.createStream(ssc, None, filters) - - val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) - - val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) - .map{case (topic, count) => (count, topic)} - .transform(_.sortByKey(false)) - - val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) - .map{case (topic, count) => (count, topic)} - .transform(_.sortByKey(false)) - - - // Print popular hashtags - topCounts60.foreachRDD(rdd => { - val topList = rdd.take(5) - println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - }) - - topCounts10.foreachRDD(rdd => { - val topList = rdd.take(5) - println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - }) - - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala deleted file mode 100644 index 109ff855b5..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples - -import akka.actor.ActorSystem -import akka.actor.actorRef2Scala -import akka.zeromq._ -import akka.zeromq.Subscribe -import akka.util.ByteString - -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.zeromq._ - -import scala.language.implicitConversions - -/** - * A simple publisher for demonstration purposes, repeatedly publishes random Messages - * every one second. 
- */ -object SimpleZeroMQPublisher { - - def main(args: Array[String]) = { - if (args.length < 2) { - System.err.println("Usage: SimpleZeroMQPublisher ") - System.exit(1) - } - - val Seq(url, topic) = args.toSeq - val acs: ActorSystem = ActorSystem() - - val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) - implicit def stringToByteString(x: String) = ByteString(x) - val messages: List[ByteString] = List("words ", "may ", "count ") - while (true) { - Thread.sleep(1000) - pubSocket ! ZMQMessage(ByteString(topic) :: messages) - } - acs.awaitTermination() - } -} - -// scalastyle:off -/** - * A sample wordcount with ZeroMQStream stream - * - * To work with zeroMQ, some native libraries have to be installed. - * Install zeroMQ (release 2.1) core libraries. [ZeroMQ Install guide] - * (http://www.zeromq.org/intro:get-the-software) - * - * Usage: ZeroMQWordCount - * In local mode, should be 'local[n]' with n > 1 - * and describe where zeroMq publisher is running. - * - * To run this example locally, you may run publisher as - * `$ ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` - * and run the example as - * `$ ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` - */ -// scalastyle:on -object ZeroMQWordCount { - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println( - "Usage: ZeroMQWordCount " + - "In local mode, should be 'local[n]' with n > 1") - System.exit(1) - } - StreamingExamples.setStreamingLogLevels() - val Seq(master, url, topic) = args.toSeq - - // Create the context and set the batch size - val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator - - // For this stream, a zeroMQ publisher should be running. - val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) - val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) - wordCounts.print() - ssc.start() - ssc.awaitTermination() - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala deleted file mode 100644 index 251f65fe4d..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming.examples.clickstream - -import java.net.ServerSocket -import java.io.PrintWriter -import util.Random - -/** Represents a page view on a website with associated dimension data. */ -class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) - extends Serializable { - override def toString() : String = { - "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID) - } -} - -object PageView extends Serializable { - def fromString(in : String) : PageView = { - val parts = in.split("\t") - new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt) - } -} - -// scalastyle:off -/** Generates streaming events to simulate page views on a website. - * - * This should be used in tandem with PageViewStream.scala. Example: - * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10 - * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 - * - * When running this, you may want to set the root logging level to ERROR in - * conf/log4j.properties to reduce the verbosity of the output. - */ -// scalastyle:on -object PageViewGenerator { - val pages = Map("http://foo.com/" -> .7, - "http://foo.com/news" -> 0.2, - "http://foo.com/contact" -> .1) - val httpStatus = Map(200 -> .95, - 404 -> .05) - val userZipCode = Map(94709 -> .5, - 94117 -> .5) - val userID = Map((1 to 100).map(_ -> .01):_*) - - - def pickFromDistribution[T](inputMap : Map[T, Double]) : T = { - val rand = new Random().nextDouble() - var total = 0.0 - for ((item, prob) <- inputMap) { - total = total + prob - if (total > rand) { - return item - } - } - inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0 - } - - def getNextClickEvent() : String = { - val id = pickFromDistribution(userID) - val page = pickFromDistribution(pages) - val status = pickFromDistribution(httpStatus) - val zipCode = pickFromDistribution(userZipCode) - new PageView(page, status, zipCode, id).toString() - } - - def main(args : Array[String]) { - if (args.length != 2) { - System.err.println("Usage: PageViewGenerator ") - System.exit(1) - } - val port = args(0).toInt - val viewsPerSecond = args(1).toFloat - val sleepDelayMs = (1000.0 / viewsPerSecond).toInt - val listener = new ServerSocket(port) - println("Listening on port: " + port) - - while (true) { - val socket = listener.accept() - new Thread() { - override def run = { - println("Got client connected from: " + socket.getInetAddress) - val out = new PrintWriter(socket.getOutputStream(), true) - - while (true) { - Thread.sleep(sleepDelayMs) - out.write(getNextClickEvent()) - out.flush() - } - socket.close() - } - }.start() - } - } -} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala deleted file mode 100644 index 673013f7cf..0000000000 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.examples.clickstream - -import org.apache.spark.SparkContext._ -import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.examples.StreamingExamples -// scalastyle:off -/** Analyses a streaming dataset of web page views. This class demonstrates several types of - * operators available in Spark streaming. - * - * This should be used in tandem with PageViewStream.scala. Example: - * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10 - * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 - */ -// scalastyle:on -object PageViewStream { - def main(args: Array[String]) { - if (args.length != 3) { - System.err.println("Usage: PageViewStream ") - System.err.println(" must be one of pageCounts, slidingPageCounts," + - " errorRatePerZipCode, activeUserCount, popularUsersSeen") - System.exit(1) - } - StreamingExamples.setStreamingLogLevels() - val metric = args(0) - val host = args(1) - val port = args(2).toInt - - // Create the context - val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) - - // Create a NetworkInputDStream on target host:port and convert each line to a PageView - val pageViews = ssc.socketTextStream(host, port) - .flatMap(_.split("\n")) - .map(PageView.fromString(_)) - - // Return a count of views per URL seen in each batch - val pageCounts = pageViews.map(view => view.url).countByValue() - - // Return a sliding window of page views per URL in the last ten seconds - val slidingPageCounts = pageViews.map(view => view.url) - .countByValueAndWindow(Seconds(10), Seconds(2)) - - - // Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds - val statusesPerZipCode = pageViews.window(Seconds(30), Seconds(2)) - .map(view => ((view.zipCode, view.status))) - .groupByKey() - val errorRatePerZipCode = statusesPerZipCode.map{ - case(zip, statuses) => - val normalCount = statuses.filter(_ == 200).size - val errorCount = statuses.size - normalCount - val errorRatio = errorCount.toFloat / statuses.size - if (errorRatio > 0.05) { - "%s: **%s**".format(zip, errorRatio) - } else { - "%s: %s".format(zip, errorRatio) - } - } - - // Return the number unique users in last 15 seconds - val activeUserCount = pageViews.window(Seconds(15), Seconds(2)) - .map(view => (view.userID, 1)) - .groupByKey() - .count() - .map("Unique active users: " + _) - - // An external dataset we want to join to this stream - val userList = ssc.sparkContext.parallelize( - Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq) - - metric match { - case "pageCounts" => pageCounts.print() - case "slidingPageCounts" => slidingPageCounts.print() - case "errorRatePerZipCode" => errorRatePerZipCode.print() - case "activeUserCount" => activeUserCount.print() - case "popularUsersSeen" => - // Look for users in our existing 
dataset and print it out if we have a match - pageViews.map(view => (view.userID, 1)) - .foreachRDD((rdd, time) => rdd.join(userList) - .map(_._2._2) - .take(10) - .foreach(u => println("Saw user %s at time %s".format(u, time)))) - case _ => println("Invalid metric entered: " + metric) - } - - ssc.start() - } -} -- cgit v1.2.3
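
The removed examples above exercise a few streaming patterns worth calling out. For the checkpoint recovery used by RecoverableNetworkWordCount, the essential piece is StreamingContext.getOrCreate: the driver rebuilds the context from checkpoint data when it exists and only otherwise runs the creation function. Below is a minimal sketch of that pattern, assuming the same Spark 1.0-era StreamingContext constructor and socket word count used throughout these examples; the object name, the one-second batch interval, and the argument handling are illustrative only, not part of this patch.

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    object RecoverySketch {
      // Build a fresh context; only invoked when no checkpoint data exists yet.
      def createContext(master: String, host: String, port: Int, checkpointDir: String) = {
        val ssc = new StreamingContext(master, "RecoverySketch", Seconds(1),
          System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
        ssc.checkpoint(checkpointDir)                 // persist DStream metadata for recovery
        val lines = ssc.socketTextStream(host, port)  // e.g. fed by `nc -lk 9999`
        lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
        ssc
      }

      def main(args: Array[String]) {
        val Array(master, host, port, checkpointDir) = args
        // On restart, rebuild the context (and its word-count graph) from the checkpoint
        // instead of calling createContext again.
        val ssc = StreamingContext.getOrCreate(checkpointDir,
          () => createContext(master, host, port.toInt, checkpointDir))
        ssc.start()
        ssc.awaitTermination()
      }
    }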
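The Count-Min Sketch description above boils down to: summarise each batch into a partial sketch, then merge the partials, because the sketch is a monoid. Here is a minimal, Spark-free sketch of that idea using only the Algebird calls that appear in TwitterAlgebirdCMS (CountMinSketchMonoid, create, ++, frequency, heavyHitters); the parameter values and the hard-coded Long IDs are made up for illustration.

    import com.twitter.algebird.CountMinSketchMonoid

    object CMSMergeSketch {
      def main(args: Array[String]) {
        // eps/delta control the approximation error, the last argument the heavy-hitter threshold.
        val cms = new CountMinSketchMonoid(0.01, 1E-3, 1, 0.001)

        // Two "batches" of Long IDs, each reduced into its own partial sketch.
        val batch1 = Seq(1L, 2L, 2L, 3L).map(cms.create).reduce(_ ++ _)
        val batch2 = Seq(2L, 3L, 3L, 3L).map(cms.create).reduce(_ ++ _)

        // Because CMS is a monoid, the global sketch is just the merge of the partials.
        val global = batch1 ++ batch2

        println("Estimated count of id 3: " + global.frequency(3L).estimate)  // true count is 4
        println("Heavy hitters: " + global.heavyHitters)
      }
    }

In the streaming example the same merge happens twice: once inside each batch via reduce, and once against the running global sketch in foreachRDD.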
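The same monoid-merge idea drives TwitterAlgebirdHLL: per-batch HyperLogLog instances are summed into a running global cardinality estimate. A minimal sketch using the calls from that example (HyperLogLogMonoid, the HyperLogLog._ implicits, +, estimatedSize) follows; the 12-bit size and the sample IDs are illustrative.

    import com.twitter.algebird.HyperLogLogMonoid
    import com.twitter.algebird.HyperLogLog._   // implicits for feeding Longs into the monoid

    object HLLMergeSketch {
      def main(args: Array[String]) {
        val hll = new HyperLogLogMonoid(12)      // more bits: better accuracy, more memory

        // Per-batch cardinality sketches over Long user IDs.
        val batch1 = Seq(1L, 2L, 2L, 3L).map(id => hll(id)).reduce(_ + _)
        val batch2 = Seq(3L, 4L, 5L).map(id => hll(id)).reduce(_ + _)

        // Monoid merge gives the global estimate across batches.
        val global = batch1 + batch2

        println("Approx distinct ids: " + global.estimatedSize.toInt)  // exact answer is 5
      }
    }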