aboutsummaryrefslogtreecommitdiff
path: root/external/kafka-0-8
diff options
context:
space:
mode:
authorcody koeninger <cody@koeninger.org>2016-11-14 11:10:37 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-11-14 11:10:37 -0800
commit89d1fa58dbe88560b1f2b0362fcc3035ccc888be (patch)
tree4de76454213bb7d48930f86c5ab494533c4c1ebe /external/kafka-0-8
parentbdfe60ac921172be0fb77de2f075cc7904a3b238 (diff)
downloadspark-89d1fa58dbe88560b1f2b0362fcc3035ccc888be.tar.gz
spark-89d1fa58dbe88560b1f2b0362fcc3035ccc888be.tar.bz2
spark-89d1fa58dbe88560b1f2b0362fcc3035ccc888be.zip
[SPARK-17510][STREAMING][KAFKA] config max rate on a per-partition basis
## What changes were proposed in this pull request? Allow configuration of max rate on a per-topicpartition basis. ## How was this patch tested? Unit tests. The reporter (Jeff Nadler) said he could test on his workload, so let's wait on that report. Author: cody koeninger <cody@koeninger.org> Closes #15132 from koeninger/SPARK-17510.
Diffstat (limited to 'external/kafka-0-8')
-rw-r--r--external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index c3c799375b..d52c230eb7 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -88,12 +88,12 @@ class DirectKafkaInputDStream[
protected val kc = new KafkaCluster(kafkaParams)
- private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
+ private val maxRateLimitPerPartition: Long = context.sparkContext.getConf.getLong(
"spark.streaming.kafka.maxRatePerPartition", 0)
protected[streaming] def maxMessagesPerPartition(
offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
- val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+ val estimatedRateLimit = rateController.map(_.getLatestRate())
// calculate a per-partition rate limit based on current lag
val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {