author     Denny <dennybritz@gmail.com>  2012-11-06 09:41:42 -0800
committer  Denny <dennybritz@gmail.com>  2012-11-06 09:41:42 -0800
commit     0c1de43fc7a9fea8629907d5b331e466f18be418 (patch)
tree       2db9096256e9bd51159b93662554eb81c535fe5d /streaming/src
parent     596154eabe51961733789a18a47067748fb72e8e (diff)
download   spark-0c1de43fc7a9fea8629907d5b331e466f18be418.tar.gz
           spark-0c1de43fc7a9fea8629907d5b331e466f18be418.tar.bz2
           spark-0c1de43fc7a9fea8629907d5b331e466f18be418.zip
Working on kafka.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala        |  11
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala |  27
-rw-r--r--  streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala | 121
3 files changed, 159 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index b3148eaa97..4a78090597 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -86,6 +86,17 @@ class StreamingContext (
   private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+  def kafkaStream[T: ClassManifest](
+      hostname: String,
+      port: Int,
+      groupId: String,
+      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+    ): DStream[T] = {
+    val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, storageLevel)
+    graph.addInputStream(inputStream)
+    inputStream
+  }
+
   def networkTextStream(
       hostname: String,
       port: Int,
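
For reference, a minimal sketch of how the new kafkaStream method could be called from a driver program. The master URL, ZooKeeper endpoint, and group id below are placeholder values, and the explicit storage level simply restates the MEMORY_ONLY_SER_2 default declared above; only the kafkaStream signature itself comes from this diff.

    import spark.streaming.{Seconds, StreamingContext}
    import spark.storage.StorageLevel

    val ssc = new StreamingContext("local[2]", "KafkaStreamSketch")
    ssc.setBatchDuration(Seconds(2))

    // Consume messages through ZooKeeper at localhost:2181 under group "sketch_group",
    // spelling out the (default) serialized, replicated storage level.
    val messages = ssc.kafkaStream[String]("localhost", 2181, "sketch_group", StorageLevel.MEMORY_ONLY_SER_2)
    messages.print()
    ssc.start()
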
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
new file mode 100644
index 0000000000..3f637150d1
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -0,0 +1,27 @@
+package spark.streaming.examples
+
+import spark.streaming.{Seconds, StreamingContext, KafkaInputDStream}
+import spark.streaming.StreamingContext._
+import spark.storage.StorageLevel
+
+object KafkaWordCount {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: KafkaWordCount <master> <zkHostname> <zkPort>")
+      System.exit(1)
+    }
+
+    // Create the context and set the batch size
+    val ssc = new StreamingContext(args(0), "KafkaWordCount")
+    ssc.setBatchDuration(Seconds(2))
+
+    // Create a Kafka input stream that consumes through ZooKeeper at the
+    // given host:port, and count the words in the received messages
+    val lines = ssc.kafkaStream[String](args(1), args(2).toInt, "test_group")
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
new file mode 100644
index 0000000000..427f398237
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -0,0 +1,121 @@
+package spark.streaming
+
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.{ArrayBlockingQueue, Executors}
+import kafka.api.{FetchRequest}
+import kafka.consumer.{Consumer, ConsumerConfig, KafkaStream}
+import kafka.javaapi.consumer.SimpleConsumer
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.message.{Message, MessageSet, MessageAndMetadata}
+import kafka.utils.Utils
+import scala.collection.JavaConversions._
+import spark._
+import spark.RDD
+import spark.storage.StorageLevel
+
+
+/**
+ * An input stream that pulls messages from a Kafka broker.
+ */
+class KafkaInputDStream[T: ClassManifest](
+    @transient ssc_ : StreamingContext,
+    host: String,
+    port: Int,
+    groupId: String,
+    storageLevel: StorageLevel,
+    timeout: Int = 10000,
+    bufferSize: Int = 1024000
+  ) extends NetworkInputDStream[T](ssc_ ) with Logging {
+
+  def createReceiver(): NetworkReceiver[T] = {
+    new KafkaReceiver(id, host, port, storageLevel, groupId, timeout).asInstanceOf[NetworkReceiver[T]]
+  }
+}
+
+class KafkaReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel, groupId: String, timeout: Int)
+  extends NetworkReceiver[Any](streamId) {
+
+  // Thread that pushes the received blocks into Spark's block store
+  var blockPushingThread: Thread = null
+
+  def onStop() {
+    blockPushingThread.interrupt()
+  }
+
+  def onStart() {
+
+    // Thread pool for the message handlers, one thread per Kafka stream
+    val executorPool = Executors.newFixedThreadPool(2)
+
+    logInfo("Starting Kafka Consumer with groupId " + groupId)
+
+    val zooKeeperEndPoint = host + ":" + port
+    logInfo("Connecting to " + zooKeeperEndPoint)
+
+    // Specify some consumer properties
+    val props = new Properties()
+    props.put("zk.connect", zooKeeperEndPoint)
+    props.put("zk.connectiontimeout.ms", timeout.toString)
+    props.put("groupid", groupId)
+
+    // Create the connection to the cluster
+    val consumerConfig = new ConsumerConfig(props)
+    val consumerConnector = Consumer.create(consumerConfig)
+    logInfo("Connected to " + zooKeeperEndPoint)
+
+    // Specify which topics we are listening to and how many consumer threads
+    // handle each one (the topic name is hardcoded to "test" for now)
+    val topicCountMap = Map("test" -> 2)
+    val topicMessageStreams = consumerConnector.createMessageStreams(topicCountMap)
+    val streams = topicMessageStreams.get("test")
+
+    // Bounded queue that holds the blocks before they are pushed to Spark
+    val queue = new ArrayBlockingQueue[ByteBuffer](2)
+
+    streams.getOrElse(Nil).foreach { stream =>
+      executorPool.submit(new MessageHandler(stream, queue))
+    }
+
+    blockPushingThread = new DaemonThread {
+      override def run() {
+        logInfo("Starting BlockPushingThread.")
+        var nextBlockNumber = 0
+        while (true) {
+          val buffer = queue.take()
+          val blockId = "input-" + streamId + "-" + nextBlockNumber
+          nextBlockNumber += 1
+          pushBlock(blockId, buffer, storageLevel)
+        }
+      }
+    }
+    blockPushingThread.start()
+
+    // while (true) {
+    //   // Create a fetch request for topic "test", partition 0, current offset, and fetch size of 1MB
+    //   val fetchRequest = new FetchRequest("test", 0, offset, 1000000)
+
+    //   // get the message set from the consumer and print them out
+    //   val messages = consumer.fetch(fetchRequest)
+    //   for (msg <- messages.iterator) {
+    //     logInfo("consumed: " + Utils.toString(msg.message.payload, "UTF-8"))
+    //     // advance the offset after consuming each message
+    //     offset = msg.offset
+    //     queue.put(msg.message.payload)
+    //   }
+    // }
+  }
+
+  // Consumes messages from one Kafka stream and enqueues their payloads
+  class MessageHandler(stream: KafkaStream[Message], queue: ArrayBlockingQueue[ByteBuffer]) extends Runnable {
+    def run() {
+      logInfo("Starting MessageHandler.")
+      while (true) {
+        stream.foreach { msgAndMetadata =>
+          logInfo("Consumed: " + Utils.toString(msgAndMetadata.message.payload, "UTF-8"))
+          queue.put(msgAndMetadata.message.payload)
+        }
+      }
+    }
+  }
+}
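
As a side note on the structure of the receiver: the message handler threads produce into a bounded ArrayBlockingQueue, and a single daemon thread drains it and pushes blocks. A stripped-down sketch of that handoff pattern, independent of Kafka and Spark (every name in it is illustrative, not part of this commit):

    import java.util.concurrent.{ArrayBlockingQueue, Executors}

    object QueueHandoffSketch {
      def main(args: Array[String]) {
        // Bounded queue: producers block once two items are waiting, which
        // throttles them to the speed of the single pushing thread.
        val queue = new ArrayBlockingQueue[String](2)

        // Two producer threads, standing in for the Kafka message handlers.
        val producers = Executors.newFixedThreadPool(2)
        for (i <- 1 to 2) {
          producers.submit(new Runnable {
            def run() {
              for (n <- 1 to 5) queue.put("producer-" + i + "-item-" + n)
            }
          })
        }

        // Single daemon thread draining the queue, like the block pushing thread above.
        val pusher = new Thread {
          override def run() {
            while (true) println("pushed: " + queue.take())
          }
        }
        pusher.setDaemon(true)
        pusher.start()

        producers.shutdown()
        Thread.sleep(1000)
      }
    }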