author    Denny <dennybritz@gmail.com>  2012-11-11 11:06:49 -0800
committer Denny <dennybritz@gmail.com>  2012-11-11 11:06:49 -0800
commit    d006109e9504b3221de3a15f9bfee96dafa8b593 (patch)
tree      c8a75fd43c77972693cd6e800b653b16ee840220 /streaming/src
parent    2e8f2ee4adbe078a690dbadfa1dbd74fdc824d89 (diff)
Kafka Stream comments.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                  |  7
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala         | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala  | 44
-rw-r--r--  streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala  | 77
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala          | 12
5 files changed, 99 insertions(+), 53 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 3219919a24..b8324d11a3 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -17,6 +17,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
+
case class DStreamCheckpointData(rdds: HashMap[Time, Any])
abstract class DStream[T: ClassManifest] (@transient var ssc: StreamingContext)
@@ -61,7 +62,7 @@ extends Serializable with Logging {
// Checkpoint details
protected[streaming] val mustCheckpoint = false
protected[streaming] var checkpointInterval: Time = null
- protected[streaming] var checkpointData = DStreamCheckpointData(HashMap[Time, Any]())
+ protected[streaming] var checkpointData = new DStreamCheckpointData(HashMap[Time, Any]())
// Reference to whole DStream graph
protected[streaming] var graph: DStreamGraph = null
@@ -286,7 +287,9 @@ extends Serializable with Logging {
* This method should be overridden by subclasses of InputDStream.
*/
protected[streaming] def addMetadata(metadata: Any) {
- logInfo("Dropping Metadata: " + metadata.toString)
+ if (metadata != null) {
+ logInfo("Dropping Metadata: " + metadata.toString)
+ }
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d68d2632e7..e87d0cb7c8 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -102,6 +102,18 @@ final class StreamingContext (
private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ *
+ * @param hostname Zookeeper hostname.
+ * @param port Zookeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+ * By default the value is pulled from Zookeeper.
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
def kafkaStream[T: ClassManifest](
hostname: String,
port: Int,
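A minimal usage sketch of the kafkaStream API documented above may help orientation. It is not part of this commit; the argument shapes mirror the KafkaWordCount example further down in this diff, and the hostname, port, topic, group id, and offsets are illustrative placeholders.

    // Sketch only: assumes an existing StreamingContext named ssc and the
    // signature documented above; all values below are placeholders.
    val lines = ssc.kafkaStream[String](
      "zookeeper-host",                  // ZooKeeper hostname
      2181,                              // ZooKeeper port
      "example_group",                   // consumer group id
      Map("example_topic" -> 1),         // topic -> number of consumer threads
      Map(KafkaPartitionKey(0, "example_topic", "example_group", 0) -> 0L))  // optional initial offsets
    lines.print()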
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 655f9627b3..1e92cbb210 100644
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -3,34 +3,38 @@ package spark.streaming.examples
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.storage.StorageLevel
+import WordCount2_ExtraFunctions._
object KafkaWordCount {
def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: WordCountNetwork <master> <hostname> <port>")
+
+ if (args.length < 4) {
+ System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <restore>")
System.exit(1)
}
- // Create the context and set the batch size
- val ssc = new StreamingContext(args(0), "WordCountNetwork")
- ssc.setBatchDuration(Seconds(2))
+ val ssc = args(3) match {
+ // Restore the stream from a checkpoint
+ case "true" =>
+ new StreamingContext("work/checkpoint")
+ case _ =>
+ val tmp = new StreamingContext(args(0), "KafkaWordCount")
- // Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
- ssc.checkpoint("checkpoint", Time(1000 * 5))
- val lines = ssc.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
- Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 2382))
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
+ tmp.setBatchDuration(Seconds(2))
+ tmp.checkpoint("work/checkpoint", Seconds(10))
+
+ val lines = tmp.kafkaStream[String](args(1), args(2).toInt, "test_group", Map("test" -> 1),
+ Map(KafkaPartitionKey(0,"test","test_group",0) -> 0l))
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+
+ wordCounts.persist().checkpoint(Seconds(10))
+ wordCounts.print()
+
+ tmp
+ }
ssc.start()
- // Wait for 12 seconds
- Thread.sleep(12000)
- ssc.stop()
-
- val newSsc = new StreamingContext("checkpoint")
- newSsc.start()
-
}
}
+
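The example above imports add and subtract from WordCount2_ExtraFunctions, which is not part of this diff. For the windowed count built with reduceByKeyAndWindow to be invertible they would simply be Long addition and its inverse; a plausible sketch, stated as an assumption:

    // Assumed shape of the helpers the example imports; the real
    // WordCount2_ExtraFunctions object is not shown in this commit.
    object WordCount2_ExtraFunctions {
      def add(v1: Long, v2: Long): Long = v1 + v2
      def subtract(v1: Long, v2: Long): Long = v1 - v2
    }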
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index 814f2706d6..ad8e86a094 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -1,15 +1,11 @@
package spark.streaming
-import java.lang.reflect.Method
-import java.nio.ByteBuffer
import java.util.Properties
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, Executors}
-import kafka.api.{FetchRequest}
+import java.util.concurrent.Executors
import kafka.consumer._
-import kafka.cluster.Partition
import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
-import kafka.utils.{Pool, Utils, ZKGroupTopicDirs}
+import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
@@ -17,14 +13,25 @@ import spark._
import spark.RDD
import spark.storage.StorageLevel
-
+// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
+// Metadata for a Kafka Stream that is sent to the Master
case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long])
+// Checkpoint data specific to a KafkaInputDStream
case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any],
- savedOffsets: HashMap[Long, Map[KafkaPartitionKey, Long]]) extends DStreamCheckpointData(kafkaRdds)
+ savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds)
/**
* Input stream that pulls messages from a Kafka Broker.
+ *
+ * @param host Zookeeper hostname.
+ * @param port Zookeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+ * By default the value is pulled from Zookeeper.
+ * @param storageLevel RDD storage level.
*/
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
@@ -36,21 +43,31 @@ class KafkaInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
+ // Metadata that keeps track of which messages have already been consumed.
var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]()
+ // In case of a failure, the offsets for a particular timestamp will be restored.
+ @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null
override protected[streaming] def addMetadata(metadata: Any) {
metadata match {
- case x : KafkaInputDStreamMetadata =>
+ case x : KafkaInputDStreamMetadata =>
savedOffsets(x.timestamp) = x.data
- logInfo("Saved Offsets: " + savedOffsets)
+ // TODO: Remove logging
+ logInfo("New saved Offsets: " + savedOffsets)
case _ => logInfo("Received unknown metadata: " + metadata.toString)
}
}
override protected[streaming] def updateCheckpointData(currentTime: Time) {
super.updateCheckpointData(currentTime)
- logInfo("Updating KafkaDStream checkpoint data: " + savedOffsets.toString)
- checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, savedOffsets)
+ if(savedOffsets.size > 0) {
+ // Find the offsets that were stored before the checkpoint was initiated
+ val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last
+ val latestOffsets = savedOffsets(key)
+ logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString)
+ checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets)
+ savedOffsets.clear()
+ }
}
override protected[streaming] def restoreCheckpointData() {
@@ -58,14 +75,21 @@ class KafkaInputDStream[T: ClassManifest](
logInfo("Restoring KafkaDStream checkpoint data.")
checkpointData match {
case x : KafkaDStreamCheckpointData =>
- savedOffsets = x.savedOffsets
- logInfo("Restored KafkaDStream offsets: " + savedOffsets.toString)
+ restoredOffsets = x.savedOffsets
+ logInfo("Restored KafkaDStream offsets: " + savedOffsets)
}
}
def createReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
- .asInstanceOf[NetworkReceiver[T]]
+ // We have restored from a checkpoint, use the restored offsets
+ if (restoredOffsets != null) {
+ new KafkaReceiver(id, host, port, groupId, topics, restoredOffsets, storageLevel)
+ .asInstanceOf[NetworkReceiver[T]]
+ } else {
+ new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
+ .asInstanceOf[NetworkReceiver[T]]
+ }
+
}
}
@@ -96,27 +120,28 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
val zooKeeperEndPoint = host + ":" + port
- logInfo("Starting Kafka Consumer Stream in group " + groupId)
+ logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
- logInfo("Connecting to " + zooKeeperEndPoint)
- // Specify some Consumer properties
+
+ // Zookeeper connection properties
val props = new Properties()
props.put("zk.connect", zooKeeperEndPoint)
props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
props.put("groupid", groupId)
// Create the connection to the cluster
+ logInfo("Connecting to Zookeper: " + zooKeeperEndPoint)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
+ logInfo("Connected to " + zooKeeperEndPoint)
// Reset the Kafka offsets in case we are recovering from a failure
resetOffsets(initialOffsets)
-
- logInfo("Connected to " + zooKeeperEndPoint)
// Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
+ // Start the message handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
@@ -133,19 +158,20 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
}
}
- // Responsible for handling Kafka Messages
- class MessageHandler(stream: KafkaStream[String]) extends Runnable {
+ // Handles Kafka Messages
+ private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
stream.takeWhile { msgAndMetadata =>
dataHandler += msgAndMetadata.message
- // Updating the offet. The key is (topic, partitionID).
+ // Updating the offset. The key is (broker, topic, group, partition).
val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
groupId, msgAndMetadata.topicInfo.partition.partId)
val offset = msgAndMetadata.topicInfo.getConsumeOffset
offsets.put(key, offset)
- logInfo((key, offset).toString)
+ // TODO: Remove Logging
+ logInfo("Handled message: " + (key, offset).toString)
// Keep on handling messages
true
@@ -157,6 +183,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
extends DataHandler[Any](receiver, storageLevel) {
override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
+ // Creates a new Block with Kafka-specific Metadata
new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap))
}
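To make the offset bookkeeping above concrete, here is a small illustration of the key selection in updateCheckpointData: among the timestamps at which offsets were saved, the latest one that precedes the checkpoint time wins. Only the selection expression is taken from the diff; the timestamps and offsets are invented.

    // Illustration only, assuming scala.collection.mutable.HashMap and the
    // KafkaPartitionKey case class defined in this file.
    val savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]](
      1000L -> Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 10L),
      2000L -> Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 25L),
      3000L -> Map(KafkaPartitionKey(0, "test", "test_group", 0) -> 40L))
    val checkpointTimeMillis = 2500L
    // Keys below the checkpoint time are 1000 and 2000; the last of them is
    // chosen, so the offsets checkpointed are those saved at t = 2000.
    val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < checkpointTimeMillis).last
    val latestOffsets = savedOffsets(key)  // Map(KafkaPartitionKey(0,"test","test_group",0) -> 25L)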
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 038827ddb0..0450120061 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -59,9 +59,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// then check whether some RDD has been checkpointed or not
ssc.start()
runStreamsWithRealDelay(ssc, firstNumBatches)
- logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.mkString(",\n") + "]")
- assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before first failure")
- stateStream.checkpointData.foreach {
+ logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]")
+ assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure")
+ stateStream.checkpointData.rdds.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
@@ -70,7 +70,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
- val checkpointFiles = stateStream.checkpointData.map(x => new File(x._2.toString))
+ val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString))
runStreamsWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -87,8 +87,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// is present in the checkpoint data or not
ssc.start()
runStreamsWithRealDelay(ssc, 1)
- assert(!stateStream.checkpointData.isEmpty, "No checkpointed RDDs in state stream before second failure")
- stateStream.checkpointData.foreach {
+ assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ stateStream.checkpointData.rdds.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(),
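These test updates follow from checkpointData becoming a DStreamCheckpointData case class rather than a bare HashMap: the checkpointed RDD files now sit in its rdds field. A minimal sketch of the access pattern the assertions rely on, with stateStream standing for any checkpointed DStream:

    // Sketch: walk the checkpointed (time -> file path) entries, as the
    // assertions above do.
    import java.io.File
    stateStream.checkpointData.rdds.foreach { case (time, data) =>
      val file = new File(data.toString)
      println("Time " + time + " checkpointed to " + file + ", exists: " + file.exists())
    }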