From b61a4ec77300d6e7fb40f771a9054ae8bc4488de Mon Sep 17 00:00:00 2001 From: seanm Date: Mon, 14 Jan 2013 17:13:10 -0700 Subject: Removing offset management code that is non-existent in kafka 0.7.0+ --- .../src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 7 ------- 1 file changed, 7 deletions(-) (limited to 'streaming/src/main') diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 2b4740bdf7..9605072382 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -173,13 +173,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String, stream.takeWhile { msgAndMetadata => blockGenerator += msgAndMetadata.message - // Updating the offet. The key is (broker, topic, group, partition). - val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic, - groupId, msgAndMetadata.topicInfo.partition.partId) - val offset = msgAndMetadata.topicInfo.getConsumeOffset - offsets.put(key, offset) - // logInfo("Handled message: " + (key, offset).toString) - // Keep on handling messages true } -- cgit v1.2.3 From c203a292963a018bd9b84f02bb522fd191a110af Mon Sep 17 00:00:00 2001 From: seanm Date: Mon, 14 Jan 2013 17:22:03 -0700 Subject: StateDStream changes to give updateStateByKey consistent behavior --- .../main/scala/spark/streaming/dstream/StateDStream.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'streaming/src/main') diff --git a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala index a1ec2f5454..4e57968eed 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/StateDStream.scala @@ -48,8 +48,16 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife //logDebug("Generating state RDD for time " + validTime) return Some(stateRDD) } - case None => { // If parent RDD does not exist, then return old state RDD - return Some(prevStateRDD) + case None => { // If parent RDD does not exist + + // Re-apply the update function to the old state RDD + val updateFuncLocal = updateFunc + val finalFunc = (iterator: Iterator[(K, S)]) => { + val i = iterator.map(t => (t._1, Seq[V](), Option(t._2))) + updateFuncLocal(i) + } + val stateRDD = prevStateRDD.mapPartitions(finalFunc, preservePartitioning) + return Some(stateRDD) } } } -- cgit v1.2.3 From 56b7fbafa2b7717896c613e39ecc134f2405b4c6 Mon Sep 17 00:00:00 2001 From: seanm Date: Fri, 18 Jan 2013 21:15:54 -0700 Subject: further KafkaInputDStream cleanup (removing unused and commented out code relating to offset management) --- .../streaming/dstream/KafkaInputDStream.scala | 72 +--------------------- 1 file changed, 3 insertions(+), 69 deletions(-) (limited to 'streaming/src/main') diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 9605072382..533c91ee95 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -19,15 +19,6 @@ import scala.collection.JavaConversions._ // Key for a specific Kafka Partition: (broker, topic, group, part) case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) -// NOT USED - Originally intended for fault-tolerance -// Metadata for a Kafka Stream that it sent to the Master -private[streaming] -case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) -// NOT USED - Originally intended for fault-tolerance -// Checkpoint data specific to a KafkaInputDstream -private[streaming] -case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], - savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) /** * Input stream that pulls messages from a Kafka Broker. @@ -52,49 +43,6 @@ class KafkaInputDStream[T: ClassManifest]( storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_ ) with Logging { - // Metadata that keeps track of which messages have already been consumed. - var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]() - - /* NOT USED - Originally intended for fault-tolerance - - // In case of a failure, the offets for a particular timestamp will be restored. - @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null - - - override protected[streaming] def addMetadata(metadata: Any) { - metadata match { - case x : KafkaInputDStreamMetadata => - savedOffsets(x.timestamp) = x.data - // TOOD: Remove logging - logInfo("New saved Offsets: " + savedOffsets) - case _ => logInfo("Received unknown metadata: " + metadata.toString) - } - } - - override protected[streaming] def updateCheckpointData(currentTime: Time) { - super.updateCheckpointData(currentTime) - if(savedOffsets.size > 0) { - // Find the offets that were stored before the checkpoint was initiated - val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last - val latestOffsets = savedOffsets(key) - logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) - checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) - // TODO: This may throw out offsets that are created after the checkpoint, - // but it's unlikely we'll need them. - savedOffsets.clear() - } - } - - override protected[streaming] def restoreCheckpointData() { - super.restoreCheckpointData() - logInfo("Restoring KafkaDStream checkpoint data.") - checkpointData match { - case x : KafkaDStreamCheckpointData => - restoredOffsets = x.savedOffsets - logInfo("Restored KafkaDStream offsets: " + savedOffsets) - } - } */ - def createReceiver(): NetworkReceiver[T] = { new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel) .asInstanceOf[NetworkReceiver[T]] @@ -111,8 +59,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String, // Handles pushing data into the BlockManager lazy protected val blockGenerator = new BlockGenerator(storageLevel) - // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset - lazy val offsets = HashMap[KafkaPartitionKey, Long]() // Connection to Kafka var consumerConnector : ZookeeperConsumerConnector = null @@ -143,8 +89,8 @@ class KafkaReceiver(host: String, port: Int, groupId: String, consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] logInfo("Connected to " + zooKeeperEndPoint) - // Reset the Kafka offsets in case we are recovering from a failure - resetOffsets(initialOffsets) + // If specified, set the topic offset + setOffsets(initialOffsets) // Create Threads for each Topic/Message Stream we are listening val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder()) @@ -157,7 +103,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String, } // Overwrites the offets in Zookeper. - private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) { + private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) { offsets.foreach { case(key, offset) => val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) val partitionName = key.brokerId + "-" + key.partId @@ -178,16 +124,4 @@ class KafkaReceiver(host: String, port: Int, groupId: String, } } } - - // NOT USED - Originally intended for fault-tolerance - // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) - // extends BufferingBlockCreator[Any](receiver, storageLevel) { - - // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = { - // // Creates a new Block with Kafka-specific Metadata - // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap)) - // } - - // } - } -- cgit v1.2.3 From d3064fe70762cbfcb7dbd5e1fbd708539c3de5e9 Mon Sep 17 00:00:00 2001 From: seanm Date: Fri, 18 Jan 2013 21:34:29 -0700 Subject: kafkaStream API cleanup. A quorum of zookeepers can now be specified --- .../scala/spark/streaming/examples/KafkaWordCount.scala | 16 ++++++++-------- .../main/scala/spark/streaming/StreamingContext.scala | 8 +++----- .../spark/streaming/dstream/KafkaInputDStream.scala | 17 +++++++---------- 3 files changed, 18 insertions(+), 23 deletions(-) (limited to 'streaming/src/main') diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index fe55db6e2c..65d5da82fc 100644 --- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala @@ -13,19 +13,19 @@ import spark.streaming.util.RawTextHelper._ object KafkaWordCount { def main(args: Array[String]) { - if (args.length < 6) { - System.err.println("Usage: KafkaWordCount ") + if (args.length < 5) { + System.err.println("Usage: KafkaWordCount ") System.exit(1) } - val Array(master, hostname, port, group, topics, numThreads) = args + val Array(master, zkQuorum, group, topics, numThreads) = args val sc = new SparkContext(master, "KafkaWordCount") val ssc = new StreamingContext(sc, Seconds(2)) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap) + val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) wordCounts.print() @@ -38,16 +38,16 @@ object KafkaWordCount { object KafkaWordCountProducer { def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: KafkaWordCountProducer ") + if (args.length < 2) { + System.err.println("Usage: KafkaWordCountProducer ") System.exit(1) } - val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args + val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args // Zookeper connection properties val props = new Properties() - props.put("zk.connect", hostname + ":" + port) + props.put("zk.connect", zkQuorum) props.put("serializer.class", "kafka.serializer.StringEncoder") val config = new ProducerConfig(props) diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..06cf7a06ed 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -136,8 +136,7 @@ class StreamingContext private ( /** * Create an input stream that pulls messages form a Kafka Broker. - * @param hostname Zookeper hostname. - * @param port Zookeper port. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. @@ -146,14 +145,13 @@ class StreamingContext private ( * @param storageLevel RDD storage level. Defaults to memory-only. */ def kafkaStream[T: ClassManifest]( - hostname: String, - port: Int, + zkQuorum: String, groupId: String, topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](), storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 ): DStream[T] = { - val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel) + val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel) registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 533c91ee95..4f8c8b9d10 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -23,8 +23,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part /** * Input stream that pulls messages from a Kafka Broker. * - * @param host Zookeper hostname. - * @param port Zookeper port. + * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. @@ -35,8 +34,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part private[streaming] class KafkaInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, - host: String, - port: Int, + zkQuorum: String, groupId: String, topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], @@ -44,13 +42,13 @@ class KafkaInputDStream[T: ClassManifest]( ) extends NetworkInputDStream[T](ssc_ ) with Logging { def createReceiver(): NetworkReceiver[T] = { - new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel) + new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel) .asInstanceOf[NetworkReceiver[T]] } } private[streaming] -class KafkaReceiver(host: String, port: Int, groupId: String, +class KafkaReceiver(zkQuorum: String, groupId: String, topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], storageLevel: StorageLevel) extends NetworkReceiver[Any] { @@ -73,21 +71,20 @@ class KafkaReceiver(host: String, port: Int, groupId: String, // In case we are using multiple Threads to handle Kafka Messages val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) - val zooKeeperEndPoint = host + ":" + port logInfo("Starting Kafka Consumer Stream with group: " + groupId) logInfo("Initial offsets: " + initialOffsets.toString) // Zookeper connection properties val props = new Properties() - props.put("zk.connect", zooKeeperEndPoint) + props.put("zk.connect", zkQuorum) props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString) props.put("groupid", groupId) // Create the connection to the cluster - logInfo("Connecting to Zookeper: " + zooKeeperEndPoint) + logInfo("Connecting to Zookeper: " + zkQuorum) val consumerConfig = new ConsumerConfig(props) consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] - logInfo("Connected to " + zooKeeperEndPoint) + logInfo("Connected to " + zkQuorum) // If specified, set the topic offset setOffsets(initialOffsets) -- cgit v1.2.3 From c0694291c81ad775918421941a80a00ca9593a38 Mon Sep 17 00:00:00 2001 From: seanm Date: Sun, 20 Jan 2013 12:09:45 -0700 Subject: Splitting StreamingContext.queueStream into two methods --- .../scala/spark/streaming/StreamingContext.scala | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) (limited to 'streaming/src/main') diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 14500bdcb1..3cec35cb37 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -283,17 +283,31 @@ class StreamingContext private ( } /** - * Creates a input stream from an queue of RDDs. In each batch, + * Creates an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. * @param queue Queue of RDDs * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval - * @param defaultRDD Default RDD is returned by the DStream when the queue is empty * @tparam T Type of objects in the RDD */ def queueStream[T: ClassManifest]( queue: Queue[RDD[T]], - oneAtATime: Boolean = true, - defaultRDD: RDD[T] = null + oneAtATime: Boolean = true + ): DStream[T] = { + queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1)) + } + + /** + * Creates an input stream from a queue of RDDs. In each batch, + * it will process either one or all of the RDDs returned by the queue. + * @param queue Queue of RDDs + * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval + * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty + * @tparam T Type of objects in the RDD + */ + def queueStream[T: ClassManifest]( + queue: Queue[RDD[T]], + oneAtATime: Boolean, + defaultRDD: RDD[T] ): DStream[T] = { val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) registerInputStream(inputStream) -- cgit v1.2.3