From 67d4a31f8780ba5f30d52b10ea211d472c4da9de Mon Sep 17 00:00:00 2001 From: Neal Wiggins Date: Thu, 10 Oct 2013 09:30:02 -0700 Subject: Remove unnecessary mutable imports --- examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala | 2 -- 1 file changed, 2 deletions(-) (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala index f7bf75b4e5..bc2db39c12 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala @@ -21,8 +21,6 @@ import java.util.Random import org.apache.spark.SparkContext import org.apache.spark.util.Vector import org.apache.spark.SparkContext._ -import scala.collection.mutable.HashMap -import scala.collection.mutable.HashSet /** * K-means clustering. -- cgit v1.2.3 From c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sat, 12 Oct 2013 15:02:57 +0800 Subject: Upgrade Kafka 0.7.2 to Kafka 0.8.0-beta1 for Spark Streaming --- examples/pom.xml | 28 ++++-- .../streaming/examples/JavaKafkaWordCount.java | 98 +++++++++++++++++++++ .../spark/streaming/examples/KafkaWordCount.scala | 28 +++--- project/SparkBuild.scala | 9 +- .../kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar | Bin 1358063 -> 0 bytes .../kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 | 1 - .../kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 | 1 - .../kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom | 9 -- .../kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 | 1 - .../kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 | 1 - .../apache/kafka/kafka/maven-metadata-local.xml | 12 --- .../kafka/kafka/maven-metadata-local.xml.md5 | 1 - .../kafka/kafka/maven-metadata-local.xml.sha1 | 1 - streaming/pom.xml | 32 ++++--- .../apache/spark/streaming/StreamingContext.scala | 20 +++-- .../streaming/api/java/JavaStreamingContext.scala | 33 ++++--- .../streaming/dstream/KafkaInputDStream.scala | 61 +++++++------ .../org/apache/spark/streaming/JavaAPISuite.java | 16 ++-- .../apache/spark/streaming/InputStreamsSuite.scala | 8 +- 19 files changed, 250 insertions(+), 110 deletions(-) create mode 100644 examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java delete mode 100644 streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar delete mode 100644 streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 delete mode 100644 streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 delete mode 100644 streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom delete mode 100644 streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 delete mode 100644 streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 delete mode 100644 streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml delete mode 100644 streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 delete mode 100644 streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 (limited to 'examples/src') diff --git a/examples/pom.xml b/examples/pom.xml index b8c020a321..b97e6af288 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -32,13 +32,20 @@ http://spark.incubator.apache.org/ - - lib - file://${project.basedir}/lib + apache-repo + Apache Repository + https://repository.apache.org/content/repositories/releases + + true + + + false + + org.apache.spark @@ -81,9 +88,18 @@ 
org.apache.kafka - kafka - 0.7.2-spark - provided + kafka_2.9.2 + 0.8.0-beta1 + + + com.sun.jmx + jmxri + + + com.sun.jdmk + jmxtools + + org.eclipse.jetty diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java new file mode 100644 index 0000000000..9a8e4209ed --- /dev/null +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.examples; + +import java.util.Map; +import java.util.HashMap; + +import com.google.common.collect.Lists; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import scala.Tuple2; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: JavaKafkaWordCount + * is the Spark master URL. In local mode, should be 'local[n]' with n > 1. 
+ * is a list of one or more zookeeper servers that make quorum + * is the name of kafka consumer group + * is a list of one or more kafka topics to consume from + * is the number of threads the kafka consumer should use + * + * Example: + * `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02, + * zoo03 my-consumer-group topic1,topic2 1` + */ + +public class JavaKafkaWordCount { + public static void main(String[] args) { + if (args.length < 5) { + System.err.println("Usage: KafkaWordCount "); + System.exit(1); + } + + // Create the context with a 1 second batch size + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); + + int numThreads = Integer.parseInt(args[4]); + Map topicMap = new HashMap(); + String[] topics = args[3].split(","); + for (String topic: topics) { + topicMap.put(topic, numThreads); + } + + JavaPairDStream messages = ssc.kafkaStream(args[1], args[2], topicMap); + + JavaDStream lines = messages.map(new Function, String>() { + @Override + public String call(Tuple2 tuple2) throws Exception { + return tuple2._2(); + } + }); + + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(x.split(" ")); + } + }); + + JavaPairDStream wordCounts = words.map( + new PairFunction() { + @Override + public Tuple2 call(String s) throws Exception { + return new Tuple2(s, 1); + } + }).reduceByKey(new Function2() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }); + + wordCounts.print(); + ssc.start(); + } +} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 12f939d5a7..570ba4c81a 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -18,13 +18,11 @@ package org.apache.spark.streaming.examples import java.util.Properties -import kafka.message.Message -import kafka.producer.SyncProducerConfig + import kafka.producer._ -import org.apache.spark.SparkContext + import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.util.RawTextHelper._ /** @@ -54,9 +52,10 @@ object KafkaWordCount { ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream(zkQuorum, group, topicpMap) + val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) + val wordCounts = words.map(x => (x, 1l)) + .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() @@ -68,15 +67,16 @@ object KafkaWordCountProducer { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: KafkaWordCountProducer ") + System.err.println("Usage: KafkaWordCountProducer " + + " ") System.exit(1) } - val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args + val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args // Zookeper connection properties val props = new Properties() - 
props.put("zk.connect", zkQuorum) + props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) @@ -85,11 +85,13 @@ object KafkaWordCountProducer { // Send some messages while(true) { val messages = (1 to messagesPerSec.toInt).map { messageNum => - (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ") + val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) + .mkString(" ") + + new KeyedMessage[String, String](topic, str) }.toArray - println(messages.mkString(",")) - val data = new ProducerData[String, String](topic, messages) - producer.send(data) + + producer.send(messages: _*) Thread.sleep(100) } } diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 973f1e2f11..b14970942b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -273,13 +273,16 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", resolvers ++= Seq( - "Akka Repository" at "http://repo.akka.io/releases/" + "Akka Repository" at "http://repo.akka.io/releases/", + "Apache repo" at "https://repository.apache.org/content/repositories/releases" ), libraryDependencies ++= Seq( "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy), - "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), - "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty) + "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty), + "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1" + exclude("com.sun.jdmk", "jmxtools") + exclude("com.sun.jmx", "jmxri") ) ) diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar deleted file mode 100644 index 65f79925a4..0000000000 Binary files a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar and /dev/null differ diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 deleted file mode 100644 index 29f45f4adb..0000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 +++ /dev/null @@ -1 +0,0 @@ -18876b8bc2e4cef28b6d191aa49d963f \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 deleted file mode 100644 index e3bd62bac0..0000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -06b27270ffa52250a2c08703b397c99127b72060 \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom deleted file mode 100644 index 082d35726a..0000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom +++ /dev/null @@ -1,9 +0,0 @@ - - - 4.0.0 - org.apache.kafka - kafka - 0.7.2-spark - POM was created from install:install-file - diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 deleted file mode 100644 index 
92c4132b5b..0000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 +++ /dev/null @@ -1 +0,0 @@ -7bc4322266e6032bdf9ef6eebdd8097d \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 deleted file mode 100644 index 8a1d8a097a..0000000000 --- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 +++ /dev/null @@ -1 +0,0 @@ -d0f79e8eff0db43ca7bcf7dce2c8cd2972685c9d \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml deleted file mode 100644 index 720cd51c2f..0000000000 --- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - org.apache.kafka - kafka - - 0.7.2-spark - - 0.7.2-spark - - 20130121015225 - - diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 deleted file mode 100644 index a4ce5dc9e8..0000000000 --- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 +++ /dev/null @@ -1 +0,0 @@ -e2b9c7c5f6370dd1d21a0aae5e8dcd77 \ No newline at end of file diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 deleted file mode 100644 index b869eaf2a6..0000000000 --- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 +++ /dev/null @@ -1 +0,0 @@ -2a4341da936b6c07a09383d17ffb185ac558ee91 \ No newline at end of file diff --git a/streaming/pom.xml b/streaming/pom.xml index 3b25fb49fb..14c043175d 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -32,10 +32,16 @@ http://spark.incubator.apache.org/ - - lib - file://${project.basedir}/lib + apache-repo + Apache Repository + https://repository.apache.org/content/repositories/releases + + true + + + false + @@ -56,9 +62,18 @@ org.apache.kafka - kafka - 0.7.2-spark - provided + kafka_2.9.2 + 0.8.0-beta1 + + + com.sun.jmx + jmxri + + + com.sun.jdmk + jmxtools + + org.apache.flume @@ -71,11 +86,6 @@ - - com.github.sgroschupf - zkclient - 0.1 - org.twitter4j twitter4j-stream diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 878725c705..dc60046805 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -252,10 +252,14 @@ class StreamingContext private ( groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 - ): DStream[String] = { + ): DStream[(String, String)] = { val kafkaParams = Map[String, String]( - "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000") - kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel) + "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, + "zookeeper.connection.timeout.ms" -> "10000") + kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( + kafkaParams, + topics, + storageLevel) } /** @@ -266,12 +270,16 @@ class StreamingContext private ( * in its own thread. 
* @param storageLevel Storage level to use for storing the received objects */ - def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest]( + def kafkaStream[ + K: ClassManifest, + V: ClassManifest, + U <: kafka.serializer.Decoder[_]: Manifest, + T <: kafka.serializer.Decoder[_]: Manifest]( kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ): DStream[T] = { - val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel) + ): DStream[(K, V)] = { + val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel) registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 54ba3e6025..6423b916b0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -141,7 +141,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { zkQuorum: String, groupId: String, topics: JMap[String, JInt]) - : JavaDStream[String] = { + : JavaPairDStream[String, String] = { implicit val cmt: ClassManifest[String] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), @@ -162,7 +162,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { groupId: String, topics: JMap[String, JInt], storageLevel: StorageLevel) - : JavaDStream[String] = { + : JavaPairDStream[String, String] = { implicit val cmt: ClassManifest[String] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]] ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), @@ -171,25 +171,34 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream that pulls messages form a Kafka Broker. - * @param typeClass Type of RDD - * @param decoderClass Type of kafka decoder + * @param keyTypeClass Key type of RDD + * @param valueTypeClass value type of RDD + * @param keyDecoderClass Type of kafka key decoder + * @param valueDecoderClass Type of kafka value decoder * @param kafkaParams Map of kafka configuration paramaters. * See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. * @param storageLevel RDD storage level. 
Defaults to memory-only */ - def kafkaStream[T, D <: kafka.serializer.Decoder[_]]( - typeClass: Class[T], - decoderClass: Class[D], + def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]]( + keyTypeClass: Class[K], + valueTypeClass: Class[V], + keyDecoderClass: Class[U], + valueDecoderClass: Class[T], kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel) - : JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]] - ssc.kafkaStream[T, D]( + : JavaPairDStream[K, V] = { + implicit val keyCmt: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val valueCmt: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + + implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]] + implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]] + + ssc.kafkaStream[K, V, U, T]( kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala index 51e913675d..a5de5e1fb5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala @@ -19,22 +19,18 @@ package org.apache.spark.streaming.dstream import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Time, DStreamCheckpointData, StreamingContext} +import org.apache.spark.streaming.StreamingContext import java.util.Properties import java.util.concurrent.Executors import kafka.consumer._ -import kafka.message.{Message, MessageSet, MessageAndMetadata} import kafka.serializer.Decoder -import kafka.utils.{Utils, ZKGroupTopicDirs} -import kafka.utils.ZkUtils._ +import kafka.utils.VerifiableProperties import kafka.utils.ZKStringSerializer import org.I0Itec.zkclient._ import scala.collection.Map -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ /** @@ -46,25 +42,32 @@ import scala.collection.JavaConversions._ * @param storageLevel RDD storage level. 
*/ private[streaming] -class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest]( +class KafkaInputDStream[ + K: ClassManifest, + V: ClassManifest, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( @transient ssc_ : StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel - ) extends NetworkInputDStream[T](ssc_ ) with Logging { + ) extends NetworkInputDStream[(K, V)](ssc_) with Logging { - - def getReceiver(): NetworkReceiver[T] = { - new KafkaReceiver[T, D](kafkaParams, topics, storageLevel) - .asInstanceOf[NetworkReceiver[T]] + def getReceiver(): NetworkReceiver[(K, V)] = { + new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) + .asInstanceOf[NetworkReceiver[(K, V)]] } } private[streaming] -class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest]( - kafkaParams: Map[String, String], - topics: Map[String, Int], - storageLevel: StorageLevel +class KafkaReceiver[ + K: ClassManifest, + V: ClassManifest, + U <: Decoder[_]: Manifest, + T <: Decoder[_]: Manifest]( + kafkaParams: Map[String, String], + topics: Map[String, Int], + storageLevel: StorageLevel ) extends NetworkReceiver[Any] { // Handles pushing data into the BlockManager @@ -83,27 +86,34 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest]( // In case we are using multiple Threads to handle Kafka Messages val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid")) + logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) // Kafka connection properties val props = new Properties() kafkaParams.foreach(param => props.put(param._1, param._2)) // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect")) + logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")) val consumerConfig = new ConsumerConfig(props) consumerConnector = Consumer.create(consumerConfig) - logInfo("Connected to " + kafkaParams("zk.connect")) + logInfo("Connected to " + kafkaParams("zookeeper.connect")) // When autooffset.reset is defined, it is our responsibility to try and whack the // consumer group zk node. 
- if (kafkaParams.contains("autooffset.reset")) { - tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid")) + if (kafkaParams.contains("auto.offset.reset")) { + tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id")) } // Create Threads for each Topic/Message Stream we are listening - val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]] - val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder) + val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[K]] + val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties]) + .newInstance(consumerConfig.props) + .asInstanceOf[Decoder[V]] + + val topicMessageStreams = consumerConnector.createMessageStreams( + topics, keyDecoder, valueDecoder) // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => @@ -112,11 +122,12 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest]( } // Handles Kafka Messages - private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable { + private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V]) + extends Runnable { def run() { logInfo("Starting MessageHandler.") for (msgAndMetadata <- stream) { - blockGenerator += msgAndMetadata.message + blockGenerator += (msgAndMetadata.key, msgAndMetadata.message) } } } diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0d729ff87..dc01f1e8aa 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1220,14 +1220,20 @@ public class JavaAPISuite implements Serializable { @Test public void testKafkaStream() { HashMap topics = Maps.newHashMap(); - JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); - JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, + JavaPairDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); + JavaPairDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK()); HashMap kafkaParams = Maps.newHashMap(); - kafkaParams.put("zk.connect","localhost:12345"); - kafkaParams.put("groupid","consumer-group"); - JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, + kafkaParams.put("zookeeper.connect","localhost:12345"); + kafkaParams.put("group.id","consumer-group"); + JavaPairDStream test3 = ssc.kafkaStream( + String.class, + String.class, + StringDecoder.class, + StringDecoder.class, + kafkaParams, + topics, StorageLevel.MEMORY_AND_DISK()); } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 42e3e51e3f..c29b75ece6 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -268,8 +268,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK) // Test specifying decoder - val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group") - val test3 = 
ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) + val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group") + val test3 = ssc.kafkaStream[ + String, + String, + kafka.serializer.StringDecoder, + kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK) } } -- cgit v1.2.3 From 35befe07bb79411f453cf05d482ff051fa827b47 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Tue, 15 Oct 2013 22:51:58 -0700 Subject: Fixing spark streaming example and a bug in examples build. - Examples assembly included a log4j.properties which clobbered Spark's - Example had an error where some classes weren't serializable - Did some other clean-up in this example --- .../streaming/examples/clickstream/PageViewGenerator.scala | 13 +++++++++---- project/SparkBuild.scala | 1 + 2 files changed, 10 insertions(+), 4 deletions(-) (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala index 884d6d6f34..de70c50473 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala @@ -17,17 +17,19 @@ package org.apache.spark.streaming.examples.clickstream -import java.net.{InetAddress,ServerSocket,Socket,SocketException} -import java.io.{InputStreamReader, BufferedReader, PrintWriter} +import java.net.ServerSocket +import java.io.PrintWriter import util.Random /** Represents a page view on a website with associated dimension data.*/ -class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) { +class PageView(val url : String, val status : Int, val zipCode : Int, val userID : Int) + extends Serializable { override def toString() : String = { "%s\t%s\t%s\t%s\n".format(url, status, zipCode, userID) } } -object PageView { + +object PageView extends Serializable { def fromString(in : String) : PageView = { val parts = in.split("\t") new PageView(parts(0), parts(1).toInt, parts(2).toInt, parts(3).toInt) @@ -39,6 +41,9 @@ object PageView { * This should be used in tandem with PageViewStream.scala. Example: * $ ./run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10 * $ ./run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + * + * When running this, you may want to set the root logging level to ERROR in + * conf/log4j.properties to reduce the verbosity of the output. 
* */ object PageViewGenerator { val pages = Map("http://foo.com/" -> .7, diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 973f1e2f11..f2bbe5358f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -311,6 +311,7 @@ object SparkBuild extends Build { mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard + case "log4j.properties" => MergeStrategy.discard case "META-INF/services/org.apache.hadoop.fs.FileSystem" => MergeStrategy.concat case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first -- cgit v1.2.3 From 9eaf68fd4032eaa8e8e8930c14fae2fad3d17d14 Mon Sep 17 00:00:00 2001 From: prabeesh Date: Wed, 16 Oct 2013 13:40:38 +0530 Subject: added mqtt adapter wordcount example --- .../spark/streaming/examples/MQTTWordCount.scala | 112 +++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala new file mode 100644 index 0000000000..04e21bef5e --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.examples + +import org.apache.spark.streaming.{ Seconds, StreamingContext } +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.dstream.MQTTReceiver +import org.apache.spark.storage.StorageLevel + +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.MqttTopic; + +/** + * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming" + */ + +object MQTTPublisher { + + var client: MqttClient = _ + + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: MQTTPublisher ") + System.exit(1) + } + + val Seq(brokerUrl, topic) = args.toSeq + + try { + var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp"); + client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance); + } catch { + case e: MqttException => println("Exception Caught: " + e); + + } + + client.connect(); + + val msgtopic: MqttTopic = client.getTopic(topic); + val msg: String = "hello mqtt demo for spark streaming"; + + while (true) { + val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()); + msgtopic.publish(message); + println("Published data. topic: " + msgtopic.getName() + " Message: " + message); + } + client.disconnect(); + + } +} + +/** + * A sample wordcount with MqttStream stream + * + * To work with Mqtt, Mqtt Message broker/server required. + * Mosquitto (http://mosquitto.org/) is an open source Mqtt Broker + * In ubuntu mosquitto can be installed using the command `$ sudo apt-get install mosquitto` + * Eclipse paho project provides Java library for Mqtt Client http://www.eclipse.org/paho/ + * Example Java code for Mqtt Publisher and Subscriber can be found here https://bitbucket.org/mkjinesh/mqttclient + * Usage: MQTTWordCount + * In local mode, should be 'local[n]' with n > 1 + * and describe where Mqtt publisher is running. + * + * To run this example locally, you may run publisher as + * `$ ./run-example org.apache.spark.streaming.examples.MQTTPublisher tcp://localhost:1883 foo` + * and run the example as + * `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo` + */ + +object MQTTWordCount { + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println( + "Usage: MQTTWordCount " + + " In local mode, should be 'local[n]' with n > 1") + System.exit(1) + } + + val Seq(master, brokerUrl, topic) = args.toSeq + + val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY) + + val words = lines.flatMap(x => x.toString.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.print() + + ssc.start() + } +} + -- cgit v1.2.3 From feb45d391f8d09c120d7d43e72e96e9bf9784fa0 Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury Date: Mon, 14 Oct 2013 14:50:49 -0700 Subject: Default blockSize is 4MB. BroadcastTest2 example added for testing broadcasts. 
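
For reference, a minimal sketch (not part of this patch; it mirrors the system properties that BroadcastTest2 below relies on, and assumes the Torrent factory is selected by name the same way) of opting into Torrent broadcast with the new 4 MB default block size:

    import org.apache.spark.SparkContext

    object BroadcastBlockSizeSketch {
      def main(args: Array[String]) {
        // Pick the broadcast implementation and block size before creating the SparkContext;
        // spark.broadcast.blockSize is read in kilobytes, so "4096" matches the new 4 MB default.
        System.setProperty("spark.broadcast.factory",
          "org.apache.spark.broadcast.TorrentBroadcastFactory")
        System.setProperty("spark.broadcast.blockSize", "4096")

        val sc = new SparkContext("local[2]", "Broadcast blockSize sketch")
        val arr = (0 until 1000000).toArray
        val barr = sc.broadcast(arr)   // shipped to executors in 4 MB blocks
        sc.parallelize(1 to 10, 2).foreach(_ => println(barr.value.length))
        sc.stop()
      }
    }
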
--- .../apache/spark/broadcast/TorrentBroadcast.scala | 2 +- .../org/apache/spark/examples/BroadcastTest2.scala | 59 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala (limited to 'examples/src') diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 29e0dd26d4..c174804e9a 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -179,7 +179,7 @@ extends Logging { initialized = false } - val BlockSize = System.getProperty("spark.broadcast.blockSize", "2048").toInt * 1024 + val BlockSize = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024 def blockifyObject[IN](obj: IN): TorrentInfo = { val byteArray = Utils.serialize[IN](obj) diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala new file mode 100644 index 0000000000..4b96d0c9dd --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import org.apache.spark.SparkContext + +object BroadcastTest2 { + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: BroadcastTest2 [slices] [numElem] [broadcastAlgo] [blockSize]") + System.exit(1) + } + + val bcName = if (args.length > 3) args(3) else "Http" + val blockSize = if (args.length > 4) args(4) else "4096" + + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." 
+ bcName + "BroadcastFactory") + System.setProperty("spark.broadcast.blockSize", blockSize) + + val sc = new SparkContext(args(0), "Broadcast Test 2", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + + val slices = if (args.length > 1) args(1).toInt else 2 + val num = if (args.length > 2) args(2).toInt else 1000000 + + var arr1 = new Array[Int](num) + for (i <- 0 until arr1.length) { + arr1(i) = i + } + + for (i <- 0 until 3) { + println("Iteration " + i) + println("===========") + val startTime = System.nanoTime + val barr1 = sc.broadcast(arr1) + sc.parallelize(1 to 10, slices).foreach { + i => println(barr1.value.size) + } + println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6)) + } + + System.exit(0) + } +} -- cgit v1.2.3 From e96bd0068f13907a45507030e9ca0b178c193823 Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury Date: Wed, 16 Oct 2013 21:28:03 -0700 Subject: BroadcastTest2 --> BroadcastTest --- .../org/apache/spark/examples/BroadcastTest.scala | 15 ++++-- .../org/apache/spark/examples/BroadcastTest2.scala | 59 ---------------------- 2 files changed, 12 insertions(+), 62 deletions(-) delete mode 100644 examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index 868ff81f67..529709c2f9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -22,12 +22,19 @@ import org.apache.spark.SparkContext object BroadcastTest { def main(args: Array[String]) { if (args.length == 0) { - System.err.println("Usage: BroadcastTest [] [numElem]") + System.err.println("Usage: BroadcastTest [slices] [numElem] [broadcastAlgo] [blockSize]") System.exit(1) } - val sc = new SparkContext(args(0), "Broadcast Test", + val bcName = if (args.length > 3) args(3) else "Http" + val blockSize = if (args.length > 4) args(4) else "4096" + + System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory") + System.setProperty("spark.broadcast.blockSize", blockSize) + + val sc = new SparkContext(args(0), "Broadcast Test 2", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 @@ -36,13 +43,15 @@ object BroadcastTest { arr1(i) = i } - for (i <- 0 until 2) { + for (i <- 0 until 3) { println("Iteration " + i) println("===========") + val startTime = System.nanoTime val barr1 = sc.broadcast(arr1) sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size) } + println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6)) } System.exit(0) diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala deleted file mode 100644 index 4b96d0c9dd..0000000000 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest2.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples - -import org.apache.spark.SparkContext - -object BroadcastTest2 { - def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: BroadcastTest2 [slices] [numElem] [broadcastAlgo] [blockSize]") - System.exit(1) - } - - val bcName = if (args.length > 3) args(3) else "Http" - val blockSize = if (args.length > 4) args(4) else "4096" - - System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory") - System.setProperty("spark.broadcast.blockSize", blockSize) - - val sc = new SparkContext(args(0), "Broadcast Test 2", - System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - - val slices = if (args.length > 1) args(1).toInt else 2 - val num = if (args.length > 2) args(2).toInt else 1000000 - - var arr1 = new Array[Int](num) - for (i <- 0 until arr1.length) { - arr1(i) = i - } - - for (i <- 0 until 3) { - println("Iteration " + i) - println("===========") - val startTime = System.nanoTime - val barr1 = sc.broadcast(arr1) - sc.parallelize(1 to 10, slices).foreach { - i => println(barr1.value.size) - } - println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6)) - } - - System.exit(0) - } -} -- cgit v1.2.3 From 6ec39829e9204c742e364d48c23e106625bba17d Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Fri, 18 Oct 2013 17:00:28 +0530 Subject: Update MQTTWordCount.scala --- .../spark/streaming/examples/MQTTWordCount.scala | 29 +++++++++++----------- 1 file changed, 14 insertions(+), 15 deletions(-) (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 04e21bef5e..be6587b316 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -22,12 +22,12 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.MQTTReceiver import org.apache.spark.storage.StorageLevel -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttClientPersistence; -import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.MqttTopic; +import org.eclipse.paho.client.mqttv3.MqttClient +import org.eclipse.paho.client.mqttv3.MqttClientPersistence +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttMessage +import org.eclipse.paho.client.mqttv3.MqttTopic /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo 
for spark streaming" @@ -47,24 +47,24 @@ object MQTTPublisher { val Seq(brokerUrl, topic) = args.toSeq try { - var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp"); - client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance); + var peristance:MqttClientPersistence =new MqttDefaultFilePersistence("/tmp") + client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) } catch { - case e: MqttException => println("Exception Caught: " + e); + case e: MqttException => println("Exception Caught: " + e) } - client.connect(); + client.connect() val msgtopic: MqttTopic = client.getTopic(topic); - val msg: String = "hello mqtt demo for spark streaming"; + val msg: String = "hello mqtt demo for spark streaming" while (true) { - val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()); + val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()) msgtopic.publish(message); - println("Published data. topic: " + msgtopic.getName() + " Message: " + message); + println("Published data. topic: " + msgtopic.getName() + " Message: " + message) } - client.disconnect(); + client.disconnect() } } @@ -109,4 +109,3 @@ object MQTTWordCount { ssc.start() } } - -- cgit v1.2.3 From dbafa11396d7c1619f5523fba5ae6abed07e90d9 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Tue, 22 Oct 2013 08:50:34 +0530 Subject: Update MQTTWordCount.scala --- .../scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index be6587b316..7d06505df7 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -30,7 +30,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic /** - * A simple Mqtt publisher for demonstration purposes, repeatedly publishes Space separated String Message "hello mqtt demo for spark streaming" + * A simple Mqtt publisher for demonstration purposes, repeatedly publishes + * Space separated String Message "hello mqtt demo for spark streaming" */ object MQTTPublisher { @@ -99,13 +100,13 @@ object MQTTWordCount { val Seq(master, brokerUrl, topic) = args.toSeq - val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), + Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY) val words = lines.flatMap(x => x.toString.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() - ssc.start() } } -- cgit v1.2.3 From 9ca1bd95305a904637075e4f5747b28571114fb1 Mon Sep 17 00:00:00 2001 From: Prabeesh K Date: Tue, 22 Oct 2013 09:05:57 +0530 Subject: Update MQTTWordCount.scala --- .../scala/org/apache/spark/streaming/examples/MQTTWordCount.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 7d06505df7..af698a01d5 100644 --- 
a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -33,15 +33,13 @@ import org.eclipse.paho.client.mqttv3.MqttTopic * A simple Mqtt publisher for demonstration purposes, repeatedly publishes * Space separated String Message "hello mqtt demo for spark streaming" */ - object MQTTPublisher { var client: MqttClient = _ def main(args: Array[String]) { if (args.length < 2) { - System.err.println( - "Usage: MQTTPublisher ") + System.err.println("Usage: MQTTPublisher ") System.exit(1) } @@ -52,7 +50,6 @@ object MQTTPublisher { client = new MqttClient(brokerUrl, MqttClient.generateClientId(), peristance) } catch { case e: MqttException => println("Exception Caught: " + e) - } client.connect() @@ -66,7 +63,6 @@ object MQTTPublisher { println("Published data. topic: " + msgtopic.getName() + " Message: " + message) } client.disconnect() - } } @@ -87,7 +83,6 @@ object MQTTPublisher { * and run the example as * `$ ./run-example org.apache.spark.streaming.examples.MQTTWordCount local[2] tcp://localhost:1883 foo` */ - object MQTTWordCount { def main(args: Array[String]) { -- cgit v1.2.3 From 05a0df2b9e0e4c3d032404187c0adf6d6d881860 Mon Sep 17 00:00:00 2001 From: Ali Ghodsi Date: Fri, 6 Sep 2013 21:30:42 -0700 Subject: Makes Spark SIMR ready. --- .../main/scala/org/apache/spark/SparkContext.scala | 22 +++++-- .../executor/CoarseGrainedExecutorBackend.scala | 5 ++ .../cluster/CoarseGrainedClusterMessage.scala | 4 ++ .../cluster/CoarseGrainedSchedulerBackend.scala | 20 +++++++ .../scheduler/cluster/SimrSchedulerBackend.scala | 69 ++++++++++++++++++++++ .../scala/org/apache/spark/examples/SparkPi.scala | 2 +- .../scala/org/apache/spark/repl/SparkILoop.scala | 14 +++++ 7 files changed, 131 insertions(+), 5 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala (limited to 'examples/src') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 564466cfd5..3b39c97260 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -56,15 +56,21 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, - SparkDeploySchedulerBackend, ClusterScheduler} + SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend} +import org.apache.spark.scheduler.local.LocalScheduler import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalScheduler import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} - - +import org.apache.spark.scheduler.StageInfo +import org.apache.spark.storage.RDDInfo +import org.apache.spark.storage.StorageStatus +import scala.Some +import org.apache.spark.scheduler.StageInfo +import org.apache.spark.storage.RDDInfo +import org.apache.spark.storage.StorageStatus /** * Main entry point for Spark functionality. 
A SparkContext represents the connection to a Spark @@ -127,7 +133,7 @@ class SparkContext( val startTime = System.currentTimeMillis() // Add each JAR given through the constructor - if (jars != null) { + if (jars != null && jars != Seq(null)) { jars.foreach { addJar(_) } } @@ -158,6 +164,8 @@ class SparkContext( val SPARK_REGEX = """spark://(.*)""".r // Regular expression for connection to Mesos cluster val MESOS_REGEX = """mesos://(.*)""".r + //Regular expression for connection to Simr cluster + val SIMR_REGEX = """simr://(.*)""".r master match { case "local" => @@ -176,6 +184,12 @@ class SparkContext( scheduler.initialize(backend) scheduler + case SIMR_REGEX(simrUrl) => + val scheduler = new ClusterScheduler(this) + val backend = new SimrSchedulerBackend(scheduler, this, simrUrl) + scheduler.initialize(backend) + scheduler + case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang. val memoryPerSlaveInt = memoryPerSlave.toInt diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 52b1c492b2..80ff4c59cb 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -80,6 +80,11 @@ private[spark] class CoarseGrainedExecutorBackend( case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => logError("Driver terminated or disconnected! Shutting down.") System.exit(1) + + case StopExecutor => + logInfo("Driver commanded a shutdown") + context.stop(self) + context.system.shutdown() } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index a8230ec6bc..53316dae2a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -60,6 +60,10 @@ private[spark] object CoarseGrainedClusterMessages { case object StopDriver extends CoarseGrainedClusterMessage + case object StopExecutor extends CoarseGrainedClusterMessage + + case object StopExecutors extends CoarseGrainedClusterMessage + case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index c0f1c6dbad..80a9b4667d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -101,6 +101,13 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac sender ! true context.stop(self) + case StopExecutors => + logInfo("Asking each executor to shut down") + for (executor <- executorActor.values) { + executor ! StopExecutor + } + sender ! true + case RemoveExecutor(executorId, reason) => removeExecutor(executorId, reason) sender ! 
true @@ -170,6 +177,19 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + def stopExecutors() { + try { + if (driverActor != null) { + logInfo("Shutting down all executors") + val future = driverActor.ask(StopExecutors)(timeout) + Await.result(future, timeout) + } + } catch { + case e: Exception => + throw new SparkException("Error asking standalone scheduler to shut down executors", e) + } + } + override def stop() { try { if (driverActor != null) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala new file mode 100644 index 0000000000..ae56244979 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -0,0 +1,69 @@ +package org.apache.spark.scheduler.cluster + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +import org.apache.spark.{Logging, SparkContext} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} + +private[spark] class SimrSchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext, + driverFilePath: String) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + val tmpPath = new Path(driverFilePath + "_tmp"); + val filePath = new Path(driverFilePath); + + val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt + + override def start() { + super.start() + + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), + CoarseGrainedSchedulerBackend.ACTOR_NAME) + + val conf = new Configuration() + val fs = FileSystem.get(conf) + + logInfo("Writing to HDFS file: " + driverFilePath); + logInfo("Writing AKKA address: " + driverUrl); + + // Create temporary file to prevent race condition where executors get empty driverUrl file + val temp = fs.create(tmpPath, true) + temp.writeUTF(driverUrl) + temp.writeInt(maxCores) + temp.close() + + // "Atomic" rename + fs.rename(tmpPath, filePath); + } + + override def stop() { + val conf = new Configuration() + val fs = FileSystem.get(conf) + fs.delete(new Path(driverFilePath), false); + super.stopExecutors() + super.stop() + } +} + + diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index 5a2bc9b0d0..a689e5a360 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -38,6 +38,6 @@ object SparkPi { if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) - System.exit(0) + spark.stop() } } diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 48a8fa9328..0ced284da6 100644 --- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -633,6 +633,20 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master: Result(true, shouldReplay) } + def addAllClasspath(args: Seq[String]): Unit = { + var added = false + var totalClasspath = "" + for (arg <- args) { + val f = File(arg).normalize + if (f.exists) { + added = true + addedClasspath = ClassPath.join(addedClasspath, f.path) + totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath) + } + } + if (added) replay() + } + def addClasspath(arg: String): Unit = { val f = File(arg).normalize if (f.exists) { -- cgit v1.2.3 From e5e0ebdb1190a256e51dbf1265c6957f0cd56a29 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Tue, 29 Oct 2013 20:12:45 -0500 Subject: fix sparkhdfs lr test --- examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'examples/src') diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index 646682878f..86dd9ca1b3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -21,6 +21,7 @@ import java.util.Random import scala.math.exp import org.apache.spark.util.Vector import org.apache.spark._ +import 
org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.InputFormatInfo /** @@ -51,7 +52,7 @@ object SparkHdfsLR { System.exit(1) } val inputPath = args(1) - val conf = SparkEnv.get.hadoop.newConfiguration() + val conf = SparkHadoopUtil.get.newConfiguration() val sc = new SparkContext(args(0), "SparkHdfsLR", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(), InputFormatInfo.computePreferredLocations( -- cgit v1.2.3
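
Taken together, the most visible API change in this series is that the Kafka receiver now produces keyed (key, value) pairs after the 0.8.0-beta1 upgrade. A minimal sketch of driving the upgraded ssc.kafkaStream from a standalone Scala app (the ZooKeeper address, consumer group and topic name here are placeholders, not taken from any patch above; local mode needs more than one thread, hence local[2]):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    object KafkaStreamSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "KafkaStreamSketch", Seconds(2),
          System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

        // The convenience overload now returns DStream[(String, String)] (message key and value);
        // the old 0.7.2 overload returned a plain DStream[String].
        val messages = ssc.kafkaStream("localhost:2181", "sketch-group", Map("pageviews" -> 1),
          StorageLevel.MEMORY_ONLY_SER_2)

        val counts = messages.map(_._2)          // keep only the message payload
          .flatMap(_.split(" "))
          .map(word => (word, 1L))
          .reduceByKey(_ + _)
        counts.print()

        ssc.start()
      }
    }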