author     Denny <dennybritz@gmail.com>  2012-11-19 10:17:58 -0800
committer  Denny <dennybritz@gmail.com>  2012-11-19 10:17:58 -0800
commit     5e2b0a3bf60dead1ac7946c9984b067c926c2904 (patch)
tree       68dd33721ada12d31bcede11008012b72bd01350 /streaming
parent     6757ed6a40121ee97a15506af8717bb8d97cf1ec (diff)
download   spark-5e2b0a3bf60dead1ac7946c9984b067c926c2904.tar.gz
           spark-5e2b0a3bf60dead1ac7946c9984b067c926c2904.tar.bz2
           spark-5e2b0a3bf60dead1ac7946c9984b067c926c2904.zip
Added Kafka WordCount producer
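This commit reworks the KafkaWordCount example to read its full configuration from the command line and adds a companion KafkaWordCountProducer that feeds random words into a Kafka topic. A hypothetical way to run the pair against a local setup (the `run` launcher script and every argument value below are assumptions for illustration, not part of this commit):

  # assumed: ZooKeeper at localhost:2181, topic "test", one consumer thread
  ./run spark.streaming.examples.KafkaWordCountProducer localhost 2181 test 10 5
  ./run spark.streaming.examples.KafkaWordCount local[2] localhost 2181 my-group test 1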
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 72
-rw-r--r--  streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala |  5
2 files changed, 52 insertions(+), 25 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index c85ac8e984..12e3f49fe9 100644
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -1,5 +1,9 @@
package spark.streaming.examples
+import java.util.Properties
+import kafka.message.Message
+import kafka.producer.SyncProducerConfig
+import kafka.producer._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.storage.StorageLevel
@@ -8,33 +12,57 @@ import spark.streaming.util.RawTextHelper._
object KafkaWordCount {
def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <restore>")
+ if (args.length < 6) {
+ System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
System.exit(1)
}
- val ssc = args(3) match {
- // Restore the stream from a checkpoint
- case "true" =>
- new StreamingContext("work/checkpoint")
- case _ =>
- val tmp = new StreamingContext(args(0), "KafkaWordCount")
-
- tmp.setBatchDuration(Seconds(2))
- tmp.checkpoint("work/checkpoint", Seconds(10))
-
- val lines = tmp.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
- Map(KafkaPartitionKey(0,"test","test_group",0) -> 0l))
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
-
- wordCounts.persist().checkpoint(Seconds(10))
- wordCounts.print()
-
- tmp
- }
+ val Array(master, hostname, port, group, topics, numThreads) = args
+
+ val ssc = new StreamingContext(master, "KafkaWordCount")
+ ssc.checkpoint("checkpoint")
+ ssc.setBatchDuration(Seconds(2))
+
+ val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
+ val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicMap)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1L)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+ wordCounts.print()
+
ssc.start()
+ }
+}
+
+// Produces random words (single digits from 0 to 9) to a Kafka topic.
+object KafkaWordCountProducer {
+
+ def main(args: Array[String]) {
+ if (args.length < 5) {
+ System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+ System.exit(1)
+ }
+
+ val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+ // Zookeeper connection properties
+ val props = new Properties()
+ props.put("zk.connect", hostname + ":" + port)
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+
+ val config = new ProducerConfig(props)
+ val producer = new Producer[String, String](config)
+
+ // Send some messages
+ while(true) {
+ val messages = (1 to messagesPerSec.toInt).map { messageNum =>
+ (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
+ }.toArray
+ println(messages.mkString(","))
+ val data = new ProducerData[String, String](topic, messages)
+ producer.send(data)
+ Thread.sleep(1000) // pace the loop so the rate is roughly messagesPerSec
+ }
}
+
}
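One detail worth calling out from the consumer above: the topic map pairs every comma-separated topic name with the same per-topic thread count before it is handed to kafkaStream. A minimal sketch of that expression, assuming topics = "a,b" and numThreads = "2":

  // builds Map(a -> 2, b -> 2): each listed topic is consumed by 2 threads
  val topicMap = "a,b".split(",").map((_, 2)).toMap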
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index 3685d6c666..7c642d4802 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -171,8 +171,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
groupId, msgAndMetadata.topicInfo.partition.partId)
val offset = msgAndMetadata.topicInfo.getConsumeOffset
offsets.put(key, offset)
- // TODO: Remove Logging
- logInfo("Handled message: " + (key, offset).toString)
+ // logInfo("Handled message: " + (key, offset).toString)
// Keep on handling messages
true
@@ -190,5 +189,5 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
// }
// }
-
+
}