diff options
author | jerryshao <saisai.shao@intel.com> | 2013-10-12 15:02:57 +0800 |
---|---|---|
committer | jerryshao <saisai.shao@intel.com> | 2013-10-12 20:00:42 +0800 |
commit | c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd (patch) | |
tree | 14a083addc11bef9eeac371c8f383dc0e1c439d5 /examples/src/main/scala | |
parent | dca80094d317363e1e0d7e32bc7dfd99faf943cf (diff) | |
download | spark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.tar.gz spark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.tar.bz2 spark-c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd.zip |
Upgrade Kafka 0.7.2 to Kafka 0.8.0-beta1 for Spark Streaming
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 12f939d5a7..570ba4c81a 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -18,13 +18,11 @@ package org.apache.spark.streaming.examples import java.util.Properties -import kafka.message.Message -import kafka.producer.SyncProducerConfig + import kafka.producer._ -import org.apache.spark.SparkContext + import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.util.RawTextHelper._ /** @@ -54,9 +52,10 @@ object KafkaWordCount { ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream(zkQuorum, group, topicpMap) + val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) - val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) + val wordCounts = words.map(x => (x, 1l)) + .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() ssc.start() @@ -68,15 +67,16 @@ object KafkaWordCountProducer { def main(args: Array[String]) { if (args.length < 2) { - System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>") + System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " + + "<messagesPerSec> <wordsPerMessage>") System.exit(1) } - val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args + val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args // Zookeper connection properties val props = new Properties() - props.put("zk.connect", zkQuorum) + props.put("metadata.broker.list", brokers) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) @@ -85,11 +85,13 @@ object KafkaWordCountProducer { // Send some messages while(true) { val messages = (1 to messagesPerSec.toInt).map { messageNum => - (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ") + val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString) + .mkString(" ") + + new KeyedMessage[String, String](topic, str) }.toArray - println(messages.mkString(",")) - val data = new ProducerData[String, String](topic, messages) - producer.send(data) + + producer.send(messages: _*) Thread.sleep(100) } } |