aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala9
1 files changed, 8 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index bca1fbc8fd..6a1b672220 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -36,7 +36,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
// treated as an upper limit
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
- private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)
+ private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
def waitToPush() {
rateLimiter.acquire()
@@ -61,4 +61,11 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
rateLimiter.setRate(newRate)
}
}
+
+ /**
+ * Get the initial rateLimit to initial rateLimiter
+ */
+ private def getInitialRateLimit(): Long = {
+ math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
+ }
}