aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-07-31 12:04:03 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-31 12:04:03 -0700
commit0a1d2ca42c8b31d6b0e70163795f0185d4622f87 (patch)
treed453ae5039eeccfba70958dadab2c98766126ede /streaming/src/test
parente8bdcdeabb2df139a656f86686cdb53c891b1f4b (diff)
downloadspark-0a1d2ca42c8b31d6b0e70163795f0185d4622f87.tar.gz
spark-0a1d2ca42c8b31d6b0e70163795f0185d4622f87.tar.bz2
spark-0a1d2ca42c8b31d6b0e70163795f0185d4622f87.zip
[SPARK-8979] Add a PID based rate estimator
Based on #7600 /cc tdas Author: Iulian Dragos <jaguarul@gmail.com> Author: François Garillot <francois@garillot.net> Closes #7648 from dragos/topic/streaming-bp/pid and squashes the following commits: aa5b097 [Iulian Dragos] Add more comments, made all PID constant parameters positive, a couple more tests. 93b74f8 [Iulian Dragos] Better explanation of historicalError. 7975b0c [Iulian Dragos] Add configuration for PID. 26cfd78 [Iulian Dragos] A couple of variable renames. d0bdf7c [Iulian Dragos] Update to latest version of the code, various style and name improvements. d58b845 [François Garillot] [SPARK-8979][Streaming] Implements a PIDRateEstimator
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala137
1 files changed, 137 insertions, 0 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
new file mode 100644
index 0000000000..97c32d8f2d
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimatorSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler.rate
+
+import scala.util.Random
+
+import org.scalatest.Inspectors.forAll
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.streaming.Seconds
+
+class PIDRateEstimatorSuite extends SparkFunSuite with Matchers {
+
+ test("the right estimator is created") {
+ val conf = new SparkConf
+ conf.set("spark.streaming.backpressure.rateEstimator", "pid")
+ val pid = RateEstimator.create(conf, Seconds(1))
+ pid.getClass should equal(classOf[PIDRateEstimator])
+ }
+
+ test("estimator checks ranges") {
+ intercept[IllegalArgumentException] {
+ new PIDRateEstimator(0, 1, 2, 3)
+ }
+ intercept[IllegalArgumentException] {
+ new PIDRateEstimator(100, -1, 2, 3)
+ }
+ intercept[IllegalArgumentException] {
+ new PIDRateEstimator(100, 0, -1, 3)
+ }
+ intercept[IllegalArgumentException] {
+ new PIDRateEstimator(100, 0, 0, -1)
+ }
+ }
+
+ private def createDefaultEstimator: PIDRateEstimator = {
+ new PIDRateEstimator(20, 1D, 0D, 0D)
+ }
+
+ test("first bound is None") {
+ val p = createDefaultEstimator
+ p.compute(0, 10, 10, 0) should equal(None)
+ }
+
+ test("second bound is rate") {
+ val p = createDefaultEstimator
+ p.compute(0, 10, 10, 0)
+ // 1000 elements / s
+ p.compute(10, 10, 10, 0) should equal(Some(1000))
+ }
+
+ test("works even with no time between updates") {
+ val p = createDefaultEstimator
+ p.compute(0, 10, 10, 0)
+ p.compute(10, 10, 10, 0)
+ p.compute(10, 10, 10, 0) should equal(None)
+ }
+
+ test("bound is never negative") {
+ val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+ // prepare a series of batch updates, one every 20ms, 0 processed elements, 2ms of processing
+ // this might point the estimator to try and decrease the bound, but we test it never
+ // goes below zero, which would be nonsensical.
+ val times = List.tabulate(50)(x => x * 20) // every 20ms
+ val elements = List.fill(50)(0) // no processing
+ val proc = List.fill(50)(20) // 20ms of processing
+ val sched = List.fill(50)(100) // strictly positive accumulation
+ val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
+ res.head should equal(None)
+ res.tail should equal(List.fill(49)(Some(0D)))
+ }
+
+ test("with no accumulated or positive error, |I| > 0, follow the processing speed") {
+ val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+ // prepare a series of batch updates, one every 20ms with an increasing number of processed
+ // elements in each batch, but constant processing time, and no accumulated error. Even though
+ // the integral part is non-zero, the estimated rate should follow only the proportional term
+ val times = List.tabulate(50)(x => x * 20) // every 20ms
+ val elements = List.tabulate(50)(x => x * 20) // increasing
+ val proc = List.fill(50)(20) // 20ms of processing
+ val sched = List.fill(50)(0)
+ val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
+ res.head should equal(None)
+ res.tail should equal(List.tabulate(50)(x => Some(x * 1000D)).tail)
+ }
+
+ test("with no accumulated but some positive error, |I| > 0, follow the processing speed") {
+ val p = new PIDRateEstimator(20, 1D, 1D, 0D)
+ // prepare a series of batch updates, one every 20ms with an decreasing number of processed
+ // elements in each batch, but constant processing time, and no accumulated error. Even though
+ // the integral part is non-zero, the estimated rate should follow only the proportional term,
+ // asking for less and less elements
+ val times = List.tabulate(50)(x => x * 20) // every 20ms
+ val elements = List.tabulate(50)(x => (50 - x) * 20) // decreasing
+ val proc = List.fill(50)(20) // 20ms of processing
+ val sched = List.fill(50)(0)
+ val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
+ res.head should equal(None)
+ res.tail should equal(List.tabulate(50)(x => Some((50 - x) * 1000D)).tail)
+ }
+
+ test("with some accumulated and some positive error, |I| > 0, stay below the processing speed") {
+ val p = new PIDRateEstimator(20, 1D, .01D, 0D)
+ val times = List.tabulate(50)(x => x * 20) // every 20ms
+ val rng = new Random()
+ val elements = List.tabulate(50)(x => rng.nextInt(1000))
+ val procDelayMs = 20
+ val proc = List.fill(50)(procDelayMs) // 20ms of processing
+ val sched = List.tabulate(50)(x => rng.nextInt(19)) // random wait
+ val speeds = elements map ((x) => x.toDouble / procDelayMs * 1000)
+
+ val res = for (i <- List.range(0, 50)) yield p.compute(times(i), elements(i), proc(i), sched(i))
+ res.head should equal(None)
+ forAll(List.range(1, 50)) { (n) =>
+ res(n) should not be None
+ if (res(n).get > 0 && sched(n) > 0) {
+ res(n).get should be < speeds(n)
+ }
+ }
+ }
+}