From 56b7fbafa2b7717896c613e39ecc134f2405b4c6 Mon Sep 17 00:00:00 2001 From: seanm Date: Fri, 18 Jan 2013 21:15:54 -0700 Subject: further KafkaInputDStream cleanup (removing unused and commented out code relating to offset management) --- .../streaming/dstream/KafkaInputDStream.scala | 72 +--------------------- 1 file changed, 3 insertions(+), 69 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 9605072382..533c91ee95 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -19,15 +19,6 @@ import scala.collection.JavaConversions._ // Key for a specific Kafka Partition: (broker, topic, group, part) case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) -// NOT USED - Originally intended for fault-tolerance -// Metadata for a Kafka Stream that it sent to the Master -private[streaming] -case class KafkaInputDStreamMetadata(timestamp: Long, data: Map[KafkaPartitionKey, Long]) -// NOT USED - Originally intended for fault-tolerance -// Checkpoint data specific to a KafkaInputDstream -private[streaming] -case class KafkaDStreamCheckpointData(kafkaRdds: HashMap[Time, Any], - savedOffsets: Map[KafkaPartitionKey, Long]) extends DStreamCheckpointData(kafkaRdds) /** * Input stream that pulls messages from a Kafka Broker. @@ -52,49 +43,6 @@ class KafkaInputDStream[T: ClassManifest]( storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_ ) with Logging { - // Metadata that keeps track of which messages have already been consumed. - var savedOffsets = HashMap[Long, Map[KafkaPartitionKey, Long]]() - - /* NOT USED - Originally intended for fault-tolerance - - // In case of a failure, the offets for a particular timestamp will be restored. - @transient var restoredOffsets : Map[KafkaPartitionKey, Long] = null - - - override protected[streaming] def addMetadata(metadata: Any) { - metadata match { - case x : KafkaInputDStreamMetadata => - savedOffsets(x.timestamp) = x.data - // TOOD: Remove logging - logInfo("New saved Offsets: " + savedOffsets) - case _ => logInfo("Received unknown metadata: " + metadata.toString) - } - } - - override protected[streaming] def updateCheckpointData(currentTime: Time) { - super.updateCheckpointData(currentTime) - if(savedOffsets.size > 0) { - // Find the offets that were stored before the checkpoint was initiated - val key = savedOffsets.keys.toList.sortWith(_ < _).filter(_ < currentTime.millis).last - val latestOffsets = savedOffsets(key) - logInfo("Updating KafkaDStream checkpoint data: " + latestOffsets.toString) - checkpointData = KafkaDStreamCheckpointData(checkpointData.rdds, latestOffsets) - // TODO: This may throw out offsets that are created after the checkpoint, - // but it's unlikely we'll need them. - savedOffsets.clear() - } - } - - override protected[streaming] def restoreCheckpointData() { - super.restoreCheckpointData() - logInfo("Restoring KafkaDStream checkpoint data.") - checkpointData match { - case x : KafkaDStreamCheckpointData => - restoredOffsets = x.savedOffsets - logInfo("Restored KafkaDStream offsets: " + savedOffsets) - } - } */ - def createReceiver(): NetworkReceiver[T] = { new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel) .asInstanceOf[NetworkReceiver[T]] @@ -111,8 +59,6 @@ class KafkaReceiver(host: String, port: Int, groupId: String, // Handles pushing data into the BlockManager lazy protected val blockGenerator = new BlockGenerator(storageLevel) - // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset - lazy val offsets = HashMap[KafkaPartitionKey, Long]() // Connection to Kafka var consumerConnector : ZookeeperConsumerConnector = null @@ -143,8 +89,8 @@ class KafkaReceiver(host: String, port: Int, groupId: String, consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] logInfo("Connected to " + zooKeeperEndPoint) - // Reset the Kafka offsets in case we are recovering from a failure - resetOffsets(initialOffsets) + // If specified, set the topic offset + setOffsets(initialOffsets) // Create Threads for each Topic/Message Stream we are listening val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder()) @@ -157,7 +103,7 @@ class KafkaReceiver(host: String, port: Int, groupId: String, } // Overwrites the offets in Zookeper. - private def resetOffsets(offsets: Map[KafkaPartitionKey, Long]) { + private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) { offsets.foreach { case(key, offset) => val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) val partitionName = key.brokerId + "-" + key.partId @@ -178,16 +124,4 @@ class KafkaReceiver(host: String, port: Int, groupId: String, } } } - - // NOT USED - Originally intended for fault-tolerance - // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel) - // extends BufferingBlockCreator[Any](receiver, storageLevel) { - - // override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = { - // // Creates a new Block with Kafka-specific Metadata - // new Block(blockId, iterator, KafkaInputDStreamMetadata(System.currentTimeMillis, offsets.toMap)) - // } - - // } - } -- cgit v1.2.3