path: root/external
author    Nilanjan Raychaudhuri <nraychaudhuri@gmail.com>    2015-08-06 12:50:08 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>        2015-08-06 12:50:08 -0700
commit    a1bbf1bc5c51cd796015ac159799cf024de6fa07 (patch)
tree      4e172c67f556291d4ad5604196e674321a6665ee /external
parent    0d7aac99da660cc42eb5a9be8e262bd9bd8a770f (diff)
[SPARK-8978] [STREAMING] Implements the DirectKafkaRateController
Author: Dean Wampler <dean@concurrentthought.com>
Author: Nilanjan Raychaudhuri <nraychaudhuri@gmail.com>
Author: François Garillot <francois@garillot.net>

Closes #7796 from dragos/topic/streaming-bp/kafka-direct and squashes the following commits:

50d1f21 [Nilanjan Raychaudhuri] Taking care of the remaining nits
648c8b1 [Dean Wampler] Refactored rate controller test to be more predictable and run faster.
e43f678 [Nilanjan Raychaudhuri] fixing doc and nits
ce19d2a [Dean Wampler] Removing an unreliable assertion.
9615320 [Dean Wampler] Give me a break...
6372478 [Dean Wampler] Found a few ways to make this test more robust...
9e69e37 [Dean Wampler] Attempt to fix flakey test that fails in CI, but not locally :(
d3db1ea [Dean Wampler] Fixing stylecheck errors.
d04a288 [Nilanjan Raychaudhuri] adding test to make sure rate controller is used to calculate maxMessagesPerPartition
b6ecb67 [Nilanjan Raychaudhuri] Fixed styling issue
3110267 [Nilanjan Raychaudhuri] [SPARK-8978][Streaming] Implements the DirectKafkaRateController
393c580 [François Garillot] [SPARK-8978][Streaming] Implements the DirectKafkaRateController
51e78c6 [Nilanjan Raychaudhuri] Rename and fix build failure
2795509 [Nilanjan Raychaudhuri] Added missing RateController
19200f5 [Dean Wampler] Removed usage of infix notation. Changed a private variable name to be more consistent with usage.
aa4a70b [François Garillot] [SPARK-8978][Streaming] Implements the DirectKafkaController
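
Before the diffstat, a minimal sketch of the per-batch cap this patch introduces. It mirrors the new maxMessagesPerPartition logic in DirectKafkaInputDStream.scala below; the standalone function form and parameter names are assumptions made purely for illustration, not part of the commit.

    // --- Illustrative sketch (not part of the commit) ---
    def maxMessagesPerPartition(
        estimatedRate: Option[Int],  // latest rate from the RateController, if back pressure is enabled
        maxRatePerPartition: Int,    // spark.streaming.kafka.maxRatePerPartition (0 means unset)
        numPartitions: Int,
        batchDurationMs: Long): Option[Long] = {
      // Split the estimated rate evenly across partitions, capped by the static limit.
      val effectiveRatePerPartition = estimatedRate
        .filter(_ > 0)
        .map(rate => math.min(maxRatePerPartition, rate / numPartitions))
        .getOrElse(maxRatePerPartition)

      if (effectiveRatePerPartition > 0) {
        val secsPerBatch = batchDurationMs.toDouble / 1000
        Some((secsPerBatch * effectiveRatePerPartition).toLong)
      } else {
        None  // no limit: consume everything available in the batch
      }
    }
    // --- End of sketch ---
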
Diffstat (limited to 'external')
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala | 47
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala  | 89
2 files changed, 127 insertions(+), 9 deletions(-)
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 48a1933d92..8a17707777 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -29,7 +29,8 @@ import org.apache.spark.{Logging, SparkException}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
-import org.apache.spark.streaming.scheduler.StreamInputInfo
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
/**
* A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
@@ -61,7 +62,7 @@ class DirectKafkaInputDStream[
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
-) extends InputDStream[R](ssc_) with Logging {
+ ) extends InputDStream[R](ssc_) with Logging {
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)
@@ -71,14 +72,35 @@ class DirectKafkaInputDStream[
protected[streaming] override val checkpointData =
new DirectKafkaInputDStreamCheckpointData
+
+ /**
+ * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+ */
+ override protected[streaming] val rateController: Option[RateController] = {
+ if (RateController.isBackPressureEnabled(ssc.conf)) {
+ Some(new DirectKafkaRateController(id,
+ RateEstimator.create(ssc.conf, ssc_.graph.batchDuration)))
+ } else {
+ None
+ }
+ }
+
protected val kc = new KafkaCluster(kafkaParams)
- protected val maxMessagesPerPartition: Option[Long] = {
- val ratePerSec = context.sparkContext.getConf.getInt(
+ private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRatePerPartition", 0)
- if (ratePerSec > 0) {
+ protected def maxMessagesPerPartition: Option[Long] = {
+ val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
+ val numPartitions = currentOffsets.keys.size
+
+ val effectiveRateLimitPerPartition = estimatedRateLimit
+ .filter(_ > 0)
+ .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
+ .getOrElse(maxRateLimitPerPartition)
+
+ if (effectiveRateLimitPerPartition > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
- Some((secsPerBatch * ratePerSec).toLong)
+ Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
} else {
None
}
@@ -170,11 +192,18 @@ class DirectKafkaInputDStream[
val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
- logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
- generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
- context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
+ logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
+ generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
+ context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
}
}
}
+ /**
+ * A RateController to retrieve the rate from RateEstimator.
+ */
+ private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ }
}
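
Note on the no-op publish above: unlike receiver-based streams, the direct stream has no receiver to push a rate to, so DirectKafkaRateController only records the latest estimate and maxMessagesPerPartition polls it via getLatestRate() each batch. The controller is only instantiated when back pressure is switched on; a minimal configuration sketch that would activate it (the app name is a placeholder, not from the commit):

    // Illustrative configuration sketch, not part of the commit.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("direct-kafka-backpressure-demo")              // placeholder name
      .set("spark.streaming.backpressure.enabled", "true")       // gates RateController.isBackPressureEnabled
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // static cap, still honored as an upper bound
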
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 5b3c79444a..02225d5aa7 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark.streaming.kafka
import java.io.File
import java.util.concurrent.atomic.AtomicLong
+import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
@@ -350,6 +353,77 @@ class DirectKafkaStreamSuite
ssc.stop()
}
+ test("using rate controller") {
+ val topic = "backpressure"
+ val topicPartition = TopicAndPartition(topic, 0)
+ kafkaTestUtils.createTopic(topic)
+ val kafkaParams = Map(
+ "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ val batchIntervalMilliseconds = 100
+ val estimator = new ConstantEstimator(100)
+ val messageKeys = (1 to 200).map(_.toString)
+ val messages = messageKeys.map((_, 1)).toMap
+
+ val sparkConf = new SparkConf()
+ // Safe, even with streaming, because we're using the direct API.
+ // Using 1 core is useful to make the test more predictable.
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+ val kafkaStream = withClue("Error creating direct stream") {
+ val kc = new KafkaCluster(kafkaParams)
+ val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+ val m = kc.getEarliestLeaderOffsets(Set(topicPartition))
+ .fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
+
+ new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+ ssc, kafkaParams, m, messageHandler) {
+ override protected[streaming] val rateController =
+ Some(new DirectKafkaRateController(id, estimator))
+ }
+ }
+
+ val collectedData =
+ new mutable.ArrayBuffer[Array[String]]() with mutable.SynchronizedBuffer[Array[String]]
+
+ // Used for assertion failure messages.
+ def dataToString: String =
+ collectedData.map(_.mkString("[", ",", "]")).mkString("{", ", ", "}")
+
+ // This is to collect the raw data received from Kafka
+ kafkaStream.foreachRDD { (rdd: RDD[(String, String)], time: Time) =>
+ val data = rdd.map { _._2 }.collect()
+ collectedData += data
+ }
+
+ ssc.start()
+
+ // Try different rate limits.
+ // Send data to Kafka and wait for arrays of data to appear matching the rate.
+ Seq(100, 50, 20).foreach { rate =>
+ collectedData.clear() // Empty this buffer on each pass.
+ estimator.updateRate(rate) // Set a new rate.
+ // Expect blocks of data equal to "rate", scaled by the interval length in secs.
+ val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
+ kafkaTestUtils.sendMessages(topic, messages)
+ eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
+ // Assert that rate estimator values are used to determine maxMessagesPerPartition.
+ // Funky "-" in message makes the complete assertion message read better.
+ assert(collectedData.exists(_.size == expectedSize),
+ s" - No arrays of size $expectedSize for rate $rate found in $dataToString")
+ }
+ }
+
+ ssc.stop()
+ }
+
/** Get the generated offset ranges from the DirectKafkaStream */
private def getOffsetRanges[K, V](
kafkaStream: DStream[(K, V)]): Seq[(Time, Array[OffsetRange])] = {
@@ -381,3 +455,18 @@ object DirectKafkaStreamSuite {
}
}
}
+
+private[streaming] class ConstantEstimator(@volatile private var rate: Long)
+ extends RateEstimator {
+
+ def updateRate(newRate: Long): Unit = {
+ rate = newRate
+ }
+
+ def compute(
+ time: Long,
+ elements: Long,
+ processingDelay: Long,
+ schedulingDelay: Long): Option[Double] = Some(rate)
+}
+
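
As a sanity check on the test above: with 100 ms batches, each expected block size is round(rate * 0.1), so the three rates should yield batches of roughly 10, 5, and 2 messages. A back-of-the-envelope sketch of that arithmetic (not part of the commit):

    // Back-of-the-envelope check of the batch sizes asserted in the test above.
    val batchIntervalMilliseconds = 100
    Seq(100, 50, 20).foreach { rate =>
      val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
      println(s"rate=$rate => ~$expectedSize messages per batch")  // 10, 5, 2
    }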