author    Matei Zaharia <matei@eecs.berkeley.edu>    2013-04-09 13:50:50 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>    2013-04-09 13:50:50 -0700
commit    8ac9efba5a435443be9abf8ebbe867806d42c9db (patch)
tree      29b935e38ddce61b05eaf316552787127c698fa6 /streaming
parent    1c3d98197b120e2a81f59bd9315d3892ef4d24ca (diff)
parent    329ef34c2e04d28c2ad150cf6674d6e86d7511ce (diff)
Merge pull request #527 from Reinvigorate/sm-kafka-cleanup
KafkaInputDStream fixes and improvements
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala              | 23
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 35
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala     | 81
3 files changed, 59 insertions(+), 80 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 25c67b279b..bb7f216ca7 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -199,13 +199,11 @@ class StreamingContext private (
}
/**
- * Create an input stream that pulls messages form a Kafka Broker.
+ * Create an input stream that pulls messages from a Kafka Broker.
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
@@ -213,10 +211,25 @@ class StreamingContext private (
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
- initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
- val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
+ val kafkaParams = Map[String, String]("zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
+ kafkaStream[T](kafkaParams, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def kafkaStream[T: ClassManifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ): DStream[T] = {
+ val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, storageLevel)
registerInputStream(inputStream)
inputStream
}
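
For context, here is a minimal usage sketch of the two overloads after this change (the master URL, ZooKeeper addresses, group id, and topic name are hypothetical, not part of the commit):

import spark.storage.StorageLevel
import spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "KafkaExample", Seconds(1))

// Convenience overload: zkQuorum and groupId are folded into kafkaParams internally.
val byQuorum = ssc.kafkaStream[String]("zk1:2181,zk2:2181", "my-group", Map("events" -> 2))

// New overload: pass Kafka consumer configuration directly.
val kafkaParams = Map(
  "zk.connect" -> "zk1:2181,zk2:2181",
  "groupid" -> "my-group",
  "zk.connectiontimeout.ms" -> "10000")
val byParams = ssc.kafkaStream[String](kafkaParams, Map("events" -> 2), StorageLevel.MEMORY_ONLY_SER_2)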
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f3b40b5b88..7a8864614c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -80,52 +80,23 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
 * Create an input stream that pulls messages from a Kafka Broker.
+ * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
- */
- def kafkaStream[T](
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- initialOffsets: JMap[KafkaPartitionKey, JLong])
- : JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](
- zkQuorum,
- groupId,
- Map(topics.mapValues(_.intValue()).toSeq: _*),
- Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[T](
- zkQuorum: String,
- groupId: String,
+ kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
- initialOffsets: JMap[KafkaPartitionKey, JLong],
storageLevel: StorageLevel)
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](
- zkQuorum,
- groupId,
+ kafkaParams.toMap,
Map(topics.mapValues(_.intValue()).toSeq: _*),
- Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
storageLevel)
}
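
A side note on the retained conversion idiom: with JavaConversions in scope, a java.util.Map is wrapped as a mutable Scala Map, and mapValues over it yields a lazy view, so Map(... .toSeq: _*) is what forces the strict immutable Map the Scala API expects. A small self-contained sketch (the topic name is hypothetical):

import java.lang.{Integer => JInt}
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConversions._

// Topics map as a Java caller would build it.
val jTopics: JMap[String, JInt] = new JHashMap[String, JInt]()
jTopics.put("events", 2)

// mapValues on the wrapped map is a lazy view; copying through toSeq
// materializes a strict scala.collection.immutable.Map[String, Int].
val sTopics: Map[String, Int] = Map(jTopics.mapValues(_.intValue()).toSeq: _*)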
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index dc7139cc27..17a5be3420 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -12,54 +12,45 @@ import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-// Key for a specific Kafka Partition: (broker, topic, group, part)
-case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
-
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
+ * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
* @param storageLevel RDD storage level.
*/
private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
- zkQuorum: String,
- groupId: String,
+ kafkaParams: Map[String, String],
topics: Map[String, Int],
- initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
def getReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
+ new KafkaReceiver(kafkaParams, topics, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
-class KafkaReceiver(zkQuorum: String, groupId: String,
- topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
+class KafkaReceiver(kafkaParams: Map[String, String],
+ topics: Map[String, Int],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
- // Timeout for establishing a connection to Zookeper in ms.
- val ZK_TIMEOUT = 10000
-
// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
- var consumerConnector : ZookeeperConsumerConnector = null
+ var consumerConnector : ConsumerConnector = null
def onStop() {
blockGenerator.stop()
@@ -72,23 +63,23 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
- logInfo("Starting Kafka Consumer Stream with group: " + groupId)
- logInfo("Initial offsets: " + initialOffsets.toString)
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
- // Zookeper connection properties
+ // Kafka connection properties
val props = new Properties()
- props.put("zk.connect", zkQuorum)
- props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
- props.put("groupid", groupId)
+ kafkaParams.foreach(param => props.put(param._1, param._2))
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + zkQuorum)
+ logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect"))
val consumerConfig = new ConsumerConfig(props)
- consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
- logInfo("Connected to " + zkQuorum)
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo("Connected to " + kafkaParams("zk.connect"))
- // If specified, set the topic offset
- setOffsets(initialOffsets)
+ // When autooffset.reset is defined, it is our responsibility to try and delete the
+ // consumer group's zk node.
+ if (kafkaParams.contains("autooffset.reset")) {
+ tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+ }
// Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
@@ -97,29 +88,33 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
-
- }
-
- // Overwrites the offets in Zookeper.
- private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
- offsets.foreach { case(key, offset) =>
- val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
- val partitionName = key.brokerId + "-" + key.partId
- updatePersistentPath(consumerConnector.zkClient,
- topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
- }
}
// Handles Kafka Messages
private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
- stream.takeWhile { msgAndMetadata =>
+ for (msgAndMetadata <- stream) {
blockGenerator += msgAndMetadata.message
- // Keep on handling messages
-
- true
}
}
}
+
+ // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
+ // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+ //
+ // The Kafka high-level consumer doesn't currently expose setting offsets; this is a trick copied from Kafka's
+ // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+ // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+ private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+ try {
+ val dir = "/consumers/" + groupId
+ logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+ zk.deleteRecursive(dir)
+ zk.close()
+ } catch {
+ case _ => // swallow
+ }
+ }
}
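
To illustrate the new reset path end to end, a hedged sketch (reusing the hypothetical ssc, quorum, group id, and topic from the earlier sketch): including autooffset.reset in kafkaParams makes the receiver delete /consumers/<groupId> via tryZookeeperConsumerGroupCleanup before connecting, so Kafka 0.7.2 actually honors the reset and the consumer starts from the smallest (or largest) offset.

// Hypothetical opt-in to an offset reset with the new map-based API.
val resetParams = Map(
  "zk.connect" -> "zk1:2181,zk2:2181",
  "groupid" -> "my-group",
  // Presence of this key triggers tryZookeeperConsumerGroupCleanup above.
  "autooffset.reset" -> "smallest")
val resetStream = ssc.kafkaStream[String](resetParams, Map("events" -> 1), StorageLevel.MEMORY_ONLY_SER_2)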