From 68f28dabe9c7679be82e684385be216319beb610 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 14 May 2014 04:17:32 -0700
Subject: Fixed streaming examples docs to use run-example instead of spark-submit

Pretty self-explanatory

Author: Tathagata Das

Closes #722 from tdas/example-fix and squashes the following commits:

7839979 [Tathagata Das] Minor changes.
0673441 [Tathagata Das] Fixed java docs of java streaming example
e687123 [Tathagata Das] Fixed scala style errors.
9b8d112 [Tathagata Das] Fixed streaming examples docs to use run-example instead of spark-submit.
---
 .../spark/examples/streaming/ActorWordCount.scala  |  6 ++----
 .../spark/examples/streaming/CustomReceiver.scala  | 19 +++++++------------
 .../spark/examples/streaming/FlumeEventCount.scala |  9 ++++++---
 .../spark/examples/streaming/HdfsWordCount.scala   |  5 +++--
 .../spark/examples/streaming/KafkaWordCount.scala  |  6 +++---
 .../spark/examples/streaming/MQTTWordCount.scala   | 10 +++++-----
 .../examples/streaming/NetworkWordCount.scala      | 14 ++++++++------
 .../streaming/RecoverableNetworkWordCount.scala    |  7 +++----
 .../streaming/StatefulNetworkWordCount.scala       |  6 +++---
 .../examples/streaming/TwitterPopularTags.scala    | 22 +++++++++++++++++++---
 .../spark/examples/streaming/ZeroMQWordCount.scala |  8 ++++----
 .../streaming/clickstream/PageViewGenerator.scala  | 10 ++++++----
 .../streaming/clickstream/PageViewStream.scala     |  7 +++++--
 13 files changed, 74 insertions(+), 55 deletions(-)

(limited to 'examples/src/main/scala')

diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index e29e16a9c1..b433082dce 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -130,11 +130,9 @@ object FeederActor {
  * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
  *
  * To run this example locally, you may run Feeder Actor as
- *    `./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
  * and then run the example
- *    `./bin/spark-submit examples.jar --class org.apache.spark.examples.streaming.ActorWordCount \
- *    127.0.1.1 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999`
  */
 object ActorWordCount {
   def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
index e317e2d36a..6bb659fbd8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.streaming
 import java.io.{InputStreamReader, BufferedReader, InputStream}
 import java.net.Socket
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
@@ -30,32 +30,27 @@ import org.apache.spark.streaming.receiver.Receiver
  * Custom Receiver that receives data over a socket. Received bytes is interpreted as
  * text and \n delimited lines are considered as records. They are then counted and printed.
  *
- * Usage: CustomReceiver <master> <hostname> <port>
- *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
- *
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] localhost 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999`
  */
 object CustomReceiver {
   def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
-        "In local mode, <master> should be 'local[n]' with n > 1")
+    if (args.length < 2) {
+      System.err.println("Usage: CustomReceiver <hostname> <port>")
       System.exit(1)
     }
 
     StreamingExamples.setStreamingLogLevels()
 
     // Create the context with a 1 second batch size
-    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+    val sparkConf = new SparkConf().setAppName("CustomReceiver")
+    val ssc = new StreamingContext(sparkConf, Seconds(1))
 
     // Create a input stream with the custom receiver on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+    val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
index 38362edac2..20e7df7c45 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
@@ -31,14 +31,16 @@ import org.apache.spark.util.IntParam
  *  Your Flume AvroSink should be pointed to this address.
  *
  *  Usage: FlumeEventCount <host> <port>
- *
  *    <host> is the host the Flume receiver will be started on - a receiver
  *    creates a server and listens for flume events.
  *    <port> is the port the Flume receiver will listen on.
+ *
+ *  To run this example:
+ *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port>`
  */
 object FlumeEventCount {
   def main(args: Array[String]) {
-    if (args.length != 3) {
+    if (args.length < 2) {
       System.err.println(
         "Usage: FlumeEventCount <host> <port>")
       System.exit(1)
@@ -49,8 +51,9 @@ object FlumeEventCount {
     val Array(host, IntParam(port)) = args
 
     val batchInterval = Milliseconds(2000)
-    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
 
+    // Create the context and set the batch size
+    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
     val ssc = new StreamingContext(sparkConf, batchInterval)
 
     // Create a flume stream
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
index 55ac48cfb6..6c24bc3ad0 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
@@ -27,8 +27,9 @@ import org.apache.spark.streaming.StreamingContext._
  *   <directory> is the directory that Spark Streaming will use to find and read new text files.
  *
  * To run this on your local machine on directory `localdir`, run this example
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.HdfsWordCount localdir`
+ *    $ bin/run-example \
+ *       org.apache.spark.examples.streaming.HdfsWordCount localdir
+ *
  * Then create a text file in `localdir` and the words in the file will get counted.
  */
 object HdfsWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index 3af806981f..566ba6f911 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -35,9 +35,9 @@ import org.apache.spark.SparkConf
  *   <numThreads> is the number of threads the kafka consumer should use
  *
  * Example:
- *    `./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 \
- *    my-consumer-group topic1,topic2 1`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ *      my-consumer-group topic1,topic2 1`
  */
 object KafkaWordCount {
   def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 3a10daa9ab..e4283e04a1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -75,14 +75,14 @@ object MQTTPublisher {
  * Example Java code for Mqtt Publisher and Subscriber can be found here
  * https://bitbucket.org/mkjinesh/mqttclient
  * Usage: MQTTWordCount <MqttbrokerUrl> <topic>
-\ *   <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
+ *   <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
  *
  * To run this example locally, you may run publisher as
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
  * and run the example as
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
  */
 
 object MQTTWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
index ad7a199b2c..ae0a08c6cd 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
@@ -23,7 +23,7 @@ import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.storage.StorageLevel
 
 /**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
  *
  * Usage: NetworkWordCount <hostname> <port>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
@@ -31,8 +31,7 @@ import org.apache.spark.storage.StorageLevel
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
+ *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
  */
 object NetworkWordCount {
   def main(args: Array[String]) {
@@ -42,13 +41,16 @@ object NetworkWordCount {
     }
 
     StreamingExamples.setStreamingLogLevels()
-    val sparkConf = new SparkConf().setAppName("NetworkWordCount");
+
+    // Create the context with a 1 second batch size
+    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
     val ssc = new StreamingContext(sparkConf, Seconds(1))
 
-    // Create a NetworkInputDStream on target ip:port and count the
+    // Create a socket stream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
-    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
+    // Note that no duplication in storage level only for running locally.
+    // Replication necessary in distributed scenario for fault tolerance.
+    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index ace785d9fe..6af3a0f33e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -46,8 +46,7 @@ import org.apache.spark.util.IntParam
  *
  * and run the example as
  *
- *      `$ ./bin/spark-submit examples.jar \
- *      --class org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
+ *      `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
  *              localhost 9999 ~/checkpoint/ ~/out`
  *
  * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
@@ -57,7 +56,7 @@ import org.apache.spark.util.IntParam
  *
  * To run this example in a local standalone cluster with automatic driver recovery,
  *
- *      `$ ./spark-class org.apache.spark.deploy.Client -s launch \
+ *      `$ bin/spark-class org.apache.spark.deploy.Client -s launch \
  *              <cluster-url> \
  *              org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
  *              localhost 9999 ~/checkpoint ~/out`
@@ -81,7 +80,7 @@ object RecoverableNetworkWordCount {
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(sparkConf, Seconds(1))
 
-    // Create a NetworkInputDStream on target ip:port and count the
+    // Create a socket stream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
     val lines = ssc.socketTextStream(ip, port)
     val words = lines.flatMap(_.split(" "))
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index 5e1415f3cc..daa1ced63c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -31,8 +31,8 @@ import org.apache.spark.streaming.StreamingContext._
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
- *    `$ ./bin/spark-submit examples.jar
- *    --class org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
+ *    `$ bin/run-example
+ *      org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
  */
 object StatefulNetworkWordCount {
   def main(args: Array[String]) {
@@ -51,7 +51,7 @@ object StatefulNetworkWordCount {
       Some(currentCount + previousCount)
     }
 
-    val sparkConf = new SparkConf().setAppName("NetworkWordCumulativeCountUpdateStateByKey")
+    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(sparkConf, Seconds(1))
     ssc.checkpoint(".")
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
index 1ddff22cb8..f55d23ab39 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
@@ -28,13 +28,29 @@ import org.apache.spark.SparkConf
  * stream. The stream is instantiated with credentials and optionally filters supplied by the
  * command line arguments.
  *
+ * Run this on your local machine as
+ *
  */
 object TwitterPopularTags {
   def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
+        "<access token> <access token secret> [<filters>]")
+      System.exit(1)
+    }
 
     StreamingExamples.setStreamingLogLevels()
 
-    val filters = args
+    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
+    val filters = args.takeRight(args.length - 4)
+
+    // Set the system properties so that Twitter4j library used by twitter stream
+    // can use them to generate OAuth credentials
+    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+    System.setProperty("twitter4j.oauth.accessToken", accessToken)
+    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
     val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
     val ssc = new StreamingContext(sparkConf, Seconds(2))
     val stream = TwitterUtils.createStream(ssc, None, filters)
@@ -52,13 +68,13 @@ object TwitterPopularTags {
 
     // Print popular hashtags
     topCounts60.foreachRDD(rdd => {
-      val topList = rdd.take(5)
+      val topList = rdd.take(10)
       println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
       topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
     })
 
     topCounts10.foreachRDD(rdd => {
-      val topList = rdd.take(5)
+      val topList = rdd.take(10)
       println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
       topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
     })
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 7ade3f1018..79905af381 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -68,11 +68,11 @@ object SimpleZeroMQPublisher {
  *   <zeroMQurl> and <topic> describe where zeroMq publisher is running.
  *
  * To run this example locally, you may run publisher as
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
  * and run the example as
- *    `$ ./bin/spark-submit examples.jar \
- *    --class org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
+ *    `$ bin/run-example \
+ *      org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
  */
 // scalastyle:on
 object ZeroMQWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 97e0cb9207..8402491b62 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -40,11 +40,13 @@ object PageView extends Serializable {
 /** Generates streaming events to simulate page views on a website.
   *
   * This should be used in tandem with PageViewStream.scala. Example:
Example: - * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10 - * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444 * - * When running this, you may want to set the root logging level to ERROR in - * conf/log4j.properties to reduce the verbosity of the output. + * To run the generator + * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10` + * To process the generated stream + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444` + * */ // scalastyle:on object PageViewGenerator { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala index d30ceffbe2..d9b886eff7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala @@ -26,8 +26,11 @@ import org.apache.spark.examples.streaming.StreamingExamples * operators available in Spark streaming. * * This should be used in tandem with PageViewStream.scala. Example: - * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10 - * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + * To run the generator + * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10` + * To process the generated stream + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444` */ // scalastyle:on object PageViewStream { -- cgit v1.2.3