 docs/configuration.md                                                           |  8 ++++++++
 streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala |  9 ++++++++-
 2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/docs/configuration.md b/docs/configuration.md
index 0dbfe3b079..a2c0dfe76c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1471,6 +1471,14 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+ <td><code>spark.streaming.backpressure.initialRate</code></td>
+ <td>not set</td>
+ <td>
+ This is the initial maximum receiving rate at which each receiver will receive data for the
+ first batch when the backpressure mechanism is enabled.
+ </td>
+</tr>
+<tr>
   <td><code>spark.streaming.blockInterval</code></td>
   <td>200ms</td>
   <td>
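
For context, a minimal sketch of how the new property might be set alongside the existing backpressure settings; the application name and the rate values below are illustrative assumptions, not recommendations:

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration: cap the very first batch at 100 records/sec per
// receiver, while backpressure and spark.streaming.receiver.maxRate govern the
// rate from the second batch onwards.
val conf = new SparkConf()
  .setAppName("initial-rate-example")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "100")
  .set("spark.streaming.receiver.maxRate", "1000")
```
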
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index bca1fbc8fd..6a1b672220 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -36,7 +36,7 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
   // treated as an upper limit
   private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
-  private lazy val rateLimiter = GuavaRateLimiter.create(maxRateLimit.toDouble)
+  private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
 
   def waitToPush() {
     rateLimiter.acquire()
   }
@@ -61,4 +61,11 @@ private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
         rateLimiter.setRate(newRate)
       }
     }
+
+  /**
+   * Get the initial rate limit used to initialize the rate limiter.
+   */
+  private def getInitialRateLimit(): Long = {
+    math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
+  }
 }
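
As a rough illustration of the clamping that `getInitialRateLimit()` performs (the numbers below are made up; the real method reads both values from the receiver's `SparkConf`):

```scala
// spark.streaming.receiver.maxRate, defaulting to Long.MaxValue when unset
val maxRateLimit = 500L
// spark.streaming.backpressure.initialRate, falling back to maxRateLimit when unset
val initialRate = 2000L
// The limiter never starts above maxRateLimit, even if initialRate is larger.
val effectiveInitialRate = math.min(initialRate, maxRateLimit)  // 500
```
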