diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-03-14 23:25:35 -0600 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-03-14 23:45:33 -0600 |
commit | d06928321194b11e082986cd2bb2737d9bc3b698 (patch) | |
tree | 5a08452b5398ffdabefda555854f4bfb475b8a66 /streaming/src | |
parent | cfa8e769a86664722f47182fa572179e8beadcb7 (diff) | |
download | spark-d06928321194b11e082986cd2bb2737d9bc3b698.tar.gz spark-d06928321194b11e082986cd2bb2737d9bc3b698.tar.bz2 spark-d06928321194b11e082986cd2bb2737d9bc3b698.zip |
fixing memory leak in kafka MessageHandler
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 5 |
1 files changed, 1 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index f769fc1cc3..d674b6ee87 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -114,11 +114,8 @@ class KafkaReceiver(kafkaParams: Map[String, String], private class MessageHandler(stream: KafkaStream[String]) extends Runnable { def run() { logInfo("Starting MessageHandler.") - stream.takeWhile { msgAndMetadata => + for (msgAndMetadata <- stream) { blockGenerator += msgAndMetadata.message - // Keep on handling messages - - true } } } |