diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-01-18 21:34:29 -0700 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-01-18 21:34:29 -0700 |
commit | d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9 (patch) | |
tree | 461851d493577204c456c0e19ddddffb36c0fe7f /examples/src | |
parent | 56b7fbafa2b7717896c613e39ecc134f2405b4c6 (diff) | |
download | spark-d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9.tar.gz spark-d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9.tar.bz2 spark-d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9.zip |
kafkaStream API cleanup. A quorum of zookeepers can now be specified
Diffstat (limited to 'examples/src')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index fe55db6e2c..65d5da82fc 100644 --- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala @@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._ object KafkaWordCount { def main(args: Array[String]) { - if (args.length < 6) { - System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>") + if (args.length < 5) { + System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>") System.exit(1) } - val Array(master, hostname, port, group, topics, numThreads) = args + val Array(master, zkQuorum, group, topics, numThreads) = args val sc = new SparkContext(master, "KafkaWordCount") val ssc = new StreamingContext(sc, Seconds(2)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap) + val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() @@ -38,16 +38,16 @@ object KafkaWordCount { object KafkaWordCountProducer { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>") + if (args.length < 2) { + System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>") System.exit(1) } - val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args + val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args // Zookeper connection properties val props = new Properties() - props.put("zk.connect", hostname + ":" + port) + props.put("zk.connect", zkQuorum) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) |