aboutsummaryrefslogtreecommitdiff
path: root/external/kafka
diff options
context:
space:
mode:
authorJason White <jason.white@shopify.com>2016-03-04 16:04:56 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-03-04 16:04:56 -0800
commitf19228eed89cf8e22a07a7ef7f37a5f6f8a3d455 (patch)
tree01133c1c61c350ed602abfe8b31bb4907248bd78 /external/kafka
parenta6e2bd31f52f9e9452e52ab5b846de3dee8b98a7 (diff)
downloadspark-f19228eed89cf8e22a07a7ef7f37a5f6f8a3d455.tar.gz
spark-f19228eed89cf8e22a07a7ef7f37a5f6f8a3d455.tar.bz2
spark-f19228eed89cf8e22a07a7ef7f37a5f6f8a3d455.zip
[SPARK-12073][STREAMING] backpressure rate controller consumes events preferentially from lagg…
…ing partitions I'm pretty sure this is the reason we couldn't easily recover from an unbalanced Kafka partition under heavy load when using backpressure. `maxMessagesPerPartition` calculates an appropriate limit for the message rate from all partitions, and then divides by the number of partitions to determine how many messages to retrieve per partition. The problem with this approach is that when one partition is behind by millions of records (due to random Kafka issues), but the rate estimator calculates only 100k total messages can be retrieved, each partition (out of say 32) only retrieves max 100k/32=3125 messages. This PR (still needing a test) determines a per-partition desired message count by using the current lag for each partition to preferentially weight the total message limit among the partitions. In this situation, if each partition gets 1k messages, but 1 partition starts 1M behind, then the total number of messages to retrieve is (32 * 1k + 1M) = 1032000 messages, of which the one partition needs 1001000. So, it gets (1001000 / 1032000) = 97% of the 100k messages, and the other 31 partitions share the remaining 3%. Assuming all of 100k the messages are retrieved and processed within the batch window, the rate calculator will increase the number of messages to retrieve in the next batch, until it reaches a new stable point or the backlog is finished processed. We're going to try deploying this internally at Shopify to see if this resolves our issue. tdas koeninger holdenk Author: Jason White <jason.white@shopify.com> Closes #10089 from JasonMWhite/rate_controller_offsets.
Diffstat (limited to 'external/kafka')
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala44
-rw-r--r--external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala9
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java2
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java2
-rw-r--r--external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java2
-rw-r--r--external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala68
6 files changed, 97 insertions, 30 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 54d8c8b03f..0eaaf408c0 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -89,23 +89,32 @@ class DirectKafkaInputDStream[
private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRatePerPartition", 0)
- protected def maxMessagesPerPartition: Option[Long] = {
+
+ protected[streaming] def maxMessagesPerPartition(
+ offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
- val numPartitions = currentOffsets.keys.size
-
- val effectiveRateLimitPerPartition = estimatedRateLimit
- .filter(_ > 0)
- .map { limit =>
- if (maxRateLimitPerPartition > 0) {
- Math.min(maxRateLimitPerPartition, (limit / numPartitions))
- } else {
- limit / numPartitions
+
+ // calculate a per-partition rate limit based on current lag
+ val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
+ case Some(rate) =>
+ val lagPerPartition = offsets.map { case (tp, offset) =>
+ tp -> Math.max(offset - currentOffsets(tp), 0)
+ }
+ val totalLag = lagPerPartition.values.sum
+
+ lagPerPartition.map { case (tp, lag) =>
+ val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
+ tp -> (if (maxRateLimitPerPartition > 0) {
+ Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
}
- }.getOrElse(maxRateLimitPerPartition)
+ case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
+ }
- if (effectiveRateLimitPerPartition > 0) {
+ if (effectiveRateLimitPerPartition.values.sum > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
- Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
+ Some(effectiveRateLimitPerPartition.map {
+ case (tp, limit) => tp -> (secsPerBatch * limit).toLong
+ })
} else {
None
}
@@ -134,9 +143,12 @@ class DirectKafkaInputDStream[
// limits the maximum number of messages per partition
protected def clamp(
leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
- maxMessagesPerPartition.map { mmp =>
- leaderOffsets.map { case (tp, lo) =>
- tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
+ val offsets = leaderOffsets.mapValues(lo => lo.offset)
+
+ maxMessagesPerPartition(offsets).map { mmp =>
+ mmp.map { case (tp, messages) =>
+ val lo = leaderOffsets(tp)
+ tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
}
}.getOrElse(leaderOffsets)
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index a76fa6671a..a5ea1d6d28 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -152,12 +152,15 @@ private[kafka] class KafkaTestUtils extends Logging {
}
/** Create a Kafka topic and wait until it is propagated to the whole cluster */
- def createTopic(topic: String): Unit = {
- AdminUtils.createTopic(zkClient, topic, 1, 1)
+ def createTopic(topic: String, partitions: Int): Unit = {
+ AdminUtils.createTopic(zkClient, topic, partitions, 1)
// wait until metadata is propagated
- waitUntilMetadataIsPropagated(topic, 0)
+ (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) }
}
+ /** Single-argument version for backwards compatibility */
+ def createTopic(topic: String): Unit = createTopic(topic, 1)
+
/** Java-friendly function for sending messages to the Kafka broker */
def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
index 4891e4f4a1..fa6b0dbc8c 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
@@ -168,7 +168,7 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
private String[] createTopicAndSendData(String topic) {
String[] data = { topic + "-1", topic + "-2", topic + "-3"};
- kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.createTopic(topic, 1);
kafkaTestUtils.sendMessages(topic, data);
return data;
}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
index afcc6cfccd..c41b6297b0 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java
@@ -149,7 +149,7 @@ public class JavaKafkaRDDSuite implements Serializable {
private String[] createTopicAndSendData(String topic) {
String[] data = { topic + "-1", topic + "-2", topic + "-3"};
- kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.createTopic(topic, 1);
kafkaTestUtils.sendMessages(topic, data);
return data;
}
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 617c92a008..868df64e8c 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -76,7 +76,7 @@ public class JavaKafkaStreamSuite implements Serializable {
sent.put("b", 3);
sent.put("c", 10);
- kafkaTestUtils.createTopic(topic);
+ kafkaTestUtils.createTopic(topic, 1);
kafkaTestUtils.sendMessages(topic, sent);
Map<String, String> kafkaParams = new HashMap<>();
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 8398178e9b..b2c81d1534 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -353,10 +353,38 @@ class DirectKafkaStreamSuite
ssc.stop()
}
+ test("maxMessagesPerPartition with backpressure disabled") {
+ val topic = "maxMessagesPerPartition"
+ val kafkaStream = getDirectKafkaStream(topic, None)
+
+ val input = Map(TopicAndPartition(topic, 0) -> 50L, TopicAndPartition(topic, 1) -> 50L)
+ assert(kafkaStream.maxMessagesPerPartition(input).get ==
+ Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+ }
+
+ test("maxMessagesPerPartition with no lag") {
+ val topic = "maxMessagesPerPartition"
+ val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 100))
+ val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+ val input = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+ assert(kafkaStream.maxMessagesPerPartition(input).isEmpty)
+ }
+
+ test("maxMessagesPerPartition respects max rate") {
+ val topic = "maxMessagesPerPartition"
+ val rateController = Some(new ConstantRateController(0, new ConstantEstimator(100), 1000))
+ val kafkaStream = getDirectKafkaStream(topic, rateController)
+
+ val input = Map(TopicAndPartition(topic, 0) -> 1000L, TopicAndPartition(topic, 1) -> 1000L)
+ assert(kafkaStream.maxMessagesPerPartition(input).get ==
+ Map(TopicAndPartition(topic, 0) -> 10L, TopicAndPartition(topic, 1) -> 10L))
+ }
+
test("using rate controller") {
val topic = "backpressure"
- val topicPartition = TopicAndPartition(topic, 0)
- kafkaTestUtils.createTopic(topic)
+ val topicPartitions = Set(TopicAndPartition(topic, 0), TopicAndPartition(topic, 1))
+ kafkaTestUtils.createTopic(topic, 2)
val kafkaParams = Map(
"metadata.broker.list" -> kafkaTestUtils.brokerAddress,
"auto.offset.reset" -> "smallest"
@@ -364,8 +392,8 @@ class DirectKafkaStreamSuite
val batchIntervalMilliseconds = 100
val estimator = new ConstantEstimator(100)
- val messageKeys = (1 to 200).map(_.toString)
- val messages = messageKeys.map((_, 1)).toMap
+ val messages = Map("foo" -> 200)
+ kafkaTestUtils.sendMessages(topic, messages)
val sparkConf = new SparkConf()
// Safe, even with streaming, because we're using the direct API.
@@ -380,11 +408,11 @@ class DirectKafkaStreamSuite
val kafkaStream = withClue("Error creating direct stream") {
val kc = new KafkaCluster(kafkaParams)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
- val m = kc.getEarliestLeaderOffsets(Set(topicPartition))
+ val m = kc.getEarliestLeaderOffsets(topicPartitions)
.fold(e => Map.empty[TopicAndPartition, Long], m => m.mapValues(lo => lo.offset))
new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
- ssc, kafkaParams, m, messageHandler) {
+ ssc, kafkaParams, m, messageHandler) {
override protected[streaming] val rateController =
Some(new DirectKafkaRateController(id, estimator))
}
@@ -405,13 +433,12 @@ class DirectKafkaStreamSuite
ssc.start()
// Try different rate limits.
- // Send data to Kafka and wait for arrays of data to appear matching the rate.
+ // Wait for arrays of data to appear matching the rate.
Seq(100, 50, 20).foreach { rate =>
collectedData.clear() // Empty this buffer on each pass.
estimator.updateRate(rate) // Set a new rate.
// Expect blocks of data equal to "rate", scaled by the interval length in secs.
val expectedSize = Math.round(rate * batchIntervalMilliseconds * 0.001)
- kafkaTestUtils.sendMessages(topic, messages)
eventually(timeout(5.seconds), interval(batchIntervalMilliseconds.milliseconds)) {
// Assert that rate estimator values are used to determine maxMessagesPerPartition.
// Funky "-" in message makes the complete assertion message read better.
@@ -430,6 +457,25 @@ class DirectKafkaStreamSuite
rdd.asInstanceOf[KafkaRDD[K, V, _, _, (K, V)]].offsetRanges
}.toSeq.sortBy { _._1 }
}
+
+ private def getDirectKafkaStream(topic: String, mockRateController: Option[RateController]) = {
+ val batchIntervalMilliseconds = 100
+
+ val sparkConf = new SparkConf()
+ .setMaster("local[1]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.kafka.maxRatePerPartition", "100")
+
+ // Setup the streaming context
+ ssc = new StreamingContext(sparkConf, Milliseconds(batchIntervalMilliseconds))
+
+ val earliestOffsets = Map(TopicAndPartition(topic, 0) -> 0L, TopicAndPartition(topic, 1) -> 0L)
+ val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
+ new DirectKafkaInputDStream[String, String, StringDecoder, StringDecoder, (String, String)](
+ ssc, Map[String, String](), earliestOffsets, messageHandler) {
+ override protected[streaming] val rateController = mockRateController
+ }
+ }
}
object DirectKafkaStreamSuite {
@@ -468,3 +514,9 @@ private[streaming] class ConstantEstimator(@volatile private var rate: Long)
processingDelay: Long,
schedulingDelay: Long): Option[Double] = Some(rate)
}
+
+private[streaming] class ConstantRateController(id: Int, estimator: RateEstimator, rate: Long)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit = ()
+ override def getLatestRate(): Long = rate
+}