author    Tathagata Das <tathagata.das1565@gmail.com>  2014-05-14 04:17:32 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-05-14 04:24:48 -0700
commit    c7571d8c6ba058b67cca2b910fd0efacc06642cd (patch)
tree      1f983cf200e6bef1ac3a3cee7d0f4051bf3a2655 /examples
parent    69ec3149fb4d732935748b9afee4f9d8a7b1244e (diff)
Fixed streaming examples docs to use run-example instead of spark-submit
Pretty self-explanatory.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #722 from tdas/example-fix and squashes the following commits:

7839979 [Tathagata Das] Minor changes.
0673441 [Tathagata Das] Fixed java docs of java streaming example
e687123 [Tathagata Das] Fixed scala style errors.
9b8d112 [Tathagata Das] Fixed streaming examples docs to use run-example instead of spark-submit.
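The code change repeated across every file below is the same: the old constructors that took a <master> string plus SPARK_HOME/jar settings are replaced by SparkConf-based ones, so that bin/run-example (which launches through spark-submit) supplies the master URL. A minimal sketch of the resulting shape of each example; the ExampleSkeleton name and the socket source are ours, standing in for each example's real input:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object ExampleSkeleton {
      def main(args: Array[String]) {
        // No master URL here: bin/run-example supplies it at launch time,
        // which is why the <master> argument disappears from the docs.
        val sparkConf = new SparkConf().setAppName("ExampleSkeleton")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        val lines = ssc.socketTextStream("localhost", 9999)
        lines.count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }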
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java | 13
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java | 6
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java | 6
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java | 13
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala | 19
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala | 9
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala | 5
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala | 10
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala | 14
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala | 7
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala | 6
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala | 22
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala | 8
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala | 10
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala | 7
17 files changed, 95 insertions, 72 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 7f558f3ee7..5622df5ce0 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -19,6 +19,7 @@ package org.apache.spark.examples.streaming;
import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
@@ -48,25 +49,23 @@ import java.util.regex.Pattern;
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] localhost 9999`
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaCustomReceiver localhost 9999`
*/
public class JavaCustomReceiver extends Receiver<String> {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
- if (args.length < 3) {
- System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1");
+ if (args.length < 2) {
+ System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
- new Duration(1000), System.getenv("SPARK_HOME"),
- JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
+ SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver");
+ JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
// Create an input stream with the custom receiver on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index 400b68c221..da56637fe8 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -33,10 +33,12 @@ import org.apache.spark.streaming.flume.SparkFlumeEvent;
* Your Flume AvroSink should be pointed to this address.
*
* Usage: JavaFlumeEventCount <host> <port>
- *
* <host> is the host the Flume receiver will be started on - a receiver
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>`
*/
public final class JavaFlumeEventCount {
private JavaFlumeEventCount() {
@@ -56,7 +58,7 @@ public final class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
- JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
+ JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
flumeStream.count();
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 6a74cc50d1..16ae9a3319 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -40,15 +40,15 @@ import org.apache.spark.streaming.kafka.KafkaUtils;
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
+ *
* Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
* <zkQuorum> is a list of one or more zookeeper servers that make quorum
* <group> is the name of kafka consumer group
* <topics> is a list of one or more kafka topics to consume from
* <numThreads> is the number of threads the kafka consumer should use
*
- * Example:
- * `./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
* zoo03 my-consumer-group topic1,topic2 1`
*/
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index e5cbd39f43..45bcedebb4 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -24,7 +24,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.examples.streaming.StreamingExamples;
+import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -41,8 +41,7 @@ import java.util.regex.Pattern;
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999`
+ * `$ bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999`
*/
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
@@ -54,13 +53,17 @@ public final class JavaNetworkWordCount {
}
StreamingExamples.setStreamingLogLevels();
- SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
+
// Create the context with a 1 second batch size
+ SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
// Create a JavaReceiverInputDStream on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
- JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]));
+ // Note that a storage level without replication is suitable only for running locally.
+ // Replication is necessary in a distributed scenario for fault tolerance.
+ JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
+ args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index e29e16a9c1..b433082dce 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -130,11 +130,9 @@ object FeederActor {
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
*
* To run this example locally, you may run Feeder Actor as
- * `./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
+ * `$ bin/run-example org.apache.spark.examples.streaming.FeederActor 127.0.1.1 9999`
* and then run the example
- * `./bin/spark-submit examples.jar --class org.apache.spark.examples.streaming.ActorWordCount \
- * 127.0.1.1 9999`
+ * `$ bin/run-example org.apache.spark.examples.streaming.ActorWordCount 127.0.1.1 9999`
*/
object ActorWordCount {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
index e317e2d36a..6bb659fbd8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -20,7 +20,7 @@ package org.apache.spark.examples.streaming
import java.io.{InputStreamReader, BufferedReader, InputStream}
import java.net.Socket
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
@@ -30,32 +30,27 @@ import org.apache.spark.streaming.receiver.Receiver
* Custom Receiver that receives data over a socket. Received bytes are interpreted as
* text and \n delimited lines are considered as records. They are then counted and printed.
*
- * Usage: CustomReceiver <master> <hostname> <port>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
- *
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] localhost 9999`
+ * `$ bin/run-example org.apache.spark.examples.streaming.CustomReceiver localhost 9999`
*/
object CustomReceiver {
def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1")
+ if (args.length < 2) {
+ System.err.println("Usage: CustomReceiver <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
- val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
- System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+ val sparkConf = new SparkConf().setAppName("CustomReceiver")
+ val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create an input stream with the custom receiver on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
- val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+ val lines = ssc.receiverStream(new CustomReceiver(args(0), args(1).toInt))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
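For context on what `new CustomReceiver(host, port)` must provide: a receiver extends org.apache.spark.streaming.receiver.Receiver, returns quickly from onStart, and pushes records with store(). A stripped-down sketch under a hypothetical LineReceiver name, not the example's full implementation:

    import java.net.Socket
    import scala.io.Source
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class LineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

      def onStart() {
        // Receive on a separate thread so that onStart returns immediately.
        new Thread("Line Receiver") {
          override def run() {
            val socket = new Socket(host, port)
            Source.fromInputStream(socket.getInputStream, "UTF-8")
              .getLines().foreach(line => store(line))  // hand each record to Spark
            socket.close()
            restart("Connection closed, restarting receiver")
          }
        }.start()
      }

      def onStop() { }  // nothing to do; the thread exits when the socket closes
    }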
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
index 38362edac2..20e7df7c45 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
@@ -31,14 +31,16 @@ import org.apache.spark.util.IntParam
* Your Flume AvroSink should be pointed to this address.
*
* Usage: FlumeEventCount <host> <port>
- *
* <host> is the host the Flume receiver will be started on - a receiver
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
+ *
+ * To run this example:
+ * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port>`
*/
object FlumeEventCount {
def main(args: Array[String]) {
- if (args.length != 3) {
+ if (args.length < 2) {
System.err.println(
"Usage: FlumeEventCount <host> <port>")
System.exit(1)
@@ -49,8 +51,9 @@ object FlumeEventCount {
val Array(host, IntParam(port)) = args
val batchInterval = Milliseconds(2000)
- val sparkConf = new SparkConf().setAppName("FlumeEventCount")
+
// Create the context and set the batch size
+ val sparkConf = new SparkConf().setAppName("FlumeEventCount")
val ssc = new StreamingContext(sparkConf, batchInterval)
// Create a flume stream
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
index 55ac48cfb6..6c24bc3ad0 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala
@@ -27,8 +27,9 @@ import org.apache.spark.streaming.StreamingContext._
* <directory> is the directory that Spark Streaming will use to find and read new text files.
*
* To run this on your local machine on directory `localdir`, run this example
- * `$ ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.HdfsWordCount localdir`
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.HdfsWordCount localdir`
+ *
* Then create a text file in `localdir` and the words in the file will get counted.
*/
object HdfsWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
index 3af806981f..566ba6f911 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala
@@ -35,9 +35,9 @@ import org.apache.spark.SparkConf
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
- * `./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.KafkaWordCount local[2] zoo01,zoo02,zoo03 \
- * my-consumer-group topic1,topic2 1`
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03 \
+ * my-consumer-group topic1,topic2 1`
*/
object KafkaWordCount {
def main(args: Array[String]) {
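For reference, the documented arguments feed KafkaUtils.createStream roughly as below; a sketch of a main body in the commit's SparkConf style, with the quorum, group, and topics taken from the example invocation above:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.kafka.KafkaUtils

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // topic -> number of consumer threads for that topic
    val topicMap = Map("topic1" -> 1, "topic2" -> 1)
    val lines = KafkaUtils.createStream(ssc, "zoo01,zoo02,zoo03",
      "my-consumer-group", topicMap).map(_._2)  // keep the message, drop the key
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()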
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
index 3a10daa9ab..e4283e04a1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/MQTTWordCount.scala
@@ -75,14 +75,14 @@ object MQTTPublisher {
* Example Java code for Mqtt Publisher and Subscriber can be found here
* https://bitbucket.org/mkjinesh/mqttclient
* Usage: MQTTWordCount <MqttbrokerUrl> <topic>
-\ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
+ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
*
* To run this example locally, you may run publisher as
- * `$ ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.MQTTPublisher tcp://localhost:1883 foo`
* and run the example as
- * `$ ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.MQTTWordCount tcp://localhost:1883 foo`
*/
object MQTTWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
index ad7a199b2c..ae0a08c6cd 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala
@@ -23,7 +23,7 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
/**
- * Counts words in text encoded with UTF8 received from the network every second.
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
*
* Usage: NetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
@@ -31,8 +31,7 @@ import org.apache.spark.storage.StorageLevel
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
+ * `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
*/
object NetworkWordCount {
def main(args: Array[String]) {
@@ -42,13 +41,16 @@ object NetworkWordCount {
}
StreamingExamples.setStreamingLogLevels()
- val sparkConf = new SparkConf().setAppName("NetworkWordCount");
+
// Create the context with a 1 second batch size
+ val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
- // Create a NetworkInputDStream on target ip:port and count the
+ // Create a socket stream on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
- val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
+ // Note that a storage level without replication is suitable only for running locally.
+ // Replication is necessary in a distributed scenario for fault tolerance.
+ val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
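The storage-level switch above (MEMORY_ONLY_SER to MEMORY_AND_DISK_SER) is what the two new comment lines qualify; note that neither level replicates received blocks. An illustration of the local versus cluster choice (ours, not part of the commit):

    import org.apache.spark.storage.StorageLevel

    // One serialized copy, spilling to disk under memory pressure: fine locally.
    val localLevel = StorageLevel.MEMORY_AND_DISK_SER
    // The _2 variant keeps two replicas of each received block, giving the
    // fault tolerance the comment asks for in a distributed deployment.
    val replicatedLevel = StorageLevel.MEMORY_AND_DISK_SER_2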
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index ace785d9fe..6af3a0f33e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -46,8 +46,7 @@ import org.apache.spark.util.IntParam
*
* and run the example as
*
- * `$ ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
+ * `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
* localhost 9999 ~/checkpoint/ ~/out`
*
* If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
@@ -57,7 +56,7 @@ import org.apache.spark.util.IntParam
*
* To run this example in a local standalone cluster with automatic driver recovery,
*
- * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
+ * `$ bin/spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
* <path-to-examples-jar> \
* org.apache.spark.examples.streaming.RecoverableNetworkWordCount <cluster-url> \
* localhost 9999 ~/checkpoint ~/out`
@@ -81,7 +80,7 @@ object RecoverableNetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
- // Create a NetworkInputDStream on target ip:port and count the
+ // Create a socket stream on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
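The driver recovery this example's docs describe rests on StreamingContext.getOrCreate; a sketch of the mechanism, with an assumed /tmp/checkpoint path standing in for the ~/checkpoint/ argument:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/tmp/checkpoint"  // stand-in for ~/checkpoint/

    def createContext(): StreamingContext = {
      val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      ssc.checkpoint(checkpointDir)
      // ... create the socket stream and wire up the word counts here ...
      ssc
    }

    // createContext runs only on the first launch; after a driver restart the
    // context and its DStream graph are rebuilt from the checkpoint files.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()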
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
index 5e1415f3cc..daa1ced63c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
@@ -31,8 +31,8 @@ import org.apache.spark.streaming.StreamingContext._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./bin/spark-submit examples.jar
- * --class org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999`
*/
object StatefulNetworkWordCount {
def main(args: Array[String]) {
@@ -51,7 +51,7 @@ object StatefulNetworkWordCount {
Some(currentCount + previousCount)
}
- val sparkConf = new SparkConf().setAppName("NetworkWordCumulativeCountUpdateStateByKey")
+ val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
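The updateFunc defined in this hunk is handed to updateStateByKey, which keeps a running count per word across batches; a sketch of the connection, assuming wordDstream is the example's DStream of (word, 1) pairs:

    import org.apache.spark.streaming.StreamingContext._  // pair-DStream operations

    // Merge this batch's counts for a word into its running total.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      Some(values.sum + state.getOrElse(0))
    }
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()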
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
index 1ddff22cb8..f55d23ab39 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
@@ -28,13 +28,29 @@ import org.apache.spark.SparkConf
* stream. The stream is instantiated with credentials and, optionally, filters supplied by the
* command line arguments.
*
+ * Run this on your local machine as
+ * `$ bin/run-example org.apache.spark.examples.streaming.TwitterPopularTags <consumer key> \
+ *   <consumer secret> <access token> <access token secret> [<filters>]`
+ *
*/
object TwitterPopularTags {
def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: TwitterPopularTags <consumer key> <consumer secret> " +
+ "<access token> <access token secret> [<filters>]")
+ System.exit(1)
+ }
StreamingExamples.setStreamingLogLevels()
- val filters = args
+ val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
+ val filters = args.takeRight(args.length - 4)
+
+ // Set the system properties so that the Twitter4j library used by the
+ // twitter stream can use them to generate OAuth credentials
+ System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
+ System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
+ System.setProperty("twitter4j.oauth.accessToken", accessToken)
+ System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
+
val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stream = TwitterUtils.createStream(ssc, None, filters)
@@ -52,13 +68,13 @@ object TwitterPopularTags {
// Print popular hashtags
topCounts60.foreachRDD(rdd => {
- val topList = rdd.take(5)
+ val topList = rdd.take(10)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
topCounts10.foreachRDD(rdd => {
- val topList = rdd.take(5)
+ val topList = rdd.take(10)
println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
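The topCounts60/topCounts10 streams printed above come from a windowed count over hashtags; a sketch mirroring the example's pipeline, assuming ssc, filters, and the twitter4j.oauth.* properties set earlier in the hunk:

    import org.apache.spark.SparkContext._            // sortByKey on the transformed RDDs
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.twitter.TwitterUtils

    val stream = TwitterUtils.createStream(ssc, None, filters)
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
    // Tag counts over a sliding 60 second window, largest count first.
    val topCounts60 = hashTags.map((_, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (tag, count) => (count, tag) }
      .transform(_.sortByKey(false))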
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 7ade3f1018..79905af381 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -68,11 +68,11 @@ object SimpleZeroMQPublisher {
* <zeroMQurl> and <topic> describe where zeroMq publisher is running.
*
* To run this example locally, you may run publisher as
- * `$ ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* and run the example as
- * `$ ./bin/spark-submit examples.jar \
- * --class org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
*/
// scalastyle:on
object ZeroMQWordCount {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 97e0cb9207..8402491b62 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -40,11 +40,13 @@ object PageView extends Serializable {
/** Generates streaming events to simulate page views on a website.
*
* This should be used in tandem with PageViewStream.scala. Example:
- * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
- * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444
*
- * When running this, you may want to set the root logging level to ERROR in
- * conf/log4j.properties to reduce the verbosity of the output.
+ * To run the generator
+ * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
+ * To process the generated stream
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
+ *
*/
// scalastyle:on
object PageViewGenerator {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
index d30ceffbe2..d9b886eff7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
@@ -26,8 +26,11 @@ import org.apache.spark.examples.streaming.StreamingExamples
* operators available in Spark streaming.
*
* This should be used in tandem with PageViewStream.scala. Example:
- * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10
- * $ ./bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ * To run the generator
+ * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
+ * To process the generated stream
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
*/
// scalastyle:on
object PageViewStream {
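The errorRatePerZipCode metric named in these commands is, in essence, a windowed pair-DStream computation; a sketch assuming pageViews is a DStream[PageView] (PageView carries status and zipCode, per PageViewGenerator), with illustrative window sizes:

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._

    val errorRatePerZipCode = pageViews
      .window(Seconds(30), Seconds(2))
      .map(view => (view.zipCode, if (view.status != 200) 1.0 else 0.0))
      .groupByKey()
      .map { case (zip, errors) => (zip, errors.sum / errors.size) }
    errorRatePerZipCode.print()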