author:    Jason White <jason.white@shopify.com>  2016-03-04 16:04:56 -0800
committer: Shixiong Zhu <shixiong@databricks.com>  2016-03-04 16:04:56 -0800
commit:    f19228eed89cf8e22a07a7ef7f37a5f6f8a3d455 (patch)
tree:      01133c1c61c350ed602abfe8b31bb4907248bd78
parent:    a6e2bd31f52f9e9452e52ab5b846de3dee8b98a7 (diff)
[SPARK-12073][STREAMING] backpressure rate controller consumes events preferentially from lagging partitions

I'm pretty sure this is the reason we couldn't easily recover from an unbalanced Kafka partition under heavy load when using backpressure. `maxMessagesPerPartition` calculates an appropriate limit for the message rate across all partitions, and then divides by the number of partitions to determine how many messages to retrieve per partition. The problem with this approach is that when one partition is behind by millions of records (due to random Kafka issues) but the rate estimator calculates that only 100k total messages can be retrieved, each partition (out of, say, 32) retrieves at most 100k/32 = 3125 messages.

This PR (still needing a test) determines a per-partition desired message count by using each partition's current lag to preferentially weight the total message limit among the partitions. In this situation, if each partition is 1k messages behind, but one partition starts 1M behind, then the total number of messages to retrieve is (32 * 1k + 1M) = 1,032,000 messages, of which the lagging partition needs 1,001,000. So it gets (1,001,000 / 1,032,000) ≈ 97% of the 100k-message budget, and the other 31 partitions share the remaining 3%. Assuming all of the 100k messages are retrieved and processed within the batch window, the rate calculator will increase the number of messages to retrieve in the next batch, until it reaches a new stable point or the backlog is finished being processed.

We're going to try deploying this internally at Shopify to see if this resolves our issue. tdas koeninger holdenk

Author: Jason White <jason.white@shopify.com>

Closes #10089 from JasonMWhite/rate_controller_offsets.
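The lag-weighted allocation described above can be sketched as follows. This is a minimal, hypothetical Scala sketch, not the actual `DirectKafkaInputDStream` code; the object and method names are illustrative only. It reproduces the worked example from the message: 31 partitions 1k behind plus one partition 1,001k behind, sharing a 100k-message budget in proportion to lag.

```scala
// Hedged sketch: illustrates weighting a total message budget by each
// partition's current lag, as described in the commit message. Names
// (LagWeightedAllocation, allocate) are hypothetical.
object LagWeightedAllocation {

  /** Split `totalBudget` messages across partitions in proportion to lag. */
  def allocate(lags: Map[Int, Long], totalBudget: Long): Map[Int, Long] = {
    val totalLag = lags.values.sum
    if (totalLag == 0L) {
      lags.map { case (p, _) => p -> 0L }
    } else {
      // Integer division: each partition gets budget * (its lag / total lag).
      lags.map { case (p, lag) => p -> (totalBudget * lag / totalLag) }
    }
  }

  def main(args: Array[String]): Unit = {
    // 31 partitions 1k behind; partition 31 is 1M + 1k behind.
    val lags = (0 until 31).map(p => p -> 1000L).toMap + (31 -> 1001000L)
    val alloc = allocate(lags, 100000L)
    println(alloc(31)) // 96996 -- the lagging partition gets ~97% of the budget
    println(alloc(0))  // 96    -- the other partitions share the remaining ~3%
  }
}
```

Under the old per-partition split, every partition would have been capped at 100k/32 = 3125 messages regardless of lag; here the lagging partition drains its backlog far faster while the others still make progress.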
Diffstat (limited to 'project')
-rw-r--r--  project/MimaExcludes.scala | 4
1 file changed, 4 insertions(+), 0 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9ce37fc753..983f71684c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -288,6 +288,10 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.SQLConf$SQLConfEntry$")
+ ) ++ Seq(
+ // SPARK-12073: backpressure rate controller consumes events preferentially from lagging partitions
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.createTopic"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.DirectKafkaInputDStream.maxMessagesPerPartition")
)
case v if v.startsWith("1.6") =>
Seq(