aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala 22
1 files changed, 12 insertions, 10 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index d7210f64fc..7b2ef6881d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -21,18 +21,20 @@ import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
/**
- * A component that estimates the rate at wich an InputDStream should ingest
- * elements, based on updates at every batch completion.
+ * A component that estimates the rate at which an `InputDStream` should ingest
+ * records, based on updates at every batch completion.
+ *
+ * @see [[org.apache.spark.streaming.scheduler.RateController]]
*/
private[streaming] trait RateEstimator extends Serializable {
/**
- * Computes the number of elements the stream attached to this `RateEstimator`
+ * Computes the number of records the stream attached to this `RateEstimator`
* should ingest per second, given an update on the size and completion
* times of the latest batch.
*
- * @param time The timetamp of the current batch interval that just finished
- * @param elements The number of elements that were processed in this batch
+ * @param time The timestamp of the current batch interval that just finished
+ * @param elements The number of records that were processed in this batch
* @param processingDelay The time in ms that took for the job to complete
* @param schedulingDelay The time in ms that the job spent in the scheduling queue
*/
@@ -46,13 +48,13 @@ private[streaming] trait RateEstimator extends Serializable {
object RateEstimator {
/**
- * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
+ * Return a new `RateEstimator` based on the value of
+ * `spark.streaming.backpressure.rateEstimator`.
*
- * The only known estimator right now is `pid`.
+ * The only known and acceptable estimator right now is `pid`.
*
* @return An instance of RateEstimator
- * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
- * known estimators.
+ * @throws IllegalArgumentException if the configured RateEstimator is not `pid`.
*/
def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
@@ -64,6 +66,6 @@ object RateEstimator {
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
case estimator =>
- throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
+ throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
}
}