diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-21 23:31:38 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-21 23:31:38 -0700 |
commit | 731c94e91d310eadb58ec5ff3338b430f10fdefb (patch) | |
tree | 439912d78b338df612dc134565e3e0590452bacc /examples | |
parent | 48952d67e6dde25faaba241b9deba737b83a5372 (diff) | |
parent | c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd (diff) | |
download | spark-731c94e91d310eadb58ec5ff3338b430f10fdefb.tar.gz spark-731c94e91d310eadb58ec5ff3338b430f10fdefb.tar.bz2 spark-731c94e91d310eadb58ec5ff3338b430f10fdefb.zip |
Merge pull request #56 from jerryshao/kafka-0.8-dev
Upgrade Kafka 0.7.2 to Kafka 0.8.0-beta1 for Spark Streaming
Conflicts:
streaming/pom.xml
Diffstat (limited to 'examples')
3 files changed, 135 insertions, 19 deletions
diff --git a/examples/pom.xml b/examples/pom.xml index 15399a8a33..aee371fbc7 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -32,13 +32,20 @@ <url>http://spark.incubator.apache.org/</url> <repositories> - <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 --> <repository> - <id>lib</id> - <url>file://${project.basedir}/lib</url> + <id>apache-repo</id> + <name>Apache Repository</name> + <url>https://repository.apache.org/content/repositories/releases</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> </repository> </repositories> + <dependencies> <dependency> <groupId>org.apache.spark</groupId> @@ -81,9 +88,18 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka</artifactId> - <version>0.7.2-spark</version> <!-- Comes from our in-project repository --> - <scope>provided</scope> + <artifactId>kafka_2.9.2</artifactId> + <version>0.8.0-beta1</version> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.eclipse.jetty</groupId> diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java new file mode 100644 index 0000000000..9a8e4209ed --- /dev/null +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.examples; + +import java.util.Map; +import java.util.HashMap; + +import com.google.common.collect.Lists; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import scala.Tuple2; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * <zkQuorum> is a list of one or more zookeeper servers that make quorum + * <group> is the name of kafka consumer group + * <topics> is a list of one or more kafka topics to consume from + * <numThreads> is the number of threads the kafka consumer should use + * + * Example: + * `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02, + * zoo03 my-consumer-group topic1,topic2 1` + */ + +public class JavaKafkaWordCount { + public static void main(String[] args) { + if (args.length < 5) { + System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>"); + System.exit(1); + } + + // Create the context with a 1 second batch size + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); + + int numThreads = Integer.parseInt(args[4]); + Map<String, Integer> topicMap = new HashMap<String, Integer>(); + String[] topics = args[3].split(","); + for (String topic: topics) { + topicMap.put(topic, numThreads); + } + + JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap); + + JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { + @Override + public String call(Tuple2<String, String> tuple2) throws Exception { + return tuple2._2(); + } + }); + + JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(x.split(" ")); + } + }); + + JavaPairDStream<String, Integer> wordCounts = words.map( + new PairFunction<String, String, Integer>() { + @Override + public Tuple2<String, Integer> call(String s) throws Exception { + return new Tuple2<String, Integer>(s, 1); + } + }).reduceByKey(new Function2<Integer, Integer, Integer>() { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + }); + + wordCounts.print(); + ssc.start(); + } +} diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 12f939d5a7..570ba4c81a 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -18,13 +18,11 @@ package org.apache.spark.streaming.examples import java.util.Properties -import kafka.message.Message -import kafka.producer.SyncProducerConfig + import kafka.producer._ -import org.apache.spark.SparkContext + import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.util.RawTextHelper._ /** @@ -54,9 +52,10 @@ object KafkaWordCount { ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream(zkQuorum, group, topicpMap) + val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) + val wordCounts = words.map(x => (x, 1l)) + .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() @@ -68,15 +67,16 @@ object KafkaWordCountProducer { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>") + System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " + + "<messagesPerSec> <wordsPerMessage>") System.exit(1) } - val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args + val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args // Zookeper connection properties val props = new Properties() - props.put("zk.connect", zkQuorum) + props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) @@ -85,11 +85,13 @@ object KafkaWordCountProducer { // Send some messages while(true) { val messages = (1 to messagesPerSec.toInt).map { messageNum => - (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ") + val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) + .mkString(" ") + + new KeyedMessage[String, String](topic, str) }.toArray - println(messages.mkString(",")) - val data = new ProducerData[String, String](topic, messages) - producer.send(data) + + producer.send(messages: _*) Thread.sleep(100) } } |