aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2013-10-30 15:59:09 -0700
committerAnkur Dave <ankurdave@gmail.com>2013-10-30 15:59:09 -0700
commit5064f9b2d22b9d28734bf19d825d20292a3b0fd9 (patch)
tree5dc2b23dcb54091585dccd3d11810e3110706428 /examples
parenta0c86c36896c20cd70a8fecfe23284486f898883 (diff)
parent618c1f6cf3008caae7a8c0202721a6bd77d29a0f (diff)
downloadspark-5064f9b2d22b9d28734bf19d825d20292a3b0fd9.tar.gz
spark-5064f9b2d22b9d28734bf19d825d20292a3b0fd9.tar.bz2
spark-5064f9b2d22b9d28734bf19d825d20292a3b0fd9.zip
Merge remote-tracking branch 'spark-upstream/master'
Conflicts: project/SparkBuild.scala
Diffstat (limited to 'examples')
-rw-r--r--examples/pom.xml36
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java98
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala15
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/SparkPi.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala28
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala107
6 files changed, 263 insertions, 23 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index b8c020a321..aee371fbc7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -32,13 +32,20 @@
<url>http://spark.incubator.apache.org/</url>
<repositories>
- <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 -->
<repository>
- <id>lib</id>
- <url>file://${project.basedir}/lib</url>
+ <id>apache-repo</id>
+ <name>Apache Repository</name>
+ <url>https://repository.apache.org/content/repositories/releases</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
</repository>
</repositories>
+
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -81,9 +88,18 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka</artifactId>
- <version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
- <scope>provided</scope>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>0.8.0-beta1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -137,6 +153,14 @@
<groupId>org.apache.cassandra.deps</groupId>
<artifactId>avro</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.sonatype.sisu.inject</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
</exclusions>
</dependency>
</dependencies>
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
new file mode 100644
index 0000000000..9a8e4209ed
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import scala.Tuple2;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
+ * zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public class JavaKafkaWordCount {
+ public static void main(String[] args) {
+ if (args.length < 5) {
+ System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
+ System.exit(1);
+ }
+
+ // Create the context with a 1 second batch size
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
+ new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+ int numThreads = Integer.parseInt(args[4]);
+ Map<String, Integer> topicMap = new HashMap<String, Integer>();
+ String[] topics = args[3].split(",");
+ for (String topic: topics) {
+ topicMap.put(topic, numThreads);
+ }
+
+ JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+
+ JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> tuple2) throws Exception {
+ return tuple2._2();
+ }
+ });
+
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(x.split(" "));
+ }
+ });
+
+ JavaPairDStream<String, Integer> wordCounts = words.map(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ ssc.start();
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 868ff81f67..529709c2f9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -22,12 +22,19 @@ import org.apache.spark.SparkContext
object BroadcastTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
+ System.err.println("Usage: BroadcastTest <master> [slices] [numElem] [broadcastAlgo] [blockSize]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "Broadcast Test",
+ val bcName = if (args.length > 3) args(3) else "Http"
+ val blockSize = if (args.length > 4) args(4) else "4096"
+
+ System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory")
+ System.setProperty("spark.broadcast.blockSize", blockSize)
+
+ val sc = new SparkContext(args(0), "Broadcast Test 2",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
@@ -36,13 +43,15 @@ object BroadcastTest {
arr1(i) = i
}
- for (i <- 0 until 2) {
+ for (i <- 0 until 3) {
println("Iteration " + i)
println("===========")
+ val startTime = System.nanoTime
val barr1 = sc.broadcast(arr1)
sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
}
+ println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
}
System.exit(0)
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index 5a2bc9b0d0..a689e5a360 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -38,6 +38,6 @@ object SparkPi {
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi is roughly " + 4.0 * count / n)
- System.exit(0)
+ spark.stop()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 12f939d5a7..570ba4c81a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -18,13 +18,11 @@
package org.apache.spark.streaming.examples
import java.util.Properties
-import kafka.message.Message
-import kafka.producer.SyncProducerConfig
+
import kafka.producer._
-import org.apache.spark.SparkContext
+
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.util.RawTextHelper._
/**
@@ -54,9 +52,10 @@ object KafkaWordCount {
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream(zkQuorum, group, topicpMap)
+ val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+ val wordCounts = words.map(x => (x, 1l))
+ .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
@@ -68,15 +67,16 @@ object KafkaWordCountProducer {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
+ System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+ "<messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
- val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
+ val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
// Zookeper connection properties
val props = new Properties()
- props.put("zk.connect", zkQuorum)
+ props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)
@@ -85,11 +85,13 @@ object KafkaWordCountProducer {
// Send some messages
while(true) {
val messages = (1 to messagesPerSec.toInt).map { messageNum =>
- (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
+ val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+ .mkString(" ")
+
+ new KeyedMessage[String, String](topic, str)
}.toArray
- println(messages.mkString(","))
- val data = new ProducerData[String, String](topic, messages)
- producer.send(data)
+
+ producer.send(messages: _*)
Thread.sleep(100)
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
new file mode 100644
index 0000000000..af698a01d5
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.streaming.{ Seconds, StreamingContext }
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.MQTTReceiver
+import org.apache.spark.storage.StorageLevel
+
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
+
+/**
+ * A simple Mqtt publisher for demonstration purposes, repeatedly publishes
+ * Space separated String Message "hello mqtt demo for spark streaming"
+ */
+object MQTTPublisher {
+
+ var client: MqttClient = _
+
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
+ System.exit(1)
+ }
+
+ val Seq(brokerUrl, topic) = args.toSeq
+
+ try {
+ var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp")
+ client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance)
+ } catch {
+ case e: MqttException => println("Exception Caught: " + e)
+ }
+
+ client.connect()
+
+ val msgtopic: MqttTopic = client.getTopic(topic);
+ val msg: String = "hello mqtt demo for spark streaming"
+
+ while (true) {
+ val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes())
+ msgtopic.publish(message);
+ println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
+ }
+ client.disconnect()
+ }
+}
+
+/**
+ * A sample wordcount with MqttStream stream
+ *
+ * To work with Mqtt, Mqtt Message broker/server required.
+ * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker
+ * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto`
+ * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/
+ * Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient
+ * Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <MqttbrokerUrl> and <topic> describe where Mqtt publisher is running.
+ *
+ * To run this example locally, you may run publisher as
+ * `$ ./run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
+ * and run the example as
+ * `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
+ */
+object MQTTWordCount {
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>" +
+ " In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ val Seq(master, brokerUrl, topic) = args.toSeq
+
+ val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
+ Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
+
+ val words = lines.flatMap(x => x.toString.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.print()
+ ssc.start()
+ }
+}