From 0c1de43fc7a9fea8629907d5b331e466f18be418 Mon Sep 17 00:00:00 2001
From: Denny
Date: Tue, 6 Nov 2012 09:41:42 -0800
Subject: Working on kafka.

---
 .../scala/spark/streaming/StreamingContext.scala  |  11 ++
 .../spark/streaming/examples/KafkaWordCount.scala |  27 +++++
 .../spark/streaming/input/KafkaInputDStream.scala | 121 +++++++++++++++++++++
 3 files changed, 159 insertions(+)
 create mode 100644 streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
 create mode 100644 streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala

diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index b3148eaa97..4a78090597 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -86,6 +86,17 @@ class StreamingContext (
 
   private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
 
+  def kafkaStream[T: ClassManifest](
+      hostname: String,
+      port: Int,
+      groupId: String,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+    ): DStream[T] = {
+    val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, storageLevel)
+    graph.addInputStream(inputStream)
+    inputStream
+  }
+
   def networkTextStream(
       hostname: String,
       port: Int,
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
new file mode 100644
index 0000000000..3f637150d1
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -0,0 +1,27 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext, KafkaInputDStream}
+import spark.streaming.StreamingContext._
+import spark.storage.StorageLevel
+
+object KafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: KafkaWordCount <master> <hostname> <port>")
+      System.exit(1)
+    }
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(args(0), "KafkaWordCount")
+    ssc.setBatchDuration(Seconds(2))
+
+    // Create a Kafka input stream that connects through the ZooKeeper
+    // endpoint at hostname:port and count the words in each batch
+    val lines = ssc.kafkaStream[String](args(1), args(2).toInt, "test_group")
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+
+  }
+}
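For context, a minimal hypothetical driver sketch (not part of this commit) showing the optional storageLevel parameter of the new kafkaStream method, which KafkaWordCount leaves at its default. The master URL "local[2]", the localhost:2181 ZooKeeper endpoint, and the group id "sketch_group" are illustrative placeholders only:

    import spark.storage.StorageLevel
    import spark.streaming.{Seconds, StreamingContext}

    object KafkaStreamSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local[2]", "KafkaStreamSketch")
        ssc.setBatchDuration(Seconds(2))
        // Pass the storage level explicitly; MEMORY_ONLY_SER_2 happens to
        // match the default declared in StreamingContext.kafkaStream above.
        val stream = ssc.kafkaStream[String]("localhost", 2181, "sketch_group",
          StorageLevel.MEMORY_ONLY_SER_2)
        stream.print()
        ssc.start()
      }
    }
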
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
new file mode 100644
index 0000000000..427f398237
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -0,0 +1,121 @@
+package spark.streaming
+
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.{ArrayBlockingQueue, Executors}
+import kafka.api.FetchRequest
+import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream}
+import kafka.javaapi.consumer.SimpleConsumer
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.message.{Message, MessageSet, MessageAndMetadata}
+import kafka.utils.Utils
+import scala.collection.JavaConversions._
+import spark._
+import spark.RDD
+import spark.storage.StorageLevel
+
+/**
+ * An input stream that pulls messages from Kafka, connecting through the
+ * given ZooKeeper endpoint.
+ */
+class KafkaInputDStream[T: ClassManifest](
+    @transient ssc_ : StreamingContext,
+    host: String,
+    port: Int,
+    groupId: String,
+    storageLevel: StorageLevel,
+    timeout: Int = 10000,
+    bufferSize: Int = 1024000
+  ) extends NetworkInputDStream[T](ssc_ ) with Logging {
+
+  def createReceiver(): NetworkReceiver[T] = {
+    new KafkaReceiver(id, host, port, storageLevel, groupId, timeout).asInstanceOf[NetworkReceiver[T]]
+  }
+}
+
+class KafkaReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel, groupId: String, timeout: Int)
+  extends NetworkReceiver[Any](streamId) {
+
+  // Thread that drains the block queue and hands each block to Spark
+  var blockPushingThread: Thread = null
+
+  def onStop() {
+    blockPushingThread.interrupt()
+  }
+
+  def onStart() {
+
+    // Pool for the message handler threads, one per consumer stream
+    val executorPool = Executors.newFixedThreadPool(2)
+
+    logInfo("Starting Kafka Consumer with groupId " + groupId)
+
+    val zooKeeperEndPoint = host + ":" + port
+    logInfo("Connecting to " + zooKeeperEndPoint)
+
+    // Specify some consumer properties
+    val props = new Properties()
+    props.put("zk.connect", zooKeeperEndPoint)
+    props.put("zk.connectiontimeout.ms", timeout.toString)
+    props.put("groupid", groupId)
+
+    // Create the connection to the cluster
+    val consumerConfig = new ConsumerConfig(props)
+    val consumerConnector = Consumer.create(consumerConfig)
+    logInfo("Connected to " + zooKeeperEndPoint)
+
+    // Specify which topics we are listening to (hardcoded while this is
+    // a work in progress)
+    val topicCountMap = Map("test" -> 2)
+    val topicMessageStreams = consumerConnector.createMessageStreams(topicCountMap)
+    val streams = topicMessageStreams.get("test")
+
+    // Queue that holds the blocks
+    val queue = new ArrayBlockingQueue[ByteBuffer](2)
+
+    streams.getOrElse(Nil).foreach { stream =>
+      executorPool.submit(new MessageHandler(stream, queue))
+    }
+
+    blockPushingThread = new DaemonThread {
+      override def run() {
+        logInfo("Starting BlockPushingThread.")
+        var nextBlockNumber = 0
+        while (true) {
+          val buffer = queue.take()
+          val blockId = "input-" + streamId + "-" + nextBlockNumber
+          nextBlockNumber += 1
+          pushBlock(blockId, buffer, storageLevel)
+        }
+      }
+    }
+    blockPushingThread.start()
+
+    // Alternative low-level approach using SimpleConsumer, kept for reference:
+    // while (true) {
+    //   // Create a fetch request for topic "test", partition 0, current offset, and fetch size of 1MB
+    //   val fetchRequest = new FetchRequest("test", 0, offset, 1000000)
+    //
+    //   // Get the message set from the consumer and print the messages out
+    //   val messages = consumer.fetch(fetchRequest)
+    //   for (msg <- messages.iterator) {
+    //     logInfo("consumed: " + Utils.toString(msg.message.payload, "UTF-8"))
+    //     // Advance the offset after consuming each message
+    //     offset = msg.offset
+    //     queue.put(msg.message.payload)
+    //   }
+    // }
+  }
+
+  class MessageHandler(stream: KafkaStream[Message], queue: ArrayBlockingQueue[ByteBuffer]) extends Runnable {
+    def run() {
+      logInfo("Starting MessageHandler.")
+      // KafkaStream's iterator blocks until new messages arrive, so this
+      // loop does not spin
+      while (true) {
+        stream.foreach { msgAndMetadata =>
+          logInfo("Consumed: " + Utils.toString(msgAndMetadata.message.payload, "UTF-8"))
+          queue.put(msgAndMetadata.message.payload)
+        }
+      }
+    }
+  }
+
+}
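The receiver decouples message consumption from block pushing with a bounded in-memory queue: MessageHandler threads put payloads onto an ArrayBlockingQueue while a single daemon thread drains it and calls pushBlock. Below is a self-contained sketch of that handoff pattern, with the Kafka and Spark types replaced by plain strings; all names and sizes here are illustrative, and the queue capacity of 2 simply mirrors the receiver above:

    import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}

    object QueueHandoffSketch {
      def main(args: Array[String]) {
        // Bounded queue; producers block when it is full, which applies
        // backpressure to the message handlers
        val queue = new ArrayBlockingQueue[String](2)
        val pool = Executors.newFixedThreadPool(2)

        // Two simulated message handlers, standing in for MessageHandler
        for (i <- 1 to 2) {
          pool.submit(new Runnable {
            def run() {
              for (n <- 1 to 3) queue.put("handler-" + i + "-msg-" + n)
            }
          })
        }

        // Stand-in for the block-pushing thread: drain until idle for 1s
        val pusher = new Thread {
          override def run() {
            var msg = queue.poll(1, TimeUnit.SECONDS)
            while (msg != null) {
              println("pushed: " + msg)  // the receiver calls pushBlock here
              msg = queue.poll(1, TimeUnit.SECONDS)
            }
          }
        }
        pusher.setDaemon(true)
        pusher.start()
        pusher.join()
        pool.shutdown()
      }
    }

One consequence of this design, visible in the receiver as well: because the queue is bounded, a slow pushBlock path eventually blocks the Kafka consumer threads rather than buffering messages without limit.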