path: root/external/kafka/src/main
author     Nilanjan Raychaudhuri <nraychaudhuri@gmail.com>  2015-08-06 12:50:08 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>      2015-08-06 12:50:08 -0700
commit     a1bbf1bc5c51cd796015ac159799cf024de6fa07
tree       4e172c67f556291d4ad5604196e674321a6665ee /external/kafka/src/main
parent     0d7aac99da660cc42eb5a9be8e262bd9bd8a770f
[SPARK-8978] [STREAMING] Implements the DirectKafkaRateController
Author: Dean Wampler <dean@concurrentthought.com>
Author: Nilanjan Raychaudhuri <nraychaudhuri@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes #7796 from dragos/topic/streaming-bp/kafka-direct and squashes the following commits:

50d1f21 [Nilanjan Raychaudhuri] Taking care of the remaining nits
648c8b1 [Dean Wampler] Refactored rate controller test to be more predictable and run faster.
e43f678 [Nilanjan Raychaudhuri] fixing doc and nits
ce19d2a [Dean Wampler] Removing an unreliable assertion.
9615320 [Dean Wampler] Give me a break...
6372478 [Dean Wampler] Found a few ways to make this test more robust...
9e69e37 [Dean Wampler] Attempt to fix flakey test that fails in CI, but not locally :(
d3db1ea [Dean Wampler] Fixing stylecheck errors.
d04a288 [Nilanjan Raychaudhuri] adding test to make sure rate controller is used to calculate maxMessagesPerPartition
b6ecb67 [Nilanjan Raychaudhuri] Fixed styling issue
3110267 [Nilanjan Raychaudhuri] [SPARK-8978][Streaming] Implements the DirectKafkaRateController
393c580 [François Garillot] [SPARK-8978][Streaming] Implements the DirectKafkaRateController
51e78c6 [Nilanjan Raychaudhuri] Rename and fix build failure
2795509 [Nilanjan Raychaudhuri] Added missing RateController
19200f5 [Dean Wampler] Removed usage of infix notation. Changed a private variable name to be more consistent with usage.
aa4a70b [François Garillot] [SPARK-8978][Streaming] Implements the DirectKafkaController
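In short, the patch caps each batch at whichever is smaller: the back-pressure estimate split across partitions, or the static spark.streaming.kafka.maxRatePerPartition setting. A minimal standalone sketch of that arithmetic (object and parameter names here are illustrative, not Spark API):

    // Sketch of the per-batch cap that the patched maxMessagesPerPartition
    // computes. All names below are illustrative, not Spark API.
    object RateMathSketch {
      def maxMessagesPerPartition(
          estimatedRate: Option[Int], // latest rate from the RateController, if any
          maxRatePerPartition: Int,   // spark.streaming.kafka.maxRatePerPartition (0 = unset)
          numPartitions: Int,
          batchDurationMs: Long): Option[Long] = {
        // Split the estimated total rate evenly across partitions, then clamp
        // it by the static per-partition maximum; with no usable estimate,
        // fall back to the static maximum alone.
        val effectiveRatePerPartition = estimatedRate
          .filter(_ > 0)
          .map(rate => math.min(maxRatePerPartition, rate / numPartitions))
          .getOrElse(maxRatePerPartition)

        if (effectiveRatePerPartition > 0) {
          val secsPerBatch = batchDurationMs.toDouble / 1000
          Some((secsPerBatch * effectiveRatePerPartition).toLong)
        } else {
          None // no cap: the batch reads up to the latest available offsets
        }
      }

      def main(args: Array[String]): Unit = {
        // e.g. estimate of 5000 msg/s over 10 partitions, static cap of
        // 1000 msg/s per partition, 2 s batches:
        // min(1000, 5000 / 10) * 2 = 1000 messages per partition
        println(maxMessagesPerPartition(Some(5000), 1000, 10, 2000)) // Some(1000)
      }
    }

Note one consequence of the clamp, visible in the patch as well: if spark.streaming.kafka.maxRatePerPartition is left at its default of 0, the result is None even when an estimate exists, so the estimate can only ever tighten a configured ceiling, never impose one on its own.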
Diffstat (limited to 'external/kafka/src/main')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 47
1 file changed, 38 insertions(+), 9 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 48a1933d92..8a17707777 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -29,7 +29,8 @@ import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler.StreamInputInfo
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 /**
  * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
@@ -61,7 +62,7 @@ class DirectKafkaInputDStream[
     val kafkaParams: Map[String, String],
     val fromOffsets: Map[TopicAndPartition, Long],
     messageHandler: MessageAndMetadata[K, V] => R
-) extends InputDStream[R](ssc_) with Logging {
+  ) extends InputDStream[R](ssc_) with Logging {
 
   val maxRetries = context.sparkContext.getConf.getInt(
     "spark.streaming.kafka.maxRetries", 1)
@@ -71,14 +72,35 @@ class DirectKafkaInputDStream[
   protected[streaming] override val checkpointData =
     new DirectKafkaInputDStreamCheckpointData
 
+
+  /**
+   * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+   */
+  override protected[streaming] val rateController: Option[RateController] = {
+    if (RateController.isBackPressureEnabled(ssc.conf)) {
+      Some(new DirectKafkaRateController(id,
+        RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
+    } else {
+      None
+    }
+  }
+
   protected val kc = new KafkaCluster(kafkaParams)
 
-  protected val maxMessagesPerPartition: Option[Long] = {
-    val ratePerSec = context.sparkContext.getConf.getInt(
+  private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
       "spark.streaming.kafka.maxRatePerPartition", 0)
-    if (ratePerSec > 0) {
+  protected def maxMessagesPerPartition: Option[Long] = {
+    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+    val numPartitions = currentOffsets.keys.size
+
+    val effectiveRateLimitPerPartition = estimatedRateLimit
+      .filter(_ > 0)
+      .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
+      .getOrElse(maxRateLimitPerPartition)
+
+    if (effectiveRateLimitPerPartition > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
-      Some((secsPerBatch * ratePerSec).toLong)
+      Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
     } else {
       None
     }
@@ -170,11 +192,18 @@ class DirectKafkaInputDStream[
       val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
 
       batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
-        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
-        generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
-          context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+         logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+         generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
+           context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
       }
     }
   }
+  /**
+   * A RateController to retrieve the rate from RateEstimator.
+   */
+  private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+    extends RateController(id, estimator) {
+    override def publish(rate: Long): Unit = ()
+  }
 }
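The publish override in DirectKafkaRateController is a no-op because the direct stream has no receiver to push rates to; maxMessagesPerPartition instead pulls the latest estimate each batch via getLatestRate(). For reference, a sketch of an application that exercises the new controller (broker address, topic name, and batch interval are placeholders, not part of this patch):

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectKafkaBackPressureExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("DirectKafkaBackPressureExample")
          // Turns on the RateController machinery, including the
          // DirectKafkaRateController added by this patch.
          .set("spark.streaming.backpressure.enabled", "true")
          // Static ceiling in messages per second per partition; the
          // estimated rate can only lower the effective limit below this.
          .set("spark.streaming.kafka.maxRatePerPartition", "1000")

        val ssc = new StreamingContext(conf, Seconds(2))
        val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")

        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("events"))

        stream.map(_._2).count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

With spark.streaming.backpressure.enabled left at its default of false, rateController is None and only the static per-partition maximum applies, matching the behaviour before this patch.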