diff options
author | junhao <junhao@mogujie.com> | 2016-02-16 19:43:17 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-02-16 19:43:17 -0800 |
commit | 7218c0eba957e0a079a407b79c3a050cce9647b2 (patch) | |
tree | fa8fdf3612c4c3d0ed36dcf131931a48d8108ee1 /streaming/src | |
parent | 5f37aad48cb729a80c4cc25347460f12aafec9fb (diff) | |
download | spark-7218c0eba957e0a079a407b79c3a050cce9647b2.tar.gz spark-7218c0eba957e0a079a407b79c3a050cce9647b2.tar.bz2 spark-7218c0eba957e0a079a407b79c3a050cce9647b2.zip |
[SPARK-11627] Add initial input rate limit for spark streaming backpressure mechanism.
https://issues.apache.org/jira/browse/SPARK-11627
Spark Streaming's backpressure mechanism has no initial input rate limit, which might cause an OOM exception.
In the first batch, receivers receive data at the maximum speed they can reach, which might exhaust executor memory resources. Adding an initial input rate limit value can make sure the streaming job executes successfully in the first batch; then the backpressure mechanism can adjust the receiving rate adaptively.
Author: junhao <junhao@mogujie.com>
Closes #9593 from junhaoMg/junhao-dev.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala | 9 |
1 files changed, 8 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index bca1fbc8fd..6a1b672220 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -36,7 +36,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) - private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble) + private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble) def waitToPush() { rateLimiter.acquire() @@ -61,4 +61,11 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { rateLimiter.setRate(newRate) } } + + /** + * Get the initial rateLimit to initial rateLimiter + */ + private def getInitialRateLimit(): Long = { + math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit) + } } |