diff options
-rw-r--r-- | docs/configuration.md | 8 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala | 9 |
2 files changed, 16 insertions, 1 deletions
diff --git a/docs/configuration.md b/docs/configuration.md index 0dbfe3b079..a2c0dfe76c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1471,6 +1471,14 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> + <td><code>spark.streaming.backpressure.initialRate</code></td> + <td>not set</td> + <td> + This is the initial maximum receiving rate at which each receiver will receive data for the + first batch when the backpressure mechanism is enabled. + </td> +</tr> +<tr> <td><code>spark.streaming.blockInterval</code></td> <td>200ms</td> <td> diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala index bca1fbc8fd..6a1b672220 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala @@ -36,7 +36,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { // treated as an upper limit private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue) - private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble) + private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble) def waitToPush() { rateLimiter.acquire() @@ -61,4 +61,11 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging { rateLimiter.setRate(newRate) } } + + /** + * Get the initial rateLimit to initial rateLimiter + */ + private def getInitialRateLimit(): Long = { + math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit) + } } |