diff options
author | cody koeninger <cody@koeninger.org> | 2016-11-14 11:10:37 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-11-14 11:10:37 -0800 |
commit | 89d1fa58dbe88560b1f2b0362fcc3035ccc888be (patch) | |
tree | 4de76454213bb7d48930f86c5ab494533c4c1ebe /external/kafka-0-8 | |
parent | bdfe60ac921172be0fb77de2f075cc7904a3b238 (diff) | |
download | spark-89d1fa58dbe88560b1f2b0362fcc3035ccc888be.tar.gz spark-89d1fa58dbe88560b1f2b0362fcc3035ccc888be.tar.bz2 spark-89d1fa58dbe88560b1f2b0362fcc3035ccc888be.zip |
[SPARK-17510][STREAMING][KAFKA] config max rate on a per-partition basis
## What changes were proposed in this pull request?
Allow configuration of max rate on a per-topicpartition basis.
## How was this patch tested?
Unit tests.
The reporter (Jeff Nadler) said he could test on his workload, so let's wait on that report.
Author: cody koeninger <cody@koeninger.org>
Closes #15132 from koeninger/SPARK-17510.
Diffstat (limited to 'external/kafka-0-8')
-rw-r--r-- | external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala index c3c799375b..d52c230eb7 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala @@ -88,12 +88,12 @@ class DirectKafkaInputDStream[ protected val kc = new KafkaCluster(kafkaParams) - private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt( + private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong( "spark.streaming.kafka.maxRatePerPartition", 0) protected[streaming] def maxMessagesPerPartition( offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = { - val estimatedRateLimit = rateController.map(_.getLatestRate().toInt) + val estimatedRateLimit = rateController.map(_.getLatestRate()) // calculate a per-partition rate limit based on current lag val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match { |