From 5e2b0a3bf60dead1ac7946c9984b067c926c2904 Mon Sep 17 00:00:00 2001
From: Denny
Date: Mon, 19 Nov 2012 10:17:58 -0800
Subject: Added Kafka Wordcount producer

---
 .../spark/streaming/examples/KafkaWordCount.scala | 72 +++++++++++++++-------
 .../spark/streaming/input/KafkaInputDStream.scala |  5 +-
 2 files changed, 52 insertions(+), 25 deletions(-)

(limited to 'streaming/src')

diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index c85ac8e984..12e3f49fe9 100644
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -1,5 +1,9 @@
 package spark.streaming.examples
 
+import java.util.Properties
+import kafka.message.Message
+import kafka.producer.SyncProducerConfig
+import kafka.producer._
 import spark.streaming._
 import spark.streaming.StreamingContext._
 import spark.storage.StorageLevel
@@ -8,33 +12,57 @@ import spark.streaming.util.RawTextHelper._
 object KafkaWordCount {
   def main(args: Array[String]) {
 
-    if (args.length < 4) {
-      System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <restore>")
+    if (args.length < 6) {
+      System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
       System.exit(1)
     }
 
-    val ssc = args(3) match {
-      // Restore the stream from a checkpoint
-      case "true" =>
-        new StreamingContext("work/checkpoint")
-      case _ =>
-        val tmp = new StreamingContext(args(0), "KafkaWordCount")
-
-        tmp.setBatchDuration(Seconds(2))
-        tmp.checkpoint("work/checkpoint", Seconds(10))
-
-        val lines = tmp.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
-          Map(KafkaPartitionKey(0,"test","test_group",0) -> 0l))
-        val words = lines.flatMap(_.split(" "))
-        val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
-
-        wordCounts.persist().checkpoint(Seconds(10))
-        wordCounts.print()
-
-        tmp
-    }
+    val Array(master, hostname, port, group, topics, numThreads) = args
+
+    val ssc = new StreamingContext(master, "KafkaWordCount")
+    ssc.checkpoint("checkpoint")
+    ssc.setBatchDuration(Seconds(2))
+
+    val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
+    val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+    wordCounts.print()
+    ssc.start()
+  }
+}
+
+// Produces messages of random words (digits between 0 and 9).
+object KafkaWordCountProducer {
+
+  def main(args: Array[String]) {
+    if (args.length < 5) {
+      System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+      System.exit(1)
+    }
+
+    val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+    // Zookeeper connection properties
+    val props = new Properties()
+    props.put("zk.connect", hostname + ":" + port)
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+    val config = new ProducerConfig(props)
+    val producer = new Producer[String, String](config)
+
+    // Send some messages
+    while(true) {
+      val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+        (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
+      }.toArray
+      println(messages.mkString(","))
+      val data = new ProducerData[String, String](topic, messages)
+      producer.send(data)
+      Thread.sleep(100)
+    }
   }
+
 }
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index 3685d6c666..7c642d4802 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -171,8 +171,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
           groupId, msgAndMetadata.topicInfo.partition.partId)
         val offset = msgAndMetadata.topicInfo.getConsumeOffset
         offsets.put(key, offset)
-        // TODO: Remove Logging
-        logInfo("Handled message: " + (key, offset).toString)
+        // logInfo("Handled message: " + (key, offset).toString)
 
         // Keep on handling messages
         true
@@ -190,5 +189,5 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
   //     }
   //   }
 
-  
+
 }
--
cgit v1.2.3
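
A note on the consumer side: reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) keeps a 10-minute windowed count that slides every 2 seconds by patching a running total rather than re-reducing the whole window. The plain-Scala sketch below illustrates that incremental update; the object name is hypothetical, and it assumes add/subtract from RawTextHelper are simple Long addition and subtraction (they are not reproduced in this patch).

    // Minimal sketch (plain Scala, no Spark needed) of the incremental
    // update behind reduceByKeyAndWindow(add _, subtract _, window, slide).
    object InvertibleWindowSketch {
      def add(v1: Long, v2: Long): Long = v1 + v2       // reduce function
      def subtract(v1: Long, v2: Long): Long = v1 - v2  // its inverse

      def main(args: Array[String]): Unit = {
        // Counts for one word in successive 2-second batches.
        val batchCounts = Seq(3L, 5L, 2L, 4L, 1L)
        val windowLenInBatches = 3 // stands in for Minutes(10) / Seconds(2)

        var windowed = 0L
        batchCounts.zipWithIndex.foreach { case (entering, i) =>
          windowed = add(windowed, entering)  // newest batch enters the window
          if (i >= windowLenInBatches) {
            // oldest batch falls out of the window
            windowed = subtract(windowed, batchCounts(i - windowLenInBatches))
          }
          println(s"after batch $i: windowed count = $windowed")
        }
      }
    }

The inverse function is what keeps a long window with a short slide cheap: each slide costs O(1) per key instead of re-summing ten minutes of data. It is also why the example calls ssc.checkpoint("checkpoint"), since the running totals carry state from batch to batch.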
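
To see the two halves together: the producer pushes strings of random digits into a topic, and the consumer counts them over the sliding window. The wrapper below is a hypothetical local smoke test, not part of the patch; the argument order follows the usage strings above, and it assumes ZooKeeper on localhost:2181 and a running Kafka 0.7 broker, with a topic named "test".

    // Hypothetical smoke test wiring the two entry points together.
    // Assumes ZooKeeper on localhost:2181 and a Kafka 0.7 broker running.
    object KafkaWordCountDemo {
      def main(args: Array[String]): Unit = {
        // Producer: 3 messages per iteration, 5 random digits per message,
        // published to topic "test". It loops forever, so run it as a daemon.
        val producerThread = new Thread(new Runnable {
          def run(): Unit = KafkaWordCountProducer.main(
            Array("localhost", "2181", "test", "3", "5"))
        })
        producerThread.setDaemon(true)
        producerThread.start()

        // Consumer: local master, group "test_group", one thread for topic
        // "test"; prints the 10-minute windowed counts every 2 seconds.
        KafkaWordCount.main(
          Array("local[2]", "localhost", "2181", "test_group", "test", "1"))
      }
    }

One caveat visible in the producer loop: it sends messagesPerSec messages every 100 ms, so the effective rate is roughly ten times what the parameter name suggests.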