author    seanm <sean.mcnamara@webtrends.com>  2013-03-11 17:16:15 -0600
committer seanm <sean.mcnamara@webtrends.com>  2013-03-14 23:45:19 -0600
commit    cfa8e769a86664722f47182fa572179e8beadcb7 (patch)
tree      57840fd4833cbf0105198815e610b37ecab6a889 /streaming
parent    1c3d98197b120e2a81f59bd9315d3892ef4d24ca (diff)
KafkaInputDStream improvements. Allows more Kafka configurability
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala          | 22
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 48
2 files changed, 51 insertions(+), 19 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 25c67b279b..4e1732adf5 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -199,7 +199,7 @@ class StreamingContext private (
}
/**
- * Create an input stream that pulls messages form a Kafka Broker.
+ * Create an input stream that pulls messages from a Kafka Broker.
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@@ -216,7 +216,25 @@ class StreamingContext private (
initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
- val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
+ val kafkaParams = Map[String, String]("zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
+ kafkaStream[T](kafkaParams, topics, initialOffsets, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def kafkaStream[T: ClassManifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ initialOffsets: Map[KafkaPartitionKey, Long],
+ storageLevel: StorageLevel
+ ): DStream[T] = {
+ val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, initialOffsets, storageLevel)
registerInputStream(inputStream)
inputStream
}
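
For illustration, a caller might use the new kafkaParams-based overload like this (a minimal sketch; the StreamingContext value ssc, the quorum string, group id, and topic name are hypothetical, and the usual spark.streaming imports are assumed):

    // Build the Kafka configuration explicitly instead of passing zkQuorum/groupId.
    val kafkaParams = Map[String, String](
      "zk.connect" -> "zk1:2181,zk2:2181",    // Zookeeper quorum (illustrative)
      "groupid" -> "my-consumer-group",       // consumer group id (illustrative)
      "zk.connectiontimeout.ms" -> "10000")
    // Consume topic "events" with two threads, one per partition.
    val topics = Map("events" -> 2)
    val stream = ssc.kafkaStream[String](
      kafkaParams,
      topics,
      initialOffsets = Map[KafkaPartitionKey, Long](),  // empty: no explicit offsets
      storageLevel = StorageLevel.MEMORY_ONLY_SER_2)
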
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index dc7139cc27..f769fc1cc3 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -12,6 +12,8 @@ import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.StringDecoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
@@ -23,8 +25,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
+ * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
@@ -34,8 +35,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
private[streaming]
class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
- zkQuorum: String,
- groupId: String,
+ kafkaParams: Map[String, String],
topics: Map[String, Int],
initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel
@@ -43,19 +43,16 @@ class KafkaInputDStream[T: ClassManifest](
def getReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
+ new KafkaReceiver(kafkaParams, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
-class KafkaReceiver(zkQuorum: String, groupId: String,
+class KafkaReceiver(kafkaParams: Map[String, String],
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
- // Timeout for establishing a connection to Zookeper in ms.
- val ZK_TIMEOUT = 10000
-
// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
@@ -72,20 +69,24 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
- logInfo("Starting Kafka Consumer Stream with group: " + groupId)
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
logInfo("Initial offsets: " + initialOffsets.toString)
- // Zookeper connection properties
+ // Kafka connection properties
val props = new Properties()
- props.put("zk.connect", zkQuorum)
- props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
- props.put("groupid", groupId)
+ kafkaParams.foreach(param => props.put(param._1, param._2))
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + zkQuorum)
+ logInfo("Connecting to Zookeeper: " + kafkaParams("zk.connect"))
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
- logInfo("Connected to " + zkQuorum)
+ logInfo("Connected to " + kafkaParams("zk.connect"))
+
+ // When autooffset.reset is 'smallest', it is our responsibility to try to delete
+ // the consumer group's Zookeeper node, so consumption restarts from the earliest offset.
+ if (kafkaParams.get("autooffset.reset").exists(_ == "smallest")) {
+ tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+ }
// If specified, set the topic offset
setOffsets(initialOffsets)
@@ -97,7 +98,6 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
-
}
// Overwrites the offsets in Zookeeper.
@@ -122,4 +122,18 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
}
}
}
+
+ // Handles cleanup of the consumer group znode. Lifted with love from Kafka's
+ // ConsoleConsumer.scala tryCleanupZookeeper()
+ private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+ try {
+ val dir = "/consumers/" + groupId
+ logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+ zk.deleteRecursive(dir)
+ zk.close()
+ } catch {
+ case _: Throwable => // swallow: cleanup is best-effort
+ }
+ }
}
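
As a sketch of how per-partition offsets are keyed, an initialOffsets map for the overloads above could be built like this (broker id, topic, group id, partition id, and offset are all illustrative; the argument order follows the KafkaPartitionKey case class in this file):

    // Resume partition 0 of topic "events" on broker 0, for group
    // "my-consumer-group", at offset 5000 (all values hypothetical).
    val initialOffsets = Map[KafkaPartitionKey, Long](
      KafkaPartitionKey(0, "events", "my-consumer-group", 0) -> 5000L)

When the receiver starts, each entry is handed to setOffsets, which overwrites the corresponding offset in Zookeeper before the message streams are created.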