aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorjunhao <junhao@mogujie.com>2016-02-16 19:43:17 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-02-16 19:43:17 -0800
commit7218c0eba957e0a079a407b79c3a050cce9647b2 (patch)
treefa8fdf3612c4c3d0ed36dcf131931a48d8108ee1 /streaming/src
parent5f37aad48cb729a80c4cc25347460f12aafec9fb (diff)
downloadspark-7218c0eba957e0a079a407b79c3a050cce9647b2.tar.gz
spark-7218c0eba957e0a079a407b79c3a050cce9647b2.tar.bz2
spark-7218c0eba957e0a079a407b79c3a050cce9647b2.zip
[SPARK-11627] Add initial input rate limit for spark streaming backpressure mechanism.
https://issues.apache.org/jira/browse/SPARK-11627 Spark Streaming backpressure mechanism has no initial input rate limit, it might cause OOM exception. In the firest batch task ,receivers receive data at the maximum speed they can reach,it might exhaust executors memory resources. Add a initial input rate limit value can make sure the Streaming job execute success in the first batch,then the backpressure mechanism can adjust receiving rate adaptively. Author: junhao <junhao@mogujie.com> Closes #9593 from junhaoMg/junhao-dev.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala9
1 files changed, 8 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index bca1fbc8fd..6a1b672220 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -36,7 +36,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
// treated as an upper limit
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
- private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)
+ private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
def waitToPush() {
rateLimiter.acquire()
@@ -61,4 +61,11 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
rateLimiter.setRate(newRate)
}
}
+
+ /**
+ * Get the initial rateLimit to initial rateLimiter
+ */
+ private def getInitialRateLimit(): Long = {
+ math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
+ }
}