aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-01-13 16:21:26 -0800
committerReynold Xin <rxin@apache.org>2014-01-13 16:21:26 -0800
commite2d25d2dfeb1d43d1e36f169250d8efef4ac232a (patch)
treed911a37f5aacc89bc3a1c76d41842e1c156aec6a /examples
parent8038da232870fe016e73122a2ef110ac8e56ca1e (diff)
parentb93f9d42f21f03163734ef97b2871db945e166da (diff)
downloadspark-e2d25d2dfeb1d43d1e36f169250d8efef4ac232a.tar.gz
spark-e2d25d2dfeb1d43d1e36f169250d8efef4ac232a.tar.bz2
spark-e2d25d2dfeb1d43d1e36f169250d8efef4ac232a.zip
Merge branch 'master' into graphx
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java2
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java2
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java11
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala46
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/LocalALS.scala8
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkALS.scala6
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala12
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala12
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala5
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala8
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala7
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala8
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala7
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala118
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala21
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala14
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala13
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala6
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala1
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala6
28 files changed, 271 insertions, 61 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index b11cfa667e..7b5a243e26 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -47,6 +47,8 @@ public final class JavaFlumeEventCount {
System.exit(1);
}
+ StreamingExamples.setStreamingLogLevels();
+
String master = args[0];
String host = args[1];
int port = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 16b8a948e6..04f62ee204 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -59,6 +59,8 @@ public final class JavaKafkaWordCount {
System.exit(1);
}
+ StreamingExamples.setStreamingLogLevels();
+
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
new Duration(2000), System.getenv("SPARK_HOME"),
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 1e2efd359c..349d826ab5 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -38,7 +38,7 @@ import java.util.regex.Pattern;
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
+ * `$ ./run org.apache.spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
*/
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
@@ -48,18 +48,20 @@ public final class JavaNetworkWordCount {
public static void main(String[] args) {
if (args.length < 3) {
- System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
+ System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1");
System.exit(1);
}
+ StreamingExamples.setStreamingLogLevels();
+
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
new Duration(1000), System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
// Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
+ // words in input stream of \n delimited text (eg. generated by 'nc')
JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
@@ -82,6 +84,5 @@ public final class JavaNetworkWordCount {
wordCounts.print();
ssc.start();
-
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index e05551ab83..7ef9c6c8f4 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -41,6 +41,8 @@ public final class JavaQueueStream {
System.exit(1);
}
+ StreamingExamples.setStreamingLogLevels();
+
// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
new file mode 100644
index 0000000000..65251e9319
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import scala.collection.JavaConversions._
+
+/** Prints out environmental information, sleeps, and then exits. Made to
+ * test driver submission in the standalone scheduler. */
+object DriverSubmissionTest {
+ def main(args: Array[String]) {
+ if (args.size < 1) {
+ println("Usage: DriverSubmissionTest <seconds-to-sleep>")
+ System.exit(0)
+ }
+ val numSecondsToSleep = args(0).toInt
+
+ val env = System.getenv()
+ val properties = System.getProperties()
+
+ println("Environment variables containing SPARK_TEST:")
+ env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println)
+
+ println("System properties containing spark.test:")
+ properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println)
+
+ for (i <- 1 until numSecondsToSleep) {
+ println(s"Alive for $i out of $numSecondsToSleep seconds")
+ Thread.sleep(1000)
+ }
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index 83db8b9e26..c8ecbb8e41 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -43,7 +43,7 @@ object LocalALS {
def generateR(): DoubleMatrix2D = {
val mh = factory2D.random(M, F)
val uh = factory2D.random(U, F)
- return algebra.mult(mh, algebra.transpose(uh))
+ algebra.mult(mh, algebra.transpose(uh))
}
def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
@@ -56,7 +56,7 @@ object LocalALS {
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
- return sqrt(sumSqs / (M * U))
+ sqrt(sumSqs / (M * U))
}
def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
@@ -80,7 +80,7 @@ object LocalALS {
val ch = new CholeskyDecomposition(XtX)
val Xty2D = factory2D.make(Xty.toArray, F)
val solved2D = ch.solve(Xty2D)
- return solved2D.viewColumn(0)
+ solved2D.viewColumn(0)
}
def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
@@ -104,7 +104,7 @@ object LocalALS {
val ch = new CholeskyDecomposition(XtX)
val Xty2D = factory2D.make(Xty.toArray, F)
val solved2D = ch.solve(Xty2D)
- return solved2D.viewColumn(0)
+ solved2D.viewColumn(0)
}
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
index fb130ea198..9ab5f5a486 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalFileLR.scala
@@ -28,7 +28,7 @@ object LocalFileLR {
def parsePoint(line: String): DataPoint = {
val nums = line.split(' ').map(_.toDouble)
- return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
}
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index f90ea35cd4..a730464ea1 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -55,7 +55,7 @@ object LocalKMeans {
}
}
- return bestIndex
+ bestIndex
}
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index 30c86d83e6..17bafc2218 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -44,7 +44,7 @@ object SparkALS {
def generateR(): DoubleMatrix2D = {
val mh = factory2D.random(M, F)
val uh = factory2D.random(U, F)
- return algebra.mult(mh, algebra.transpose(uh))
+ algebra.mult(mh, algebra.transpose(uh))
}
def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
@@ -57,7 +57,7 @@ object SparkALS {
//println("R: " + r)
blas.daxpy(-1, targetR, r)
val sumSqs = r.aggregate(Functions.plus, Functions.square)
- return sqrt(sumSqs / (M * U))
+ sqrt(sumSqs / (M * U))
}
def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
@@ -83,7 +83,7 @@ object SparkALS {
val ch = new CholeskyDecomposition(XtX)
val Xty2D = factory2D.make(Xty.toArray, F)
val solved2D = ch.solve(Xty2D)
- return solved2D.viewColumn(0)
+ solved2D.viewColumn(0)
}
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
index ff72532db1..39819064ed 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala
@@ -43,7 +43,7 @@ object SparkHdfsLR {
while (i < D) {
x(i) = tok.nextToken.toDouble; i += 1
}
- return DataPoint(new Vector(x), y)
+ DataPoint(new Vector(x), y)
}
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index 8c99025eaa..9fe2465235 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -30,7 +30,7 @@ object SparkKMeans {
val rand = new Random(42)
def parseVector(line: String): Vector = {
- return new Vector(line.split(' ').map(_.toDouble))
+ new Vector(line.split(' ').map(_.toDouble))
}
def closestPoint(p: Vector, centers: Array[Vector]): Int = {
@@ -46,7 +46,7 @@ object SparkKMeans {
}
}
- return bestIndex
+ bestIndex
}
def main(args: Array[String]) {
@@ -61,15 +61,15 @@ object SparkKMeans {
val K = args(2).toInt
val convergeDist = args(3).toDouble
- var kPoints = data.takeSample(false, K, 42).toArray
+ val kPoints = data.takeSample(withReplacement = false, K, 42).toArray
var tempDist = 1.0
while(tempDist > convergeDist) {
- var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+ val closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
- var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
+ val pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
- var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
+ val newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
tempDist = 0.0
for (i <- 0 until K) {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 4e0058cd70..57e1b1f806 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -18,17 +18,13 @@
package org.apache.spark.streaming.examples
import scala.collection.mutable.LinkedList
-import scala.util.Random
import scala.reflect.ClassTag
+import scala.util.Random
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.actor.actorRef2Scala
+import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import org.apache.spark.SparkConf
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.util.AkkaUtils
@@ -147,6 +143,8 @@ object ActorWordCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Seq(master, host, port) = args.toSeq
// Create the context and set the batch size
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index ae3709b3d9..a59be7899d 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -17,10 +17,10 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
+import org.apache.spark.util.IntParam
/**
* Produces a count of events received from Flume.
@@ -44,6 +44,8 @@ object FlumeEventCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Array(master, host, IntParam(port)) = args
val batchInterval = Milliseconds(2000)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index ea6ea67419..704b315ef8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -20,7 +20,6 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
-
/**
* Counts words in new text files created in the given directory
* Usage: HdfsWordCount <master> <directory>
@@ -38,6 +37,8 @@ object HdfsWordCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
// Create the context
val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 31a94bd224..4a3d81c09a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -23,8 +23,8 @@ import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.util.RawTextHelper._
import org.apache.spark.streaming.kafka._
+import org.apache.spark.streaming.util.RawTextHelper._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
@@ -40,12 +40,13 @@ import org.apache.spark.streaming.kafka._
*/
object KafkaWordCount {
def main(args: Array[String]) {
-
if (args.length < 5) {
System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Array(master, zkQuorum, group, topics, numThreads) = args
val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 325290b66f..78b49fdcf1 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,12 +17,8 @@
package org.apache.spark.streaming.examples
-import org.eclipse.paho.client.mqttv3.MqttClient
-import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
-import org.eclipse.paho.client.mqttv3.MqttException
-import org.eclipse.paho.client.mqttv3.MqttMessage
-import org.eclipse.paho.client.mqttv3.MqttTopic
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -43,6 +39,8 @@ object MQTTPublisher {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Seq(brokerUrl, topic) = args.toSeq
try {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 6a32c75373..25f7013307 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -21,7 +21,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
* Usage: NetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
@@ -39,12 +40,14 @@ object NetworkWordCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
+ // words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
index 9d640e716b..4d4968ba6a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/QueueStream.scala
@@ -17,12 +17,12 @@
package org.apache.spark.streaming.examples
+import scala.collection.mutable.SynchronizedQueue
+
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
-import scala.collection.mutable.SynchronizedQueue
-
object QueueStream {
def main(args: Array[String]) {
@@ -30,7 +30,9 @@ object QueueStream {
System.err.println("Usage: QueueStream <master>")
System.exit(1)
}
-
+
+ StreamingExamples.setStreamingLogLevels()
+
// Create the context
val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index c0706d0724..99b79c3949 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -17,11 +17,10 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
-
import org.apache.spark.streaming._
import org.apache.spark.streaming.util.RawTextHelper
+import org.apache.spark.util.IntParam
/**
* Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
@@ -45,6 +44,8 @@ object RawNetworkGrep {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
// Create the context
@@ -57,7 +58,7 @@ object RawNetworkGrep {
val rawStreams = (1 to numStreams).map(_ =>
ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = ssc.union(rawStreams)
- union.filter(_.contains("the")).count().foreach(r =>
+ union.filter(_.contains("the")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
new file mode 100644
index 0000000000..8c5d0bd568
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.util.IntParam
+import java.io.File
+import org.apache.spark.rdd.RDD
+import com.google.common.io.Files
+import java.nio.charset.Charset
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
+ * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+ * <output-file> file to which the word counts will be appended
+ *
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <checkpoint-directory> and <output-file> must be absolute paths
+ *
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *
+ * `$ nc -lk 9999`
+ *
+ * and run the example as
+ *
+ * `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \
+ * local[2] localhost 9999 ~/checkpoint/ ~/out`
+ *
+ * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ * the checkpoint data.
+ *
+ * To run this example in a local standalone cluster with automatic driver recovery,
+ *
+ * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \
+ * org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \
+ * localhost 9999 ~/checkpoint ~/out`
+ *
+ * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ *
+ * Refer to the online documentation for more details.
+ */
+
+object RecoverableNetworkWordCount {
+
+ def createContext(master: String, ip: String, port: Int, outputPath: String) = {
+
+ // If you do not see this printed, that means the StreamingContext has been loaded
+ // from the new checkpoint
+ println("Creating new context")
+ val outputFile = new File(outputPath)
+ if (outputFile.exists()) outputFile.delete()
+
+ // Create the context with a 1 second batch size
+ val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
+
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in input stream of \n delimited text (eg. generated by 'nc')
+ val lines = ssc.socketTextStream(ip, port)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
+ val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
+ println(counts)
+ println("Appending to " + outputFile.getAbsolutePath)
+ Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+ })
+ ssc
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
+ System.err.println(
+ """
+ |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
+ | <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ | <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ | <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+ | <output-file> file to which the word counts will be appended
+ |
+ |In local mode, <master> should be 'local[n]' with n > 1
+ |Both <checkpoint-directory> and <output-file> must be absolute paths
+ """.stripMargin
+ )
+ System.exit(1)
+ }
+ val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args
+ val ssc = StreamingContext.getOrCreate(checkpointDirectory,
+ () => {
+ createContext(master, ip, port, outputPath)
+ })
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 002db57d59..1183eba846 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -39,6 +39,8 @@ object StatefulNetworkWordCount {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
new file mode 100644
index 0000000000..d41d84a980
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StreamingExamples.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.Logging
+
+import org.apache.log4j.{Level, Logger}
+
+/** Utility functions for Spark Streaming examples. */
+object StreamingExamples extends Logging {
+
+ /** Set reasonable logging levels for streaming if the user has not configured log4j. */
+ def setStreamingLogLevels() {
+ val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
+ if (!log4jInitialized) {
+ // We first log something to initialize Spark's default logging, then we override the
+ // logging level.
+ logInfo("Setting log level to [WARN] for streaming example." +
+ " To override add a custom log4j.properties to the classpath.")
+ Logger.getRootLogger.setLevel(Level.WARN)
+ }
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 3ccdc908e2..483c4d3118 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -17,12 +17,12 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
import com.twitter.algebird._
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
+import org.apache.spark.SparkContext._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
/**
@@ -51,6 +51,8 @@ object TwitterAlgebirdCMS {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
// CMS parameters
val DELTA = 1E-3
val EPS = 0.01
@@ -79,7 +81,7 @@ object TwitterAlgebirdCMS {
val exactTopUsers = users.map(id => (id, 1))
.reduceByKey((a, b) => a + b)
- approxTopUsers.foreach(rdd => {
+ approxTopUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
val partialTopK = partial.heavyHitters.map(id =>
@@ -94,7 +96,7 @@ object TwitterAlgebirdCMS {
}
})
- exactTopUsers.foreach(rdd => {
+ exactTopUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partialMap = rdd.collect().toMap
val partialTopK = rdd.map(
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index c7e83e76b0..94c2bf29ac 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -17,10 +17,11 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.storage.StorageLevel
-import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
+import com.twitter.algebird.HyperLogLog._
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
/**
@@ -44,6 +45,8 @@ object TwitterAlgebirdHLL {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
/** Bit size parameter for HyperLogLog, trades off accuracy vs size */
val BIT_SIZE = 12
val (master, filters) = (args.head, args.tail)
@@ -64,7 +67,7 @@ object TwitterAlgebirdHLL {
val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)
- approxUsers.foreach(rdd => {
+ approxUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
globalHll += partial
@@ -73,7 +76,7 @@ object TwitterAlgebirdHLL {
}
})
- exactUsers.foreach(rdd => {
+ exactUsers.foreachRDD(rdd => {
if (rdd.count() != 0) {
val partial = rdd.first()
userSet ++= partial
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index e2b0418d55..8a70d4a978 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -36,6 +36,8 @@ object TwitterPopularTags {
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
+
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
@@ -54,13 +56,13 @@ object TwitterPopularTags {
// Print popular hashtags
- topCounts60.foreach(rdd => {
+ topCounts60.foreachRDD(rdd => {
val topList = rdd.take(5)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
- topCounts10.foreach(rdd => {
+ topCounts10.foreachRDD(rdd => {
val topList = rdd.take(5)
println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 03902ec353..12d2a1084f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -76,6 +76,7 @@ object ZeroMQWordCount {
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index 4fe57de4a4..a2600989ca 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -65,7 +65,7 @@ object PageViewGenerator {
return item
}
}
- return inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
+ inputMap.take(1).head._1 // Shouldn't get here if probabilities add up to 1.0
}
def getNextClickEvent() : String = {
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index 807af199f4..bb44bc3d06 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -17,9 +17,10 @@
package org.apache.spark.streaming.examples.clickstream
+import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.examples.StreamingExamples
/** Analyses a streaming dataset of web page views. This class demonstrates several types of
* operators available in Spark streaming.
@@ -36,6 +37,7 @@ object PageViewStream {
" errorRatePerZipCode, activeUserCount, popularUsersSeen")
System.exit(1)
}
+ StreamingExamples.setStreamingLogLevels()
val metric = args(0)
val host = args(1)
val port = args(2).toInt
@@ -89,7 +91,7 @@ object PageViewStream {
case "popularUsersSeen" =>
// Look for users in our existing dataset and print it out if we have a match
pageViews.map(view => (view.userID, 1))
- .foreach((rdd, time) => rdd.join(userList)
+ .foreachRDD((rdd, time) => rdd.join(userList)
.map(_._2._2)
.take(10)
.foreach(u => println("Saw user %s at time %s".format(u, time))))