diff options
author | Iulian Dragos <jaguarul@gmail.com> | 2015-07-31 12:04:03 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-07-31 12:04:03 -0700 |
commit | 0a1d2ca42c8b31d6b0e70163795f0185d4622f87 (patch) | |
tree | d453ae5039eeccfba70958dadab2c98766126ede /streaming/src/main | |
parent | e8bdcdeabb2df139a656f86686cdb53c891b1f4b (diff) | |
download | spark-0a1d2ca42c8b31d6b0e70163795f0185d4622f87.tar.gz spark-0a1d2ca42c8b31d6b0e70163795f0185d4622f87.tar.bz2 spark-0a1d2ca42c8b31d6b0e70163795f0185d4622f87.zip |
[SPARK-8979] Add a PID based rate estimator
Based on #7600
/cc tdas
Author: Iulian Dragos <jaguarul@gmail.com>
Author: François Garillot <francois@garillot.net>
Closes #7648 from dragos/topic/streaming-bp/pid and squashes the following commits:
aa5b097 [Iulian Dragos] Add more comments, made all PID constant parameters positive, a couple more tests.
93b74f8 [Iulian Dragos] Better explanation of historicalError.
7975b0c [Iulian Dragos] Add configuration for PID.
26cfd78 [Iulian Dragos] A couple of variable renames.
d0bdf7c [Iulian Dragos] Update to latest version of the code, various style and name improvements.
d58b845 [François Garillot] [SPARK-8979][Streaming] Implements a PIDRateEstimator
Diffstat (limited to 'streaming/src/main')
3 files changed, 139 insertions, 5 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 646a8c3530..670ef8d296 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -46,7 +46,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont */ override protected[streaming] val rateController: Option[RateController] = { if (RateController.isBackPressureEnabled(ssc.conf)) { - RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) } + Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration))) } else { None } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala new file mode 100644 index 0000000000..6ae56a68ad --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/PIDRateEstimator.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler.rate + +/** + * Implements a proportional-integral-derivative (PID) controller which acts on + * the speed of ingestion of elements into Spark Streaming. A PID controller works + * by calculating an '''error''' between a measured output and a desired value. In the + * case of Spark Streaming the error is the difference between the measured processing + * rate (number of elements/processing delay) and the previous rate. + * + * @see https://en.wikipedia.org/wiki/PID_controller + * + * @param batchDurationMillis the batch duration, in milliseconds + * @param proportional how much the correction should depend on the current + * error. This term usually provides the bulk of correction and should be positive or zero. + * A value too large would make the controller overshoot the setpoint, while a small value + * would make the controller too insensitive. The default value is 1. + * @param integral how much the correction should depend on the accumulation + * of past errors. This value should be positive or 0. This term accelerates the movement + * towards the desired value, but a large value may lead to overshooting. The default value + * is 0.2. + * @param derivative how much the correction should depend on a prediction + * of future errors, based on current rate of change. This value should be positive or 0. + * This term is not used very often, as it impacts stability of the system. The default + * value is 0. + */ +private[streaming] class PIDRateEstimator( + batchIntervalMillis: Long, + proportional: Double = 1D, + integral: Double = .2D, + derivative: Double = 0D) + extends RateEstimator { + + private var firstRun: Boolean = true + private var latestTime: Long = -1L + private var latestRate: Double = -1D + private var latestError: Double = -1L + + require( + batchIntervalMillis > 0, + s"Specified batch interval $batchIntervalMillis in PIDRateEstimator is invalid.") + require( + proportional >= 0, + s"Proportional term $proportional in PIDRateEstimator should be >= 0.") + require( + integral >= 0, + s"Integral term $integral in PIDRateEstimator should be >= 0.") + require( + derivative >= 0, + s"Derivative term $derivative in PIDRateEstimator should be >= 0.") + + + def compute(time: Long, // in milliseconds + numElements: Long, + processingDelay: Long, // in milliseconds + schedulingDelay: Long // in milliseconds + ): Option[Double] = { + + this.synchronized { + if (time > latestTime && processingDelay > 0 && batchIntervalMillis > 0) { + + // in seconds, should be close to batchDuration + val delaySinceUpdate = (time - latestTime).toDouble / 1000 + + // in elements/second + val processingRate = numElements.toDouble / processingDelay * 1000 + + // In our system `error` is the difference between the desired rate and the measured rate + // based on the latest batch information. We consider the desired rate to be latest rate, + // which is what this estimator calculated for the previous batch. + // in elements/second + val error = latestRate - processingRate + + // The error integral, based on schedulingDelay as an indicator for accumulated errors. + // A scheduling delay s corresponds to s * processingRate overflowing elements. Those + // are elements that couldn't be processed in previous batches, leading to this delay. + // In the following, we assume the processingRate didn't change too much. + // From the number of overflowing elements we can calculate the rate at which they would be + // processed by dividing it by the batch interval. This rate is our "historical" error, + // or integral part, since if we subtracted this rate from the previous "calculated rate", + // there wouldn't have been any overflowing elements, and the scheduling delay would have + // been zero. + // (in elements/second) + val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis + + // in elements/(second ^ 2) + val dError = (error - latestError) / delaySinceUpdate + + val newRate = (latestRate - proportional * error - + integral * historicalError - + derivative * dError).max(0.0) + latestTime = time + if (firstRun) { + latestRate = processingRate + latestError = 0D + firstRun = false + + None + } else { + latestRate = newRate + latestError = error + + Some(newRate) + } + } else None + } + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala index a08685119e..17ccebc1ed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler.rate import org.apache.spark.SparkConf import org.apache.spark.SparkException +import org.apache.spark.streaming.Duration /** * A component that estimates the rate at wich an InputDStream should ingest @@ -48,12 +49,21 @@ object RateEstimator { /** * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`. * - * @return None if there is no configured estimator, otherwise an instance of RateEstimator + * The only known estimator right now is `pid`. + * + * @return An instance of RateEstimator * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any * known estimators. */ - def create(conf: SparkConf): Option[RateEstimator] = - conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator => - throw new IllegalArgumentException(s"Unkown rate estimator: $estimator") + def create(conf: SparkConf, batchInterval: Duration): RateEstimator = + conf.get("spark.streaming.backpressure.rateEstimator", "pid") match { + case "pid" => + val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0) + val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2) + val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0) + new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived) + + case estimator => + throw new IllegalArgumentException(s"Unkown rate estimator: $estimator") } } |