author     Tathagata Das <tathagata.das1565@gmail.com>   2013-02-07 12:41:07 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2013-02-07 12:41:07 -0800
commit     12300758ccbe7d45c6149d56772133ae1bc5cb25 (patch)
tree       3b10f668c01d61595d97a2aecb418871ecabc52b /examples/src
parent     915d9931fea72296ed50765ad3f7a88d254d7d60 (diff)
parent     d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9 (diff)
Merge pull request #372 from Reinvigorate/sm-kafka
Removing offset management code that is non-existent in kafka 0.7.0+
Diffstat (limited to 'examples/src')
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 16
1 file changed, 8 insertions(+), 8 deletions(-)
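The patch below replaces the example's single Kafka broker address (<hostname>:<port>) with a ZooKeeper connection string (<zkQuorum>), which is how the Kafka 0.7.0+ high-level consumer locates brokers and tracks offsets. A minimal sketch of the receiver side after the change, mirroring the post-patch call in this diff; the object name, ZooKeeper quorum, consumer group, and topic below are made-up illustrations, not part of the commit:

    import spark.SparkContext
    import spark.streaming.{Seconds, StreamingContext}

    // Sketch only: follows the argument order used after this patch.
    object KafkaWordCountSketch {
      def main(args: Array[String]) {
        val sc  = new SparkContext("local[2]", "KafkaWordCount")
        val ssc = new StreamingContext(sc, Seconds(2))
        ssc.checkpoint("checkpoint")
        val topicMap = Map("test" -> 1)   // topic -> number of consumer threads
        // Before the patch: ssc.kafkaStream[String](hostname, port.toInt, group, topicMap)
        val lines = ssc.kafkaStream[String]("zoo1:2181,zoo2:2181", "wordcount-group", topicMap)
        lines.flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).print()
        ssc.start()
      }
    }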
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index fe55db6e2c..65d5da82fc 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._
 object KafkaWordCount {
   def main(args: Array[String]) {
-    if (args.length < 6) {
-      System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
+    if (args.length < 5) {
+      System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
       System.exit(1)
     }
-    val Array(master, hostname, port, group, topics, numThreads) = args
+    val Array(master, zkQuorum, group, topics, numThreads) = args
     val sc = new SparkContext(master, "KafkaWordCount")
     val ssc = new StreamingContext(sc, Seconds(2))
     ssc.checkpoint("checkpoint")
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
-    val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+    val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
     wordCounts.print()
@@ -38,16 +38,16 @@ object KafkaWordCount {
 object KafkaWordCountProducer {
   def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+    if (args.length < 2) {
+      System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
       System.exit(1)
     }
-    val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+    val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
     // Zookeper connection properties
     val props = new Properties()
-    props.put("zk.connect", hostname + ":" + port)
+    props.put("zk.connect", zkQuorum)
     props.put("serializer.class", "kafka.serializer.StringEncoder")
     val config = new ProducerConfig(props)
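On the producer side the same substitution applies: the ZooKeeper quorum string is passed straight through to the Kafka 0.7 zk.connect property instead of being assembled from a hostname and port. A minimal sketch of that wiring, assuming the Kafka 0.7-era kafka.producer.ProducerConfig shown in the hunk above; the quorum value is illustrative:

    import java.util.Properties
    import kafka.producer.ProducerConfig

    // Sketch only: mirrors the post-patch property wiring shown above.
    // "zoo1:2181,zoo2:2181" is a made-up ZooKeeper connection string.
    val props = new Properties()
    props.put("zk.connect", "zoo1:2181,zoo2:2181")   // was hostname + ":" + port before the patch
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val config = new ProducerConfig(props)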