aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-08-14 15:10:01 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-14 15:10:01 -0700
commitf3bfb711c1742d0915e43bda8230b4d1d22b4190 (patch)
treec6339008ed647f134b8fcca88e78afab81e865c0 /streaming
parent1150a19b188a075166899fdb1e107b2ba1e505d8 (diff)
downloadspark-f3bfb711c1742d0915e43bda8230b4d1d22b4190.tar.gz
spark-f3bfb711c1742d0915e43bda8230b4d1d22b4190.tar.bz2
spark-f3bfb711c1742d0915e43bda8230b4d1d22b4190.zip
[SPARK-9966] [STREAMING] Handle couple of corner cases in PIDRateEstimator
1. The rate estimator should not estimate any rate when there are no records in the batch, as there is no data to estimate the rate. In the current state, it estimates and set the rate to zero. That is incorrect. 2. The rate estimator should not never set the rate to zero under any circumstances. Otherwise the system will stop receiving data, and stop generating useful estimates (see reason 1). So the fix is to define a parameters that sets a lower bound on the estimated rate, so that the system always receives some data. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8199 from tdas/SPARK-9966 and squashes the following commits: 829f793 [Tathagata Das] Fixed unit test and added comments 3a994db [Tathagata Das] Added min rate and updated tests in PIDRateEstimator
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala46
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala79
3 files changed, 87 insertions, 42 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
index 6ae56a68ad..84a3ca9d74 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.scheduler.rate
+import org.apache.spark.Logging
+
/**
* Implements a proportional-integral-derivative (PID) controller which acts on
* the speed of ingestion of elements into Spark Streaming. A PID controller works
@@ -26,7 +28,7 @@ package org.apache.spark.streaming.scheduler.rate
*
* @see https://en.wikipedia.org/wiki/PID_controller
*
- * @param batchDurationMillis the batch duration, in milliseconds
+ * @param batchIntervalMillis the batch duration, in milliseconds
* @param proportional how much the correction should depend on the current
* error. This term usually provides the bulk of correction and should be positive or zero.
* A value too large would make the controller overshoot the setpoint, while a small value
@@ -39,13 +41,17 @@ package org.apache.spark.streaming.scheduler.rate
* of future errors, based on current rate of change. This value should be positive or 0.
* This term is not used very often, as it impacts stability of the system. The default
* value is 0.
+ * @param minRate what is the minimum rate that can be estimated.
+ * This must be greater than zero, so that the system always receives some data for rate
+ * estimation to work.
*/
private[streaming] class PIDRateEstimator(
batchIntervalMillis: Long,
- proportional: Double = 1D,
- integral: Double = .2D,
- derivative: Double = 0D)
- extends RateEstimator {
+ proportional: Double,
+ integral: Double,
+ derivative: Double,
+ minRate: Double
+ ) extends RateEstimator with Logging {
private var firstRun: Boolean = true
private var latestTime: Long = -1L
@@ -64,16 +70,23 @@ private[streaming] class PIDRateEstimator(
require(
derivative >= 0,
s"Derivative term $derivative in PIDRateEstimator should be >= 0.")
+ require(
+ minRate > 0,
+ s"Minimum rate in PIDRateEstimator should be > 0")
+ logInfo(s"Created PIDRateEstimator with proportional = $proportional, integral = $integral, " +
+ s"derivative = $derivative, min rate = $minRate")
- def compute(time: Long, // in milliseconds
+ def compute(
+ time: Long, // in milliseconds
numElements: Long,
processingDelay: Long, // in milliseconds
schedulingDelay: Long // in milliseconds
): Option[Double] = {
-
+ logTrace(s"\ntime = $time, # records = $numElements, " +
+ s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
this.synchronized {
- if (time > latestTime && processingDelay > 0 && batchIntervalMillis > 0) {
+ if (time > latestTime && numElements > 0 && processingDelay > 0) {
// in seconds, should be close to batchDuration
val delaySinceUpdate = (time - latestTime).toDouble / 1000
@@ -104,21 +117,30 @@ private[streaming] class PIDRateEstimator(
val newRate = (latestRate - proportional * error -
integral * historicalError -
- derivative * dError).max(0.0)
+ derivative * dError).max(minRate)
+ logTrace(s"""
+ | latestRate = $latestRate, error = $error
+ | latestError = $latestError, historicalError = $historicalError
+ | delaySinceUpdate = $delaySinceUpdate, dError = $dError
+ """.stripMargin)
+
latestTime = time
if (firstRun) {
latestRate = processingRate
latestError = 0D
firstRun = false
-
+ logTrace("First run, rate estimation skipped")
None
} else {
latestRate = newRate
latestError = error
-
+ logTrace(s"New rate = $newRate")
Some(newRate)
}
- } else None
+ } else {
+ logTrace("Rate estimation skipped")
+ None
+ }
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index 17ccebc1ed..d7210f64fc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -18,7 +18,6 @@
package org.apache.spark.streaming.scheduler.rate
import org.apache.spark.SparkConf
-import org.apache.spark.SparkException
import org.apache.spark.streaming.Duration
/**
@@ -61,7 +60,8 @@ object RateEstimator {
val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
- new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived)
+ val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
+ new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
case estimator =>
throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
index 97c32d8f2d..a1af95be81 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
@@ -36,72 +36,89 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
test("estimator checks ranges") {
intercept[IllegalArgumentException] {
- new PIDRateEstimator(0, 1, 2, 3)
+ new PIDRateEstimator(batchIntervalMillis = 0, 1, 2, 3, 10)
}
intercept[IllegalArgumentException] {
- new PIDRateEstimator(100, -1, 2, 3)
+ new PIDRateEstimator(100, proportional = -1, 2, 3, 10)
}
intercept[IllegalArgumentException] {
- new PIDRateEstimator(100, 0, -1, 3)
+ new PIDRateEstimator(100, 0, integral = -1, 3, 10)
}
intercept[IllegalArgumentException] {
- new PIDRateEstimator(100, 0, 0, -1)
+ new PIDRateEstimator(100, 0, 0, derivative = -1, 10)
+ }
+ intercept[IllegalArgumentException] {
+ new PIDRateEstimator(100, 0, 0, 0, minRate = 0)
+ }
+ intercept[IllegalArgumentException] {
+ new PIDRateEstimator(100, 0, 0, 0, minRate = -10)
}
}
- private def createDefaultEstimator: PIDRateEstimator = {
- new PIDRateEstimator(20, 1D, 0D, 0D)
- }
-
- test("first bound is None") {
- val p = createDefaultEstimator
+ test("first estimate is None") {
+ val p = createDefaultEstimator()
p.compute(0, 10, 10, 0) should equal(None)
}
- test("second bound is rate") {
- val p = createDefaultEstimator
+ test("second estimate is not None") {
+ val p = createDefaultEstimator()
p.compute(0, 10, 10, 0)
// 1000 elements / s
p.compute(10, 10, 10, 0) should equal(Some(1000))
}
- test("works even with no time between updates") {
- val p = createDefaultEstimator
+ test("no estimate when no time difference between successive calls") {
+ val p = createDefaultEstimator()
+ p.compute(0, 10, 10, 0)
+ p.compute(time = 10, 10, 10, 0) shouldNot equal(None)
+ p.compute(time = 10, 10, 10, 0) should equal(None)
+ }
+
+ test("no estimate when no records in previous batch") {
+ val p = createDefaultEstimator()
p.compute(0, 10, 10, 0)
- p.compute(10, 10, 10, 0)
- p.compute(10, 10, 10, 0) should equal(None)
+ p.compute(10, numElements = 0, 10, 0) should equal(None)
+ p.compute(20, numElements = -10, 10, 0) should equal(None)
}
- test("bound is never negative") {
- val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+ test("no estimate when there is no processing delay") {
+ val p = createDefaultEstimator()
+ p.compute(0, 10, 10, 0)
+ p.compute(10, 10, processingDelay = 0, 0) should equal(None)
+ p.compute(20, 10, processingDelay = -10, 0) should equal(None)
+ }
+
+ test("estimate is never less than min rate") {
+ val minRate = 5D
+ val p = new PIDRateEstimator(20, 1D, 1D, 0D, minRate)
// prepare a series of batch updates, one every 20ms, 0 processed elements, 2ms of processing
// this might point the estimator to try and decrease the bound, but we test it never
- // goes below zero, which would be nonsensical.
+ // goes below the min rate, which would be nonsensical.
val times = List.tabulate(50)(x => x * 20) // every 20ms
- val elements = List.fill(50)(0) // no processing
+ val elements = List.fill(50)(1) // no processing
val proc = List.fill(50)(20) // 20ms of processing
val sched = List.fill(50)(100) // strictly positive accumulation
val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
res.head should equal(None)
- res.tail should equal(List.fill(49)(Some(0D)))
+ res.tail should equal(List.fill(49)(Some(minRate)))
}
test("with no accumulated or positive error, |I| > 0, follow the processing speed") {
- val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+ val p = new PIDRateEstimator(20, 1D, 1D, 0D, 10)
// prepare a series of batch updates, one every 20ms with an increasing number of processed
// elements in each batch, but constant processing time, and no accumulated error. Even though
// the integral part is non-zero, the estimated rate should follow only the proportional term
val times = List.tabulate(50)(x => x * 20) // every 20ms
- val elements = List.tabulate(50)(x => x * 20) // increasing
+ val elements = List.tabulate(50)(x => (x + 1) * 20) // increasing
val proc = List.fill(50)(20) // 20ms of processing
val sched = List.fill(50)(0)
val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
res.head should equal(None)
- res.tail should equal(List.tabulate(50)(x => Some(x * 1000D)).tail)
+ res.tail should equal(List.tabulate(50)(x => Some((x + 1) * 1000D)).tail)
}
test("with no accumulated but some positive error, |I| > 0, follow the processing speed") {
- val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+ val p = new PIDRateEstimator(20, 1D, 1D, 0D, 10)
// prepare a series of batch updates, one every 20ms with an decreasing number of processed
// elements in each batch, but constant processing time, and no accumulated error. Even though
// the integral part is non-zero, the estimated rate should follow only the proportional term,
@@ -116,13 +133,14 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
}
test("with some accumulated and some positive error, |I| > 0, stay below the processing speed") {
- val p = new PIDRateEstimator(20, 1D, .01D, 0D)
+ val minRate = 10D
+ val p = new PIDRateEstimator(20, 1D, .01D, 0D, minRate)
val times = List.tabulate(50)(x => x * 20) // every 20ms
val rng = new Random()
- val elements = List.tabulate(50)(x => rng.nextInt(1000))
+ val elements = List.tabulate(50)(x => rng.nextInt(1000) + 1000)
val procDelayMs = 20
val proc = List.fill(50)(procDelayMs) // 20ms of processing
- val sched = List.tabulate(50)(x => rng.nextInt(19)) // random wait
+ val sched = List.tabulate(50)(x => rng.nextInt(19) + 1) // random wait
val speeds = elements map ((x) => x.toDouble / procDelayMs * 1000)
val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
@@ -131,7 +149,12 @@ class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
res(n) should not be None
if (res(n).get > 0 && sched(n) > 0) {
res(n).get should be < speeds(n)
+ res(n).get should be >= minRate
}
}
}
+
+ private def createDefaultEstimator(): PIDRateEstimator = {
+ new PIDRateEstimator(20, 1D, 0D, 0D, 10)
+ }
}