 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 55 +++++++++++++++++++++++++++++++++----------------------
 1 file changed, 33 insertions(+), 22 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 21443ebbbf..38095e88dc 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param kafkaParams Map of kafka configuration paramaters.
+ * @param kafkaParams Map of kafka configuration parameters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
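Note: a caller-side sketch (not part of this patch) of how the topics map drives the receiver's parallelism; the topic names and values below are hypothetical. The receiver spawns one handler thread per partition, i.e. topics.values.sum threads in total.

val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",  // assumed local ZooKeeper
  "group.id"          -> "example-group",
  "auto.offset.reset" -> "smallest")        // opts in to the zk cleanup this patch hardens
val topics = Map("logs" -> 2, "metrics" -> 1)  // 2 + 1 = 3 handler threads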
@@ -76,29 +76,31 @@ class KafkaReceiver[
// Connection to Kafka
var consumerConnector : ConsumerConnector = null
- def onStop() { }
+ def onStop() {
+ if (consumerConnector != null) {
+ consumerConnector.shutdown()
+ }
+ }
def onStart() {
- // In case we are using multiple Threads to handle Kafka Messages
- val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
+ val zkConnect = kafkaParams("zookeeper.connect")
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
+ logInfo("Connecting to Zookeeper: " + zkConnect)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
- logInfo("Connected to " + kafkaParams("zookeeper.connect"))
+ logInfo("Connected to " + zkConnect)
- // When autooffset.reset is defined, it is our responsibility to try and whack the
+ // When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
- tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
+ tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}
val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
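Note: the keyDecoder line above builds the decoder reflectively, because the receiver only knows the decoder class through a Manifest. A minimal standalone sketch of that trick, assuming Kafka 0.8's StringDecoder (whose constructor takes a VerifiableProperties):

import java.util.Properties
import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties

def makeDecoder[T <: Decoder[_]: Manifest](props: Properties): T =
  manifest[T].runtimeClass
    .getConstructor(classOf[VerifiableProperties])   // same lookup as the patch
    .newInstance(new VerifiableProperties(props))
    .asInstanceOf[T]

val decoder = makeDecoder[StringDecoder](new Properties())  // decodes Array[Byte] to String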
@@ -112,10 +114,14 @@ class KafkaReceiver[
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
-
- // Start the messages handler for each partition
- topicMessageStreams.values.foreach { streams =>
- streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+ try {
+ // Start the messages handler for each partition
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ }
+ } finally {
+ executorPool.shutdown() // Just causes threads to terminate after work is done
}
}
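Note: the try/finally above guarantees executorPool.shutdown() runs even if a submit throws, and shutdown() lets already-submitted handlers run to completion rather than interrupting them. A self-contained sketch of the same pattern, with stand-in handler work:

import java.util.concurrent.Executors

val pool = Executors.newFixedThreadPool(3)
try {
  (1 to 3).foreach { i =>
    pool.submit(new Runnable {
      def run() { println("handler " + i) }  // stand-in for MessageHandler
    })
  }
} finally {
  pool.shutdown()  // no new tasks accepted; running tasks complete normally
}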
@@ -124,30 +130,35 @@ class KafkaReceiver[
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
- for (msgAndMetadata <- stream) {
- store((msgAndMetadata.key, msgAndMetadata.message))
+ try {
+ for (msgAndMetadata <- stream) {
+ store((msgAndMetadata.key, msgAndMetadata.message))
+ }
+ } catch {
+ case e: Throwable => logError("Error handling message; exiting", e)
}
}
}
- // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+ // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
// is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
- // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+ // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
// 'smallest'/'largest':
// scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
// scalastyle:on
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+ val dir = "/consumers/" + groupId
+ logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
try {
- val dir = "/consumers/" + groupId
- logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
- val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
- zk.close()
} catch {
- case _ : Throwable => // swallow
+ case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
+ } finally {
+ zk.close()
}
}
}
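Note: the reshuffled cleanup above is the usual open/try/catch/finally resource pattern around ZkClient, moving close() to a finally block so the patched code no longer leaks the client when deleteRecursive throws. A generic sketch of the same idea; withZkClient is a hypothetical helper, not part of this patch, and it assumes kafka.utils.ZKStringSerializer, which the patched code already uses:

import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

def withZkClient[A](zkUrl: String)(f: ZkClient => A): A = {
  val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer)
  try f(zk) finally zk.close()  // close even if f throws
}

// e.g. withZkClient("localhost:2181")(_.deleteRecursive("/consumers/example-group"))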