Diffstat (limited to 'examples/src/main')
8 files changed, 244 insertions, 24 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
new file mode 100644
index 0000000000..9a8e4209ed
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import scala.Tuple2;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <zkQuorum> is a list of one or more ZooKeeper servers that make up the quorum
+ *   <group> is the name of the Kafka consumer group
+ *   <topics> is a list of one or more Kafka topics to consume from
+ *   <numThreads> is the number of threads the Kafka consumer should use
+ *
+ * Example:
+ *    `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
+ *    zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public class JavaKafkaWordCount {
+  public static void main(String[] args) {
+    if (args.length < 5) {
+      System.err.println("Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
+      System.exit(1);
+    }
+
+    // Create the context with a 2 second batch size
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaKafkaWordCount",
+            new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+    int numThreads = Integer.parseInt(args[4]);
+    Map<String, Integer> topicMap = new HashMap<String, Integer>();
+    String[] topics = args[3].split(",");
+    for (String topic: topics) {
+      topicMap.put(topic, numThreads);
+    }
+
+    JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+
+    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+      @Override
+      public String call(Tuple2<String, String> tuple2) throws Exception {
+        return tuple2._2();
+      }
+    });
+
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(x.split(" "));
+      }
+    });
+
+    JavaPairDStream<String, Integer> wordCounts = words.map(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) throws Exception {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) throws Exception {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    ssc.start();
+  }
+}
+ bcName + "BroadcastFactory") + System.setProperty("spark.broadcast.blockSize", blockSize) + + val sc = new SparkContext(args(0), "Broadcast Test 2", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 @@ -36,13 +43,15 @@ object BroadcastTest { arr1(i) = i } - for (i <- 0 until 2) { + for (i <- 0 until 3) { println("Iteration " + i) println("===========") + val startTime = System.nanoTime val barr1 = sc.broadcast(arr1) sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size) } + println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6)) } System.exit(0) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 646682878f..86dd9ca1b3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -21,6 +21,7 @@ import java.util.Random import scala.math.exp import org.apache.spark.util.Vector import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.InputFormatInfo /** @@ -51,7 +52,7 @@ object SparkHdfsLR { System.exit(1) } val inputPath = args(1) - val conf = SparkEnv.get.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() val sc = new SparkContext(args(0), "SparkHdfsLR", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), InputFormatInfo.computePreferredLocations( diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index f7bf75b4e5..bc2db39c12 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -21,8 +21,6 @@ import java.util.Random import org.apache.spark.SparkContext import org.apache.spark.util.Vector import org.apache.spark.SparkContext._ -import scala.collection.mutable.HashMap -import scala.collection.mutable.HashSet /** * K-means clustering. 
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index 5a2bc9b0d0..a689e5a360 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -38,6 +38,6 @@ object SparkPi {
       if (x*x + y*y < 1) 1 else 0
     }.reduce(_ + _)
     println("Pi is roughly " + 4.0 * count / n)
-    System.exit(0)
+    spark.stop()
   }
 }
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 12f939d5a7..570ba4c81a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -18,13 +18,11 @@
 package org.apache.spark.streaming.examples
 
 import java.util.Properties
-import kafka.message.Message
-import kafka.producer.SyncProducerConfig
+
 import kafka.producer._
-import org.apache.spark.SparkContext
+
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.util.RawTextHelper._
 
 /**
@@ -54,9 +52,10 @@ object KafkaWordCount {
     ssc.checkpoint("checkpoint")
 
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
-    val lines = ssc.kafkaStream(zkQuorum, group, topicpMap)
+    val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2)
     val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+    val wordCounts = words.map(x => (x, 1l))
+      .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
     wordCounts.print()
 
     ssc.start()
@@ -68,15 +67,16 @@ object KafkaWordCountProducer {
 
   def main(args: Array[String]) {
     if (args.length < 2) {
-      System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
+      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+        "<messagesPerSec> <wordsPerMessage>")
       System.exit(1)
     }
 
-    val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
+    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
 
     // Zookeper connection properties
     val props = new Properties()
-    props.put("zk.connect", zkQuorum)
+    props.put("metadata.broker.list", brokers)
     props.put("serializer.class", "kafka.serializer.StringEncoder")
 
     val config = new ProducerConfig(props)
@@ -85,11 +85,13 @@ object KafkaWordCountProducer {
     // Send some messages
     while(true) {
       val messages = (1 to messagesPerSec.toInt).map { messageNum =>
-        (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
+        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+          .mkString(" ")
+
+        new KeyedMessage[String, String](topic, str)
       }.toArray
-      println(messages.mkString(","))
-      val data = new ProducerData[String, String](topic, messages)
-      producer.send(data)
+
+      producer.send(messages: _*)
 
       Thread.sleep(100)
     }
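The producer half of KafkaWordCount now targets the Kafka 0.8 producer API: the ZooKeeper quorum gives way to metadata.broker.list, and each message is wrapped in a kafka.producer.KeyedMessage before being handed to the varargs producer.send. A minimal self-contained sketch of the new send path (broker address and topic are placeholders):

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object KafkaSendSketch {
      def main(args: Array[String]) {
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:9092") // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        // send accepts varargs, matching producer.send(messages: _*) above.
        producer.send(new KeyedMessage[String, String]("topic1", "hello kafka"))
        producer.close()
      }
    }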
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
new file mode 100644
index 0000000000..af698a01d5
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.streaming.{ Seconds, StreamingContext }
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.dstream.MQTTReceiver
+import org.apache.spark.storage.StorageLevel
+
+import org.eclipse.paho.client.mqttv3.MqttClient
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence
+import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
+import org.eclipse.paho.client.mqttv3.MqttException
+import org.eclipse.paho.client.mqttv3.MqttMessage
+import org.eclipse.paho.client.mqttv3.MqttTopic
+
+/**
+ * A simple MQTT publisher for demonstration purposes; it repeatedly publishes
+ * the space-separated string message "hello mqtt demo for spark streaming".
+ */
+object MQTTPublisher {
+
+  var client: MqttClient = _
+
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      System.err.println("Usage: MQTTPublisher <MqttBrokerUrl> <topic>")
+      System.exit(1)
+    }
+
+    val Seq(brokerUrl, topic) = args.toSeq
+
+    try {
+      val persistence: MqttClientPersistence = new MqttDefaultFilePersistence("/tmp")
+      client = new MqttClient(brokerUrl, MqttClient.generateClientId(), persistence)
+    } catch {
+      case e: MqttException => println("Exception Caught: " + e)
+    }
+
+    client.connect()
+
+    val msgtopic: MqttTopic = client.getTopic(topic)
+    val msg: String = "hello mqtt demo for spark streaming"
+
+    while (true) {
+      val message: MqttMessage = new MqttMessage(msg.getBytes())
+      msgtopic.publish(message)
+      println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
+    }
+    client.disconnect()
+  }
+}
+
+/**
+ * A sample word count over an MQTT stream.
+ *
+ * To work with MQTT, an MQTT message broker/server is required.
+ * Mosquitto (http://mosquitto.org/) is an open source MQTT broker;
+ * on Ubuntu it can be installed with `$ sudo apt-get install mosquitto`.
+ * The Eclipse Paho project provides a Java client library for MQTT: http://www.eclipse.org/paho/
+ * Example Java code for an MQTT publisher and subscriber can be found at
+ * https://bitbucket.org/mkjinesh/mqttclient
+ * Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>
+ *   In local mode, <master> should be 'local[n]' with n > 1.
+ *   <MqttbrokerUrl> and <topic> describe where the MQTT publisher is running.
+ *
+ * To run this example locally, you may run the publisher as
+ *    `$ ./run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo`
+ * and run the example as
+ *    `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo`
+ */
+object MQTTWordCount {
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println(
+        "Usage: MQTTWordCount <master> <MqttbrokerUrl> <topic>" +
+        " In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    val Seq(master, brokerUrl, topic) = args.toSeq
+
+    val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
+      Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+    val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
+
+    val words = lines.flatMap(x => x.toString.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+  }
+}
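One wrinkle in MQTTPublisher above: because the publish loop is while (true), the trailing client.disconnect() is unreachable. A bounded variant with the same Paho calls shows the intended connect/publish/disconnect lifecycle (broker URL, topic, and iteration count are illustrative placeholders):

    import org.eclipse.paho.client.mqttv3.{MqttClient, MqttMessage}
    import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence

    object BoundedMQTTPublisher {
      def main(args: Array[String]) {
        val client = new MqttClient("tcp://localhost:1883", // placeholder broker URL
          MqttClient.generateClientId(), new MqttDefaultFilePersistence("/tmp"))
        client.connect()
        val mqttTopic = client.getTopic("foo") // placeholder topic
        for (i <- 1 to 10) { // bounded loop, so disconnect() below is reached
          mqttTopic.publish(new MqttMessage(("hello mqtt " + i).getBytes()))
        }
        client.disconnect()
      }
    }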
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
index 884d6d6f34..de70c50473 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala
@@ -17,17 +17,19 @@
 package org.apache.spark.streaming.examples.clickstream
 
-import java.net.{InetAddress,ServerSocket,Socket,SocketException}
-import java.io.{InputStreamReader, BufferedReader, PrintWriter}
+import java.net.ServerSocket
+import java.io.PrintWriter
 import util.Random
 
 /** Represents a page view on a website with associated dimension data.*/
-class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) {
+class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int)
+    extends Serializable {
   override def toString() : String = {
     "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID)
   }
 }
-object PageView {
+
+object PageView extends Serializable {
   def fromString(in : String) : PageView = {
     val parts = in.split("\t")
     new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt)
@@ -39,6 +41,9 @@ object PageView {
  * This should be used in tandem with PageViewStream.scala. Example:
  * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10
  * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444
+ *
+ * When running this, you may want to set the root logging level to ERROR in
+ * conf/log4j.properties to reduce the verbosity of the output.
  * */
 object PageViewGenerator {
   val pages = Map("http://foo.com/" -> .7,
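The Serializable changes above let PageView instances travel inside Spark closures, and the companion's fromString is the inverse of toString. A quick round-trip sketch using those two methods (assumes the PageView class from this diff is on the classpath; the field values are arbitrary):

    // Round-trip a PageView through its tab-separated string form.
    val pv = new PageView("http://foo.com/", 200, 94306, 27)
    val line = pv.toString                 // "url\tstatus\tzipCode\tuserID\n"
    val parsed = PageView.fromString(line.trim)
    println(parsed.url + " " + parsed.userID)  // prints: http://foo.com/ 27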