author     cody koeninger <cody@koeninger.org>        2016-11-14 11:10:37 -0800
committer  Shixiong Zhu <shixiong@databricks.com>     2016-11-14 11:10:37 -0800
commit     89d1fa58dbe88560b1f2b0362fcc3035ccc888be (patch)
tree       4de76454213bb7d48930f86c5ab494533c4c1ebe /external/kafka-0-10/src/main/scala
parent     bdfe60ac921172be0fb77de2f075cc7904a3b238 (diff)
[SPARK-17510][STREAMING][KAFKA] config max rate on a per-partition basis
## What changes were proposed in this pull request?

Allow configuration of max rate on a per-TopicPartition basis.

## How was this patch tested?

Unit tests. The reporter (Jeff Nadler) said he could test on his workload, so let's wait on that report.

Author: cody koeninger <cody@koeninger.org>

Closes #15132 from koeninger/SPARK-17510.
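For illustration, a minimal usage sketch of the new overload (hedged: `ssc`, `topics`, and `kafkaParams` are assumed to be defined elsewhere, and the anonymous config below is illustrative, not part of this patch):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

// Illustrative policy: cap partition 0 of "events" at 1000 records/sec,
// everything else at 500.
val ppc = new PerPartitionConfig {
  def maxRatePerPartition(tp: TopicPartition): Long =
    if (tp.topic == "events" && tp.partition == 0) 1000L else 500L
}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  ppc)
```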
Diffstat (limited to 'external/kafka-0-10/src/main/scala')
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala  | 11
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala               | 53
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala       | 47
3 files changed, 104 insertions(+), 7 deletions(-)
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 7e57bb18cb..794f53c5ab 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -57,7 +57,8 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
private[spark] class DirectKafkaInputDStream[K, V](
_ssc: StreamingContext,
locationStrategy: LocationStrategy,
- consumerStrategy: ConsumerStrategy[K, V]
+ consumerStrategy: ConsumerStrategy[K, V],
+ ppc: PerPartitionConfig
) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
val executorKafkaParams = {
@@ -128,12 +129,9 @@ private[spark] class DirectKafkaInputDStream[K, V](
}
}
- private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
- "spark.streaming.kafka.maxRatePerPartition", 0)
-
protected[streaming] def maxMessagesPerPartition(
offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
- val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+ val estimatedRateLimit = rateController.map(_.getLatestRate())
// calculate a per-partition rate limit based on current lag
val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
@@ -144,11 +142,12 @@ private[spark] class DirectKafkaInputDStream[K, V](
val totalLag = lagPerPartition.values.sum
lagPerPartition.map { case (tp, lag) =>
+ val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
tp -> (if (maxRateLimitPerPartition > 0) {
Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
}
- case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+ case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp) }
}
if (effectiveRateLimitPerPartition.values.sum > 0) {
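To make the arithmetic in the hunk above concrete, here is a standalone sketch of the math (names and numbers are illustrative, not the actual DStream internals): with an estimated total rate of 1000 records/sec and lags of 300 and 100, the lag-proportional rates are 750 and 250, and a per-partition cap of 500 clamps the first partition.

```scala
val rate = 1000L                                  // estimated rate from the rate controller
val maxRatePerPartition = 500L                    // per-partition cap; 0 would mean uncapped
val lagPerPartition = Map("events-0" -> 300L, "events-1" -> 100L)
val totalLag = lagPerPartition.values.sum         // 400

val limits = lagPerPartition.map { case (tp, lag) =>
  // apportion the total rate by each partition's share of the lag, then clamp
  val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
  tp -> (if (maxRatePerPartition > 0) Math.min(backpressureRate, maxRatePerPartition)
         else backpressureRate.toLong)
}
// limits == Map("events-0" -> 500, "events-1" -> 250): 750 is clamped to the cap
```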
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
index b2190bfa05..c11917f59d 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
@@ -123,7 +123,31 @@ object KafkaUtils extends Logging {
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]] = {
- new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
+ val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
+ createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
+ }
+
+ /**
+ * :: Experimental ::
+ * Scala constructor for a DStream where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
+ * see [[LocationStrategies]] for more details.
+ * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
+ * see [[ConsumerStrategies]] for more details.
+ * @param perPartitionConfig configuration of settings such as max rate on a per-partition basis,
+ * see [[PerPartitionConfig]] for more details.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+ @Experimental
+ def createDirectStream[K, V](
+ ssc: StreamingContext,
+ locationStrategy: LocationStrategy,
+ consumerStrategy: ConsumerStrategy[K, V],
+ perPartitionConfig: PerPartitionConfig
+ ): InputDStream[ConsumerRecord[K, V]] = {
+ new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
}
/**
@@ -151,6 +175,33 @@ object KafkaUtils extends Logging {
}
/**
+ * :: Experimental ::
+ * Java constructor for a DStream where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * @param keyClass Class of the keys in the Kafka records
+ * @param valueClass Class of the values in the Kafka records
+ * @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
+ * see [[LocationStrategies]] for more details.
+ * @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
+ * see [[ConsumerStrategies]] for more details.
+ * @param perPartitionConfig configuration of settings such as max rate on a per-partition basis,
+ * see [[PerPartitionConfig]] for more details.
+ * @tparam K type of Kafka message key
+ * @tparam V type of Kafka message value
+ */
+ @Experimental
+ def createDirectStream[K, V](
+ jssc: JavaStreamingContext,
+ locationStrategy: LocationStrategy,
+ consumerStrategy: ConsumerStrategy[K, V],
+ perPartitionConfig: PerPartitionConfig
+ ): JavaInputDStream[ConsumerRecord[K, V]] = {
+ new JavaInputDStream(
+ createDirectStream[K, V](
+ jssc.ssc, locationStrategy, consumerStrategy, perPartitionConfig))
+ }
+
+ /**
* Tweak kafka params to prevent issues on executors
*/
private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
new file mode 100644
index 0000000000..4792f2a955
--- /dev/null
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/PerPartitionConfig.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
+ * because they need tweaking on a per-partition basis.
+ */
+@Experimental
+abstract class PerPartitionConfig extends Serializable {
+ /**
+ * Maximum rate (number of records per second) at which data will be read
+ * from each Kafka partition.
+ */
+ def maxRatePerPartition(topicPartition: TopicPartition): Long
+}
+
+/**
+ * Default per-partition configuration
+ */
+private class DefaultPerPartitionConfig(conf: SparkConf)
+ extends PerPartitionConfig {
+ val maxRate = conf.getLong("spark.streaming.kafka.maxRatePerPartition", 0)
+
+ def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
+}
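For readers extending this, a sketch of a custom subclass (hypothetical, not part of this patch): per-topic caps with a fallback rate, usable anywhere the new createDirectStream overload accepts a PerPartitionConfig.

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical example subclass; not included in this patch.
class PerTopicRateConfig(ratesByTopic: Map[String, Long], fallbackRate: Long)
    extends PerPartitionConfig {
  override def maxRatePerPartition(topicPartition: TopicPartition): Long =
    ratesByTopic.getOrElse(topicPartition.topic, fallbackRate)
}
```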