aboutsummaryrefslogtreecommitdiff
path: root/external/kafka
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2015-08-24 23:26:14 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-24 23:26:14 -0700
commitd9c25dec87e6da7d66a47ff94e7eefa008081b9d (patch)
tree61ae5c9ae6b483357d98bb9f45cbb545e1c14c58 /external/kafka
parent5175ca0c85b10045d12c3fb57b1e52278a413ecf (diff)
downloadspark-d9c25dec87e6da7d66a47ff94e7eefa008081b9d.tar.gz
spark-d9c25dec87e6da7d66a47ff94e7eefa008081b9d.tar.bz2
spark-d9c25dec87e6da7d66a47ff94e7eefa008081b9d.zip
[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…
…ult maxRatePerPartition setting of 0 Author: cody koeninger <cody@koeninger.org> Closes #8413 from koeninger/backpressure-testing-master.
Diffstat (limited to 'external/kafka')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala9
1 files changed, 7 insertions, 2 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a17707777..1000094e93 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0)
- .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
- .getOrElse(maxRateLimitPerPartition)
+ .map { limit =>
+ if (maxRateLimitPerPartition > 0) {
+ Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+ } else {
+ limit / numPartitions
+ }
+ }.getOrElse(maxRateLimitPerPartition)
if (effectiveRateLimitPerPartition > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000