aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2014-01-10 15:47:01 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2014-01-10 15:47:01 -0800
commit 4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d (patch)
tree e62d1139d89f69c2281659d0919a566303d5d7a7 /examples
parent 82f07deeda89be2ad34e39ce83ac624c73b8d6e1 (diff)
parent 7cef8435d7b6b43a33e8be684c769412186ad6ac (diff)
download spark-4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d.tar.gz
spark-4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d.tar.bz2
spark-4f39e79c23b32a411a0d5fdc86b5c17ab2250f8d.zip
Merge remote-tracking branch 'apache/master' into driver-test
Conflicts: streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
Diffstat (limited to 'examples')
-rw-r--r-- examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java 2
-rw-r--r-- examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java 2
-rw-r--r-- examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java 4
-rw-r--r-- examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java 2
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala 12
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala 3
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala 5
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala 8
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala 2
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala 8
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala 5
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala 2
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala 21
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala 10
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala 9
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala 2
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala 1
-rw-r--r-- examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala 4
19 files changed, 76 insertions, 30 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index b11cfa667e..7b5a243e26 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -47,6 +47,8 @@ public final class JavaFlumeEventCount {
System.exit(1);
}
+ StreamingExamples.setStreamingLogLevels();
+
String master = args[0];
String host = args[1];
int port = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 16b8a948e6..04f62ee204 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -59,6 +59,8 @@ public final class JavaKafkaWordCount {
System.exit(1);
}
+ StreamingExamples.setStreamingLogLevels();
+
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
new Duration(2000), System.getenv("SPARK_HOME"),
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index e96996aa75..349d826ab5 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -38,7 +38,7 @@ import java.util.regex.Pattern;
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ * `$ ./run org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
*/
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
@@ -53,6 +53,8 @@ public final class JavaNetworkWordCount {
System.exit(1);
}
+ StreamingExamples.setStreamingLogLevels();
+
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
new Duration(1000), System.getenv("SPARK_HOME"),
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index e05551ab83..7ef9c6c8f4 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -41,6 +41,8 @@ public final class JavaQueueStream {
System.exit(1);
}
+ StreamingExamples.setStreamingLogLevels();
+
// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 4e0058cd70..57e1b1f806 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -18,17 +18,13 @@
package org.apache.spark.streaming.examples
import scala.collection.mutable.LinkedList
-import scala.util.Random
import scala.reflect.ClassTag
+import scala.util.Random
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.actor.actorRef2Scala
+import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import org.apache.spark.SparkConf
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.util.AkkaUtils
@@ -147,6 +143,8 @@ object ActorWordCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Seq(master, host, port) = args.toSeq
// Create the context and set the batch size
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index ae3709b3d9..a59be7899d 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -17,10 +17,10 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
/**
* Produces a count of events received from Flume.
@@ -44,6 +44,8 @@ object FlumeEventCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Array(master, host, IntParam(port)) = args
val batchInterval = Milliseconds(2000)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index ea6ea67419..704b315ef8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
-
/**
* Counts words in new text files created in the given directory
* Usage: HdfsWordCount <master> <directory>
@@ -38,6 +37,8 @@ object HdfsWordCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
// Create the context
val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 31a94bd224..4a3d81c09a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -23,8 +23,8 @@ import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.util.RawTextHelper._
import org.apache.spark.streaming.kafka._
+import org.apache.spark.streaming.util.RawTextHelper._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
@@ -40,12 +40,13 @@ import org.apache.spark.streaming.kafka._
*/
object KafkaWordCount {
def main(args: Array[String]) {
-
if (args.length < 5) {
System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Array(master, zkQuorum, group, topics, numThreads) = args
val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 325290b66f..78b49fdcf1 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,12 +17,8 @@
package org.apache.spark.streaming.examples
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -43,6 +39,8 @@ object MQTTPublisher {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Seq(brokerUrl, topic) = args.toSeq
try {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 4b896eaccb..25f7013307 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -40,6 +40,8 @@ object NetworkWordCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
index 9d640e716b..4d4968ba6a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
@@ -17,12 +17,12 @@
package org.apache.spark.streaming.examples
+import scala.collection.mutable.SynchronizedQueue
+
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
-import scala.collection.mutable.SynchronizedQueue
-
object QueueStream {
def main(args: Array[String]) {
@@ -30,7 +30,9 @@ object QueueStream {
System.err.println("Usage: QueueStream <master>")
System.exit(1)
}
-
+
+ StreamingExamples.setStreamingLogLevels()
+
// Create the context
val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index c0706d0724..3d08d86567 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -17,11 +17,10 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
-
import org.apache.spark.streaming._
import org.apache.spark.streaming.util.RawTextHelper
+import org.apache.spark.util.IntParam
/**
* Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
@@ -45,6 +44,8 @@ object RawNetworkGrep {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
// Create the context
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 002db57d59..1183eba846 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -39,6 +39,8 @@ object StatefulNetworkWordCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
new file mode 100644
index 0000000000..d41d84a980
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.Logging
+
+import org.apache.log4j.{Level, Logger}
+
+/** Utility functions for Spark Streaming examples. */
+object StreamingExamples extends Logging {
+
+ /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+ def setStreamingLogLevels() {
+ val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+ if (!log4jInitialized) {
+ // We first log something to initialize Spark's default logging, then we override the
+ // logging level.
+ logInfo("Setting log level to [WARN] for streaming example." +
+ " To override add a custom log4j.properties to the classpath.")
+ Logger.getRootLogger.setLevel(Level.WARN)
+ }
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 3ccdc908e2..80b5a98b14 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -17,12 +17,12 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
import com.twitter.algebird._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
/**
@@ -51,6 +51,8 @@ object TwitterAlgebirdCMS {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
// CMS parameters
val DELTA = 1E-3
val EPS = 0.01
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index c7e83e76b0..cb2f2c51a0 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -17,10 +17,11 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
-import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
/**
@@ -44,6 +45,8 @@ object TwitterAlgebirdHLL {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
/** Bit size parameter for HyperLogLog, trades off accuracy vs size */
val BIT_SIZE = 12
val (master, filters) = (args.head, args.tail)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index e2b0418d55..16c10feaba 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -36,6 +36,8 @@ object TwitterPopularTags {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 03902ec353..12d2a1084f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -76,6 +76,7 @@ object ZeroMQWordCount {
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index 807af199f4..da6b67bcce 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -17,9 +17,10 @@
package org.apache.spark.streaming.examples.clickstream
+import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.examples.StreamingExamples
/** Analyses a streaming dataset of web page views. This class demonstrates several types of
* operators available in Spark streaming.
@@ -36,6 +37,7 @@ object PageViewStream {
" errorRatePerZipCode, activeUserCount, popularUsersSeen")
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
val metric = args(0)
val host = args(1)
val port = args(2).toInt