author     cody koeninger <cody@koeninger.org>       2016-11-14 11:10:37 -0800
committer  Shixiong Zhu <shixiong@databricks.com>    2016-11-14 11:10:37 -0800
commit     89d1fa58dbe88560b1f2b0362fcc3035ccc888be (patch)
tree       4de76454213bb7d48930f86c5ab494533c4c1ebe /external/kafka-0-10/src/main/scala
parent     bdfe60ac921172be0fb77de2f075cc7904a3b238 (diff)
[SPARK-17510][STREAMING][KAFKA] config max rate on a per-partition basis
## What changes were proposed in this pull request?
Allow the max rate to be configured on a per topic-partition basis, rather than a single spark.streaming.kafka.maxRatePerPartition limit applied uniformly to every partition.
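As a rough sketch of the intent (not part of the original commit message), a user can subclass the PerPartitionConfig abstraction introduced by this patch (see the diff below) and return a different limit per TopicPartition. The topic name and rate values here are hypothetical:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.PerPartitionConfig

// Hypothetical user-supplied config: throttle a high-volume topic harder than
// the rest. Returning 0 leaves that partition uncapped, mirroring how the
// default config treats an unset global limit.
class TopicAwareRateConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "clickstream") 1000L else 0L
}
```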
## How was this patch tested?
Unit tests.
The reporter (Jeff Nadler) said he could test on his workload, so let's wait on that report.
Author: cody koeninger <cody@koeninger.org>
Closes #15132 from koeninger/SPARK-17510.
Diffstat (limited to 'external/kafka-0-10/src/main/scala')
3 files changed, 104 insertions, 7 deletions
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 7e57bb18cb..794f53c5ab 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -57,7 +57,8 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
 private[spark] class DirectKafkaInputDStream[K, V](
     _ssc: StreamingContext,
     locationStrategy: LocationStrategy,
-    consumerStrategy: ConsumerStrategy[K, V]
+    consumerStrategy: ConsumerStrategy[K, V],
+    ppc: PerPartitionConfig
   ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
 
   val executorKafkaParams = {
@@ -128,12 +129,9 @@ private[spark] class DirectKafkaInputDStream[K, V](
     }
   }
 
-  private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
-      "spark.streaming.kafka.maxRatePerPartition", 0)
-
   protected[streaming] def maxMessagesPerPartition(
     offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
-    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+    val estimatedRateLimit = rateController.map(_.getLatestRate())
 
     // calculate a per-partition rate limit based on current lag
     val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
@@ -144,11 +142,12 @@ private[spark] class DirectKafkaInputDStream[K, V](
         val totalLag = lagPerPartition.values.sum
 
         lagPerPartition.map { case (tp, lag) =>
+          val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
           val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
           tp -> (if (maxRateLimitPerPartition > 0) {
             Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
         }
-      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp) }
     }
 
     if (effectiveRateLimitPerPartition.values.sum > 0) {
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
index b2190bfa05..c11917f59d 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -123,7 +123,31 @@ object KafkaUtils extends Logging {
       locationStrategy: LocationStrategy,
       consumerStrategy: ConsumerStrategy[K, V]
     ): InputDStream[ConsumerRecord[K, V]] = {
-    new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
+    val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
+    createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
+  }
+
+  /**
+   * :: Experimental ::
+   * Scala constructor for a DStream where
+   * each given Kafka topic/partition corresponds to an RDD partition.
+   * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
+   *   see [[LocationStrategies]] for more details.
+   * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
+   *   see [[ConsumerStrategies]] for more details.
+   * @param perPartitionConfig configuration of settings such as max rate on a per-partition basis.
+   *   see [[PerPartitionConfig]] for more details.
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createDirectStream[K, V](
+      ssc: StreamingContext,
+      locationStrategy: LocationStrategy,
+      consumerStrategy: ConsumerStrategy[K, V],
+      perPartitionConfig: PerPartitionConfig
+    ): InputDStream[ConsumerRecord[K, V]] = {
+    new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
   }
 
   /**
@@ -151,6 +175,33 @@ object KafkaUtils extends Logging {
   }
 
   /**
+   * :: Experimental ::
+   * Java constructor for a DStream where
+   * each given Kafka topic/partition corresponds to an RDD partition.
+   * @param keyClass Class of the keys in the Kafka records
+   * @param valueClass Class of the values in the Kafka records
+   * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
+   *   see [[LocationStrategies]] for more details.
+   * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
+   *   see [[ConsumerStrategies]] for more details
+   * @param perPartitionConfig configuration of settings such as max rate on a per-partition basis.
+   *   see [[PerPartitionConfig]] for more details.
+   * @tparam K type of Kafka message key
+   * @tparam V type of Kafka message value
+   */
+  @Experimental
+  def createDirectStream[K, V](
+      jssc: JavaStreamingContext,
+      locationStrategy: LocationStrategy,
+      consumerStrategy: ConsumerStrategy[K, V],
+      perPartitionConfig: PerPartitionConfig
+    ): JavaInputDStream[ConsumerRecord[K, V]] = {
+    new JavaInputDStream(
+      createDirectStream[K, V](
+        jssc.ssc, locationStrategy, consumerStrategy, perPartitionConfig))
+  }
+
+  /**
    * Tweak kafka params to prevent issues on executors
    */
   private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
new file mode 100644
index 0000000000..4792f2a955
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
+ * because they need tweaking on a per-partition basis,
+ */
+@Experimental
+abstract class PerPartitionConfig extends Serializable {
+  /**
+   *  Maximum rate (number of records per second) at which data will be read
+   *  from each Kafka partition.
+   */
+  def maxRatePerPartition(topicPartition: TopicPartition): Long
+}
+
+/**
+ * Default per-partition configuration
+ */
+private class DefaultPerPartitionConfig(conf: SparkConf)
+    extends PerPartitionConfig {
+  val maxRate = conf.getLong("spark.streaming.kafka.maxRatePerPartition", 0)
+
+  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
+}
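To show how the pieces above fit together end to end, here is a minimal sketch of a driver program using the new four-argument createDirectStream overload. The broker address, group id, topic names, batch interval, and rate values are illustrative assumptions, not part of this commit:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object PerPartitionRateExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PerPartitionRateExample").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Hypothetical Kafka connection settings.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "per-partition-rate-example",
      "auto.offset.reset" -> "latest"
    )

    // User-supplied limits: cap the hypothetical "clickstream" topic at 1000
    // records/sec per partition and leave other partitions uncapped (0 = no cap,
    // matching how DefaultPerPartitionConfig treats the global setting).
    val rateConfig = new PerPartitionConfig {
      override def maxRatePerPartition(tp: TopicPartition): Long =
        if (tp.topic == "clickstream") 1000L else 0L
    }

    // The four-argument overload introduced by this commit.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("clickstream", "audit"), kafkaParams),
      rateConfig
    )

    stream.map(record => (record.topic, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

When backpressure is enabled, DirectKafkaInputDStream takes the minimum of the estimated backpressure rate and the per-partition cap (the Math.min in maxMessagesPerPartition above), so the cap acts as an upper bound rather than a fixed rate.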