aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2014-06-22 01:12:15 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-06-22 01:12:15 -0700
commit476581e8c8ca03a5940c404fee8a06361ff94cb5 (patch)
tree15941c80d445465bf4aa3933ef69a0bfda619e7e /external
parent58b32f3470f9fa67ad7dffb7d3dcd2a954b4b4e9 (diff)
downloadspark-476581e8c8ca03a5940c404fee8a06361ff94cb5.tar.gz
spark-476581e8c8ca03a5940c404fee8a06361ff94cb5.tar.bz2
spark-476581e8c8ca03a5940c404fee8a06361ff94cb5.zip
SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM shutdown
Tobias noted today on the mailing list: ======== I am trying to use Spark Streaming with Kafka, which works like a charm – except for shutdown. When I run my program with "sbt run-main", sbt will never exit, because there are two non-daemon threads left that don't die. I created a minimal example at <https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala>. It starts a StreamingContext and does nothing more than connecting to a Kafka server and printing what it receives. Using the `future Unknown macro: { ... } ` construct, I shut down the StreamingContext after some seconds and then print the difference between the threads at start time and at end time. The output can be found at <https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1>. There are a number of threads remaining that will prevent sbt from exiting. When I replace `KafkaUtils.createStream(...)` with a call that does exactly the same, except that it calls `consumerConnector.shutdown()` in `KafkaReceiver.onStop()` (which it should, IMO), the output is as shown at <https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2>. Does anyone have any idea what is going on here and why the program doesn't shut down properly? The behavior is the same with both kafka 0.8.0 and 0.8.1.1, by the way. ======== Something similar was noted last year: http://mail-archives.apache.org/mod_mbox/spark-dev/201309.mbox/%3C1380220041.2428.YahooMailNeo@web160804.mail.bf1.yahoo.com%3E KafkaInputDStream doesn't close `ConsumerConnector` in `onStop()`, and does not close the `Executor` it creates. The latter leaves non-daemon threads and can prevent the JVM from shutting down even if streaming is closed properly. Author: Sean Owen <sowen@cloudera.com> Closes #980 from srowen/SPARK-2034 and squashes the following commits: 9f31a8d [Sean Owen] Restore ClassTag to private class because MIMA flags it; is the shadowing intended? 2d579a8 [Sean Owen] Close ConsumerConnector in onStop; shutdown() the local Executor that is created so that its threads stop when done; close the Zookeeper client even on exception; fix a few typos; log exceptions that otherwise vanish
Diffstat (limited to 'external')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala55
1 files changed, 33 insertions, 22 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 21443ebbbf..38095e88dc 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param kafkaParams Map of kafka configuration paramaters.
+ * @param kafkaParams Map of kafka configuration parameters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -76,29 +76,31 @@ class KafkaReceiver[
// Connection to Kafka
var consumerConnector : ConsumerConnector = null
- def onStop() { }
+ def onStop() {
+ if (consumerConnector != null) {
+ consumerConnector.shutdown()
+ }
+ }
def onStart() {
- // In case we are using multiple Threads to handle Kafka Messages
- val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
+ val zkConnect = kafkaParams("zookeeper.connect")
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
+ logInfo("Connecting to Zookeeper: " + zkConnect)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
- logInfo("Connected to " + kafkaParams("zookeeper.connect"))
+ logInfo("Connected to " + zkConnect)
- // When autooffset.reset is defined, it is our responsibility to try and whack the
+ // When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
- tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
+ tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}
val keyDecoder = manifest[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
@@ -112,10 +114,14 @@ class KafkaReceiver[
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
-
- // Start the messages handler for each partition
- topicMessageStreams.values.foreach { streams =>
- streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+ try {
+ // Start the messages handler for each partition
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+ }
+ } finally {
+ executorPool.shutdown() // Just causes threads to terminate after work is done
}
}
@@ -124,30 +130,35 @@ class KafkaReceiver[
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
- for (msgAndMetadata <- stream) {
- store((msgAndMetadata.key, msgAndMetadata.message))
+ try {
+ for (msgAndMetadata <- stream) {
+ store((msgAndMetadata.key, msgAndMetadata.message))
+ }
+ } catch {
+ case e: Throwable => logError("Error handling message; exiting", e)
}
}
}
- // It is our responsibility to delete the consumer group when specifying autooffset.reset. This
+ // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
// is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
- // from Kafkas' ConsoleConsumer. See code related to 'autooffset.reset' when it is set to
+ // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
// 'smallest'/'largest':
// scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
// scalastyle:on
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+ val dir = "/consumers/" + groupId
+ logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
try {
- val dir = "/consumers/" + groupId
- logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
- val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
zk.deleteRecursive(dir)
- zk.close()
} catch {
- case _ : Throwable => // swallow
+ case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
+ } finally {
+ zk.close()
}
}
}