diff options
Diffstat (limited to 'examples/src/main')
17 files changed, 95 insertions, 72 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java index 7f558f3ee7..5622df5ce0 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java @@ -19,6 +19,7 @@ package org.apache.spark.examples.streaming; import com.google.common.collect.Lists; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; @@ -48,25 +49,23 @@ import java.util.regex.Pattern; * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] localhost 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.JavaCustomReceiver localhost 9999` */ public class JavaCustomReceiver extends Receiver<String> { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String[] args) { - if (args.length < 3) { - System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1"); + if (args.length < 2) { + System.err.println("Usage: JavaNetworkWordCount <hostname> <port>"); System.exit(1); } StreamingExamples.setStreamingLogLevels(); // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount", - new Duration(1000), System.getenv("SPARK_HOME"), - JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class)); + SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver"); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); // Create a input stream with the custom receiver on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java index 400b68c221..da56637fe8 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java @@ -33,10 +33,12 @@ import org.apache.spark.streaming.flume.SparkFlumeEvent; * Your Flume AvroSink should be pointed to this address. * * Usage: JavaFlumeEventCount <host> <port> - * * <host> is the host the Flume receiver will be started on - a receiver * creates a server and listens for flume events. * <port> is the port the Flume receiver will listen on. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>` */ public final class JavaFlumeEventCount { private JavaFlumeEventCount() { @@ -56,7 +58,7 @@ public final class JavaFlumeEventCount { Duration batchInterval = new Duration(2000); SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); - JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port); + JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port); flumeStream.count(); diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java index 6a74cc50d1..16ae9a3319 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java @@ -40,15 +40,15 @@ import org.apache.spark.streaming.kafka.KafkaUtils; /** * Consumes messages from one or more topics in Kafka and does wordcount. + * * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads> * <zkQuorum> is a list of one or more zookeeper servers that make quorum * <group> is the name of kafka consumer group * <topics> is a list of one or more kafka topics to consume from * <numThreads> is the number of threads the kafka consumer should use * - * Example: - * `./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \ * zoo03 my-consumer-group topic1,topic2 1` */ diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java index e5cbd39f43..45bcedebb4 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java @@ -24,7 +24,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.examples.streaming.StreamingExamples; +import org.apache.spark.api.java.StorageLevels; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -41,8 +41,7 @@ import java.util.regex.Pattern; * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999` */ public final class JavaNetworkWordCount { private static final Pattern SPACE = Pattern.compile(" "); @@ -54,13 +53,17 @@ public final class JavaNetworkWordCount { } StreamingExamples.setStreamingLogLevels(); - SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount"); + // Create the context with a 1 second batch size + SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000)); // Create a JavaReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') - JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1])); + // Note that no duplication in storage level only for running locally. + // Replication necessary in distributed scenario for fault tolerance. + JavaReceiverInputDStream<String> lines = ssc.socketTextStream( + args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index e29e16a9c1..b433082dce 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -130,11 +130,9 @@ object FeederActor { * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. * * To run this example locally, you may run Feeder Actor as - * `./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999` * and then run the example - * `./bin/spark-submit examples.jar --class org.apache.spark.examples.streaming.ActorWordCount \ - * 127.0.1.1 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999` */ object ActorWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala index e317e2d36a..6bb659fbd8 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala @@ -20,7 +20,7 @@ package org.apache.spark.examples.streaming import java.io.{InputStreamReader, BufferedReader, InputStream} import java.net.Socket -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ @@ -30,32 +30,27 @@ import org.apache.spark.streaming.receiver.Receiver * Custom Receiver that receives data over a socket. Received bytes is interpreted as * text and \n delimited lines are considered as records. They are then counted and printed. * - * Usage: CustomReceiver <master> <hostname> <port> - * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. - * <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data. - * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] localhost 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999` */ object CustomReceiver { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + - "In local mode, <master> should be 'local[n]' with n > 1") + if (args.length < 2) { + System.err.println("Usage: CustomReceiver <hostname> <port>") System.exit(1) } StreamingExamples.setStreamingLogLevels() // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), - System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq) + val sparkConf = new SparkConf().setAppName("CustomReceiver") + val ssc = new StreamingContext(sparkConf, Seconds(1)) // Create a input stream with the custom receiver on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') - val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt)) + val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt)) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala index 38362edac2..20e7df7c45 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala @@ -31,14 +31,16 @@ import org.apache.spark.util.IntParam * Your Flume AvroSink should be pointed to this address. * * Usage: FlumeEventCount <host> <port> - * * <host> is the host the Flume receiver will be started on - a receiver * creates a server and listens for flume events. * <port> is the port the Flume receiver will listen on. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> ` */ object FlumeEventCount { def main(args: Array[String]) { - if (args.length != 3) { + if (args.length < 2) { System.err.println( "Usage: FlumeEventCount <host> <port>") System.exit(1) @@ -49,8 +51,9 @@ object FlumeEventCount { val Array(host, IntParam(port)) = args val batchInterval = Milliseconds(2000) - val sparkConf = new SparkConf().setAppName("FlumeEventCount") + // Create the context and set the batch size + val sparkConf = new SparkConf().setAppName("FlumeEventCount") val ssc = new StreamingContext(sparkConf, batchInterval) // Create a flume stream diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala index 55ac48cfb6..6c24bc3ad0 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala @@ -27,8 +27,9 @@ import org.apache.spark.streaming.StreamingContext._ * <directory> is the directory that Spark Streaming will use to find and read new text files. * * To run this on your local machine on directory `localdir`, run this example - * `$ ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.HdfsWordCount localdir` + * $ bin/run-example \ + * org.apache.spark.examples.streaming.HdfsWordCount localdir + * * Then create a text file in `localdir` and the words in the file will get counted. */ object HdfsWordCount { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala index 3af806981f..566ba6f911 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala @@ -35,9 +35,9 @@ import org.apache.spark.SparkConf * <numThreads> is the number of threads the kafka consumer should use * * Example: - * `./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 \ - * my-consumer-group topic1,topic2 1` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \ + * my-consumer-group topic1,topic2 1` */ object KafkaWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala index 3a10daa9ab..e4283e04a1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala @@ -75,14 +75,14 @@ object MQTTPublisher { * Example Java code for Mqtt Publisher and Subscriber can be found here * https://bitbucket.org/mkjinesh/mqttclient * Usage: MQTTWordCount <MqttbrokerUrl> <topic> -\ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running. + * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running. * * To run this example locally, you may run publisher as - * `$ ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo` * and run the example as - * `$ ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo` */ object MQTTWordCount { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala index ad7a199b2c..ae0a08c6cd 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala @@ -23,7 +23,7 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel /** - * Counts words in text encoded with UTF8 received from the network every second. + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * * Usage: NetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. @@ -31,8 +31,7 @@ import org.apache.spark.storage.StorageLevel * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` + * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999` */ object NetworkWordCount { def main(args: Array[String]) { @@ -42,13 +41,16 @@ object NetworkWordCount { } StreamingExamples.setStreamingLogLevels() - val sparkConf = new SparkConf().setAppName("NetworkWordCount"); + // Create the context with a 1 second batch size + val sparkConf = new SparkConf().setAppName("NetworkWordCount") val ssc = new StreamingContext(sparkConf, Seconds(1)) - // Create a NetworkInputDStream on target ip:port and count the + // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') - val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER) + // Note that no duplication in storage level only for running locally. + // Replication necessary in distributed scenario for fault tolerance. + val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index ace785d9fe..6af3a0f33e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -46,8 +46,7 @@ import org.apache.spark.util.IntParam * * and run the example as * - * `$ ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ + * `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \ * localhost 9999 ~/checkpoint/ ~/out` * * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create @@ -57,7 +56,7 @@ import org.apache.spark.util.IntParam * * To run this example in a local standalone cluster with automatic driver recovery, * - * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \ + * `$ bin/spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \ * <path-to-examples-jar> \ * org.apache.spark.examples.streaming.RecoverableNetworkWordCount <cluster-url> \ * localhost 9999 ~/checkpoint ~/out` @@ -81,7 +80,7 @@ object RecoverableNetworkWordCount { // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) - // Create a NetworkInputDStream on target ip:port and count the + // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') val lines = ssc.socketTextStream(ip, port) val words = lines.flatMap(_.split(" ")) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala index 5e1415f3cc..daa1ced63c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala @@ -31,8 +31,8 @@ import org.apache.spark.streaming.StreamingContext._ * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/spark-submit examples.jar - * --class org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` + * `$ bin/run-example + * org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` */ object StatefulNetworkWordCount { def main(args: Array[String]) { @@ -51,7 +51,7 @@ object StatefulNetworkWordCount { Some(currentCount + previousCount) } - val sparkConf = new SparkConf().setAppName("NetworkWordCumulativeCountUpdateStateByKey") + val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala index 1ddff22cb8..f55d23ab39 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala @@ -28,13 +28,29 @@ import org.apache.spark.SparkConf * stream. The stream is instantiated with credentials and optionally filters supplied by the * command line arguments. * + * Run this on your local machine as + * */ object TwitterPopularTags { def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " + + "<access token> <access token secret> [<filters>]") + System.exit(1) + } StreamingExamples.setStreamingLogLevels() - val filters = args + val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) + val filters = args.takeRight(args.length - 4) + + // Set the system properties so that Twitter4j library used by twitter stream + // can use them to generat OAuth credentials + System.setProperty("twitter4j.oauth.consumerKey", consumerKey) + System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) + System.setProperty("twitter4j.oauth.accessToken", accessToken) + System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) + val sparkConf = new SparkConf().setAppName("TwitterPopularTags") val ssc = new StreamingContext(sparkConf, Seconds(2)) val stream = TwitterUtils.createStream(ssc, None, filters) @@ -52,13 +68,13 @@ object TwitterPopularTags { // Print popular hashtags topCounts60.foreachRDD(rdd => { - val topList = rdd.take(5) + val topList = rdd.take(10) println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) topCounts10.foreachRDD(rdd => { - val topList = rdd.take(5) + val topList = rdd.take(10) println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 7ade3f1018..79905af381 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -68,11 +68,11 @@ object SimpleZeroMQPublisher { * <zeroMQurl> and <topic> describe where zeroMq publisher is running. * * To run this example locally, you may run publisher as - * `$ ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` * and run the example as - * `$ ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` */ // scalastyle:on object ZeroMQWordCount { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala index 97e0cb9207..8402491b62 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala @@ -40,11 +40,13 @@ object PageView extends Serializable { /** Generates streaming events to simulate page views on a website. * * This should be used in tandem with PageViewStream.scala. Example: - * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10 - * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444 * - * When running this, you may want to set the root logging level to ERROR in - * conf/log4j.properties to reduce the verbosity of the output. + * To run the generator + * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10` + * To process the generated stream + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444` + * */ // scalastyle:on object PageViewGenerator { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala index d30ceffbe2..d9b886eff7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala @@ -26,8 +26,11 @@ import org.apache.spark.examples.streaming.StreamingExamples * operators available in Spark streaming. * * This should be used in tandem with PageViewStream.scala. Example: - * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10 - * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + * To run the generator + * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10` + * To process the generated stream + * `$ bin/run-example \ + * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444` */ // scalastyle:on object PageViewStream { |