aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorseanm <sean.mcnamara@webtrends.com>2013-03-14 23:25:35 -0600
committerseanm <sean.mcnamara@webtrends.com>2013-03-14 23:45:33 -0600
commitd06928321194b11e082986cd2bb2737d9bc3b698 (patch)
tree5a08452b5398ffdabefda555854f4bfb475b8a66 /streaming
parentcfa8e769a86664722f47182fa572179e8beadcb7 (diff)
downloadspark-d06928321194b11e082986cd2bb2737d9bc3b698.tar.gz
spark-d06928321194b11e082986cd2bb2737d9bc3b698.tar.bz2
spark-d06928321194b11e082986cd2bb2737d9bc3b698.zip
fixing memory leak in kafka MessageHandler
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala5
1 files changed, 1 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index f769fc1cc3..d674b6ee87 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -114,11 +114,8 @@ class KafkaReceiver(kafkaParams: Map[String, String],
private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
- stream.takeWhile { msgAndMetadata =>
+ for (msgAndMetadata <- stream) {
blockGenerator += msgAndMetadata.message
- // Keep on handling messages
-
- true
}
}
}