author | seanm <sean.mcnamara@webtrends.com> | 2013-03-11 17:16:15 -0600
---|---|---
committer | seanm <sean.mcnamara@webtrends.com> | 2013-03-14 23:45:19 -0600
commit | cfa8e769a86664722f47182fa572179e8beadcb7 (patch) |
tree | 57840fd4833cbf0105198815e610b37ecab6a889 /streaming |
parent | 1c3d98197b120e2a81f59bd9315d3892ef4d24ca (diff) |
download | spark-cfa8e769a86664722f47182fa572179e8beadcb7.tar.gz, spark-cfa8e769a86664722f47182fa572179e8beadcb7.tar.bz2, spark-cfa8e769a86664722f47182fa572179e8beadcb7.zip |
KafkaInputDStream improvements. Allows more Kafka configurability
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 22
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 48
2 files changed, 51 insertions, 19 deletions
```diff
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 25c67b279b..4e1732adf5 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -199,7 +199,7 @@ class StreamingContext private (
   }
 
   /**
-   * Create an input stream that pulls messages form a Kafka Broker.
+   * Create an input stream that pulls messages from a Kafka Broker.
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
    * @param groupId The group id for this consumer.
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@@ -216,7 +216,25 @@ class StreamingContext private (
     initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
     storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
   ): DStream[T] = {
-    val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
+    val kafkaParams = Map[String, String]("zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000");
+    kafkaStream[T](kafkaParams, topics, initialOffsets, storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   * @param kafkaParams Map of kafka configuration parameters. See: http://kafka.apache.org/configuration.html
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   * in its own thread.
+   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def kafkaStream[T: ClassManifest](
+      kafkaParams: Map[String, String],
+      topics: Map[String, Int],
+      initialOffsets: Map[KafkaPartitionKey, Long],
+      storageLevel: StorageLevel
+    ): DStream[T] = {
+    val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, initialOffsets, storageLevel)
     registerInputStream(inputStream)
     inputStream
   }
```
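The new overload is the substance of the change: instead of only a ZooKeeper quorum and a group id, callers can now pass any consumer property from http://kafka.apache.org/configuration.html. A minimal sketch of calling it, assuming the Spark 0.7-era API shown above; the master URL, quorum address, group id, topic name, and tuning values are all made-up:

```scala
import spark.storage.StorageLevel
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.dstream.KafkaPartitionKey

val ssc = new StreamingContext("local[2]", "KafkaConfigExample", Seconds(2))

// Any property from http://kafka.apache.org/configuration.html can go here;
// "zk.connect" and "groupid" are still required by Kafka's ConsumerConfig.
val kafkaParams = Map[String, String](
  "zk.connect" -> "zk1:2181,zk2:2181",  // hypothetical quorum
  "groupid" -> "example-group",         // hypothetical consumer group
  "fetch.size" -> "1048576",            // tuning the old API could not express
  "socket.buffersize" -> "1048576"
)

// Consume topic "events" with two consumer threads; the new overload has no
// default arguments, so initial offsets and storage level are passed explicitly.
val stream = ssc.kafkaStream[String](
  kafkaParams,
  Map("events" -> 2),
  Map[KafkaPartitionKey, Long](),
  StorageLevel.MEMORY_ONLY_SER_2
)
```

Note that the old `(zkQuorum, groupId, ...)` signature is kept and now simply builds a three-entry `kafkaParams` map and delegates, so existing callers are unaffected.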
```diff
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index dc7139cc27..f769fc1cc3 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -12,6 +12,8 @@ import kafka.message.{Message, MessageSet, MessageAndMetadata}
 import kafka.serializer.StringDecoder
 import kafka.utils.{Utils, ZKGroupTopicDirs}
 import kafka.utils.ZkUtils._
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
 
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
@@ -23,8 +25,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
+ * @param kafkaParams Map of kafka configuration parameters. See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
  * @param initialOffsets Optional initial offsets for each of the partitions to consume.
@@ -34,8 +35,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
 private[streaming]
 class KafkaInputDStream[T: ClassManifest](
     @transient ssc_ : StreamingContext,
-    zkQuorum: String,
-    groupId: String,
+    kafkaParams: Map[String, String],
     topics: Map[String, Int],
     initialOffsets: Map[KafkaPartitionKey, Long],
     storageLevel: StorageLevel
@@ -43,19 +43,16 @@ class KafkaInputDStream[T: ClassManifest](
 
   def getReceiver(): NetworkReceiver[T] = {
-    new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
+    new KafkaReceiver(kafkaParams, topics, initialOffsets, storageLevel)
         .asInstanceOf[NetworkReceiver[T]]
   }
 }
 
 private[streaming]
-class KafkaReceiver(zkQuorum: String, groupId: String,
+class KafkaReceiver(kafkaParams: Map[String, String],
   topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
   storageLevel: StorageLevel) extends NetworkReceiver[Any] {
 
-  // Timeout for establishing a connection to Zookeper in ms.
-  val ZK_TIMEOUT = 10000
-
   // Handles pushing data into the BlockManager
   lazy protected val blockGenerator = new BlockGenerator(storageLevel)
   // Connection to Kafka
@@ -72,20 +69,24 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
     // In case we are using multiple Threads to handle Kafka Messages
     val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
 
-    logInfo("Starting Kafka Consumer Stream with group: " + groupId)
+    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
     logInfo("Initial offsets: " + initialOffsets.toString)
 
-    // Zookeper connection properties
+    // Kafka connection properties
    val props = new Properties()
-    props.put("zk.connect", zkQuorum)
-    props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
-    props.put("groupid", groupId)
+    kafkaParams.foreach(param => props.put(param._1, param._2))
 
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + zkQuorum)
+    logInfo("Connecting to Zookeeper: " + kafkaParams("zk.connect"))
     val consumerConfig = new ConsumerConfig(props)
     consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
-    logInfo("Connected to " + zkQuorum)
+    logInfo("Connected to " + kafkaParams("zk.connect"))
+
+    // When autooffset.reset is 'smallest', it is our responsibility to try and whack the
+    // consumer group zk node.
+    if (kafkaParams.get("autooffset.reset").exists(_ == "smallest")) {
+      tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+    }
 
     // If specified, set the topic offset
     setOffsets(initialOffsets)
@@ -97,7 +98,6 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
     topicMessageStreams.values.foreach { streams =>
       streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
     }
-
   }
 
   // Overwrites the offets in Zookeper.
@@ -122,4 +122,18 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
       }
     }
   }
+
+  // Handles cleanup of consumer group znode. Lifted with love from Kafka's
+  // ConsumerConsole.scala tryCleanupZookeeper()
+  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
 }
```
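A note on the cleanup path above: Kafka's high-level consumer only consults `autooffset.reset` when the group has no committed offset in ZooKeeper, so deleting the group's znode is what actually forces a re-read from the smallest offset; the empty catch means a failed cleanup silently falls back to the committed offsets. A sketch of inspecting what `tryZookeeperConsumerGroupCleanup()` would delete, using the same ZkClient API the receiver uses; the quorum address and group id are made-up:

```scala
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import scala.collection.JavaConversions._

// Hypothetical values; substitute a real quorum and consumer group.
val zkUrl = "zk1:2181,zk2:2181"
val groupId = "example-group"

val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer)
val dir = "/consumers/" + groupId
if (zk.exists(dir)) {
  // Committed offsets, partition owners, and consumer ids all live under this
  // znode; the receiver removes the whole subtree with deleteRecursive(dir).
  zk.getChildren(dir).foreach(child => println(dir + "/" + child))
}
zk.close()
```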