diff options
author | cody koeninger <cody@koeninger.org> | 2015-08-24 23:26:14 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-24 23:26:14 -0700 |
commit | d9c25dec87e6da7d66a47ff94e7eefa008081b9d (patch) | |
tree | 61ae5c9ae6b483357d98bb9f45cbb545e1c14c58 | |
parent | 5175ca0c85b10045d12c3fb57b1e52278a413ecf (diff) | |
download | spark-d9c25dec87e6da7d66a47ff94e7eefa008081b9d.tar.gz spark-d9c25dec87e6da7d66a47ff94e7eefa008081b9d.tar.bz2 spark-d9c25dec87e6da7d66a47ff94e7eefa008081b9d.zip |
[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…
…ult maxRatePerPartition setting of 0
Author: cody koeninger <cody@koeninger.org>
Closes #8413 from koeninger/backpressure-testing-master.
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index 8a17707777..1000094e93 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -95,8 +95,13 @@ class DirectKafkaInputDStream[ val effectiveRateLimitPerPartition = estimatedRateLimit .filter(_ > 0) - .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions))) - .getOrElse(maxRateLimitPerPartition) + .map { limit => + if (maxRateLimitPerPartition > 0) { + Math.min(maxRateLimitPerPartition, (limit / numPartitions)) + } else { + limit / numPartitions + } + }.getOrElse(maxRateLimitPerPartition) if (effectiveRateLimitPerPartition > 0) { val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 |