diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-07 12:41:07 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-07 12:41:07 -0800 |
commit | 12300758ccbe7d45c6149d56772133ae1bc5cb25 (patch) | |
tree | 3b10f668c01d61595d97a2aecb418871ecabc52b /examples | |
parent | 915d9931fea72296ed50765ad3f7a88d254d7d60 (diff) | |
parent | d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9 (diff) | |
download | spark-12300758ccbe7d45c6149d56772133ae1bc5cb25.tar.gz spark-12300758ccbe7d45c6149d56772133ae1bc5cb25.tar.bz2 spark-12300758ccbe7d45c6149d56772133ae1bc5cb25.zip |
Merge pull request #372 from Reinvigorate/sm-kafka
Removing offset management code that is non-existent in kafka 0.7.0+
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index fe55db6e2c..65d5da82fc 100644 --- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala @@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._ object KafkaWordCount { def main(args: Array[String]) { - if (args.length < 6) { - System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>") + if (args.length < 5) { + System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } - val Array(master, hostname, port, group, topics, numThreads) = args + val Array(master, zkQuorum, group, topics, numThreads) = args val sc = new SparkContext(master, "KafkaWordCount") val ssc = new StreamingContext(sc, Seconds(2)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap) + val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() @@ -38,16 +38,16 @@ object KafkaWordCount { object KafkaWordCountProducer { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>") + if (args.length < 2) { + System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>") System.exit(1) } - val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args + val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args // Zookeper connection properties val props = new Properties() - props.put("zk.connect", hostname + ":" + port) + props.put("zk.connect", zkQuorum) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) |