aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorseanm <sean.mcnamara@webtrends.com>2013-01-18 21:34:29 -0700
committerseanm <sean.mcnamara@webtrends.com>2013-01-18 21:34:29 -0700
commitd3064fe70762cbfcb7dbd5e1fbd708539c3de5e9 (patch)
tree461851d493577204c456c0e19ddddffb36c0fe7f /examples/src
parent56b7fbafa2b7717896c613e39ecc134f2405b4c6 (diff)
downloadspark-d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9.tar.gz
spark-d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9.tar.bz2
spark-d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9.zip
kafkaStream API cleanup. A quorum of zookeepers can now be specified
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala16
1 files changed, 8 insertions, 8 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index fe55db6e2c..65d5da82fc 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._
object KafkaWordCount {
def main(args: Array[String]) {
- if (args.length < 6) {
- System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
+ if (args.length < 5) {
+ System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
- val Array(master, hostname, port, group, topics, numThreads) = args
+ val Array(master, zkQuorum, group, topics, numThreads) = args
val sc = new SparkContext(master, "KafkaWordCount")
val ssc = new StreamingContext(sc, Seconds(2))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+ val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
@@ -38,16 +38,16 @@ object KafkaWordCount {
object KafkaWordCountProducer {
def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+ if (args.length < 2) {
+ System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
- val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+ val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
// Zookeper connection properties
val props = new Properties()
- props.put("zk.connect", hostname + ":" + port)
+ props.put("zk.connect", zkQuorum)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)