aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJacek Laskowski <jacek@japila.pl>2016-04-02 08:12:04 -0700
committerSean Owen <sowen@cloudera.com>2016-04-02 08:12:04 -0700
commit06694f1c68cb752ea311144f0dbe50e92e1393cf (patch)
tree3e4de2ebd92f4909b3245f6686a400aefe56eae1
parent67d753516da9b6318cd4001bb7ae91703aaf098d (diff)
downloadspark-06694f1c68cb752ea311144f0dbe50e92e1393cf.tar.gz
spark-06694f1c68cb752ea311144f0dbe50e92e1393cf.tar.bz2
spark-06694f1c68cb752ea311144f0dbe50e92e1393cf.zip
[MINOR] Typo fixes
## What changes were proposed in this pull request? Typo fixes. No functional changes. ## How was this patch tested? Built the sources and ran with samples. Author: Jacek Laskowski <jacek@japila.pl> Closes #11802 from jaceklaskowski/typo-fixes.
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala2
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/functions.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala8
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala22
16 files changed, 52 insertions, 49 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 05f8e65d65..b6b8bc33f7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -141,7 +141,7 @@ object RecoverableNetworkWordCount {
def main(args: Array[String]) {
if (args.length != 4) {
- System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
+ System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
System.err.println(
"""
|Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 3a99979a88..afefaaa883 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -147,7 +147,7 @@ class Pipeline @Since("1.4.0") (
t
case _ =>
throw new IllegalArgumentException(
- s"Do not support stage $stage of type ${stage.getClass}")
+ s"Does not support stage $stage of type ${stage.getClass}")
}
if (index < indexOfLastEstimator) {
curDataset = transformer.transform(curDataset)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index ba5ad4c072..2633c06f40 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -58,7 +58,7 @@ private[regression] trait LinearRegressionParams extends PredictorParams
* The specific squared error loss function used is:
* L = 1/2n ||A coefficients - y||^2^
*
- * This support multiple types of regularization:
+ * This supports multiple types of regularization:
* - none (a.k.a. ordinary least squares)
* - L2 (ridge regression)
* - L1 (Lasso)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index ecf4285c46..aceeb8aadc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -79,13 +79,13 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
/**
* Computes [[Statistics]] for this plan. The default implementation assumes the output
- * cardinality is the product of of all child plan's cardinality, i.e. applies in the case
+ * cardinality is the product of all child plan's cardinality, i.e. applies in the case
* of cartesian joins.
*
* [[LeafNode]]s must override this.
*/
def statistics: Statistics = {
- if (children.size == 0) {
+ if (children.isEmpty) {
throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
}
Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
index d7cd84fd24..c5df028485 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -37,7 +37,7 @@ class ExperimentalMethods private[sql]() {
/**
* Allows extra strategies to be injected into the query planner at runtime. Note this API
- * should be consider experimental and is not intended to be stable across releases.
+ * should be considered experimental and is not intended to be stable across releases.
*
* @since 1.3.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index f5b083c216..0ed1ed41b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.collection.CompactBuffer
/**
* Performs an inner hash join of two child relations. When the output RDD of this operator is
* being constructed, a Spark job is asynchronously started to calculate the values for the
- * broadcasted relation. This data is then placed in a Spark broadcast variable. The streamed
+ * broadcast relation. This data is then placed in a Spark broadcast variable. The streamed
* relation is not shuffled.
*/
case class BroadcastHashJoin(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 74906050ac..baf947d037 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2232,7 +2232,7 @@ object functions {
/**
* Splits str around pattern (pattern is a regular expression).
- * NOTE: pattern is a string represent the regular expression.
+ * NOTE: pattern is a string representation of the regular expression.
*
* @group string_funcs
* @since 1.5.0
@@ -2267,9 +2267,9 @@ object functions {
/**
* Translate any character in the src by a character in replaceString.
- * The characters in replaceString is corresponding to the characters in matchingString.
- * The translate will happen when any character in the string matching with the character
- * in the matchingString.
+ * The characters in replaceString correspond to the characters in matchingString.
+ * The translate will happen when any character in the string matches the character
+ * in the `matchingString`.
*
* @group string_funcs
* @since 1.5.0
@@ -2692,7 +2692,7 @@ object functions {
//////////////////////////////////////////////////////////////////////////////////////////////
/**
- * Returns true if the array contain the value
+ * Returns true if the array contains `value`
* @group collection_funcs
* @since 1.5.0
*/
@@ -2920,7 +2920,7 @@ object functions {
/**
* Defines a user-defined function (UDF) using a Scala closure. For this variant, the caller must
- * specifcy the output data type, and there is no automatic input type coercion.
+ * specify the output data type, and there is no automatic input type coercion.
*
* @param f A closure in Scala
* @param dataType The output data type of the UDF
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 3a664c4f5c..c1e151d08b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -132,7 +132,7 @@ class StreamingContext private[streaming] (
"both SparkContext and checkpoint as null")
}
- private[streaming] val isCheckpointPresent = (_cp != null)
+ private[streaming] val isCheckpointPresent: Boolean = _cp != null
private[streaming] val sc: SparkContext = {
if (_sc != null) {
@@ -213,8 +213,8 @@ class StreamingContext private[streaming] (
def sparkContext: SparkContext = sc
/**
- * Set each DStreams in this context to remember RDDs it generated in the last given duration.
- * DStreams remember RDDs only for a limited duration of time and releases them for garbage
+ * Set each DStream in this context to remember RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
@@ -282,13 +282,14 @@ class StreamingContext private[streaming] (
}
/**
- * Create a input stream from TCP source hostname:port. Data is received using
+ * Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @see [[socketStream]]
*/
def socketTextStream(
hostname: String,
@@ -299,7 +300,7 @@ class StreamingContext private[streaming] (
}
/**
- * Create a input stream from TCP source hostname:port. Data is received using
+ * Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interpreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
@@ -860,7 +861,7 @@ private class StreamingContextPythonHelper {
*/
def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
val checkpointOption = CheckpointReader.read(
- checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
+ checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = false)
checkpointOption.map(new StreamingContext(null, _, null))
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index b5f86fe779..995470ec8d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
/**
- * An input stream that always returns the same RDD on each timestep. Useful for testing.
+ * An input stream that always returns the same RDD on each time step. Useful for testing.
*/
class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
extends InputDStream[T](_ssc) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index eb7b64eaf4..c40beeff97 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -83,7 +83,7 @@ abstract class DStream[T: ClassTag] (
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
- private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
+ private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
// Time zero for the DStream
private[streaming] var zeroTime: Time = null
@@ -269,7 +269,7 @@ abstract class DStream[T: ClassTag] (
checkpointDuration == null || rememberDuration > checkpointDuration,
s"The remember duration for ${this.getClass.getSimpleName} has been set to " +
s" $rememberDuration which is not more than the checkpoint interval" +
- s" ($checkpointDuration). Please set it to higher than $checkpointDuration."
+ s" ($checkpointDuration). Please set it to a value higher than $checkpointDuration."
)
dependencies.foreach(_.validateAtStart())
@@ -277,7 +277,7 @@ abstract class DStream[T: ClassTag] (
logInfo(s"Slide time = $slideDuration")
logInfo(s"Storage level = ${storageLevel.description}")
logInfo(s"Checkpoint interval = $checkpointDuration")
- logInfo(s"Remember duration = $rememberDuration")
+ logInfo(s"Remember interval = $rememberDuration")
logInfo(s"Initialized and validated $this")
}
@@ -535,7 +535,7 @@ abstract class DStream[T: ClassTag] (
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(s"${this.getClass().getSimpleName}.readObject used")
ois.defaultReadObject()
- generatedRDDs = new HashMap[Time, RDD[T]] ()
+ generatedRDDs = new HashMap[Time, RDD[T]]()
}
// =======================================================================
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 365a6bc417..431c9dbe2c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils
private[streaming]
-class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
+class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()
@@ -45,7 +45,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
/**
* Updates the checkpoint data of the DStream. This gets called every time
* the graph checkpoint is initiated. Default implementation records the
- * checkpoint files to which the generate RDDs of the DStream has been saved.
+ * checkpoint files at which the generated RDDs of the DStream have been saved.
*/
def update(time: Time) {
@@ -103,7 +103,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
/**
* Restore the checkpoint data. This gets called once when the DStream graph
- * (along with its DStreams) are being restored from a graph checkpoint file.
+ * (along with its output DStreams) is being restored from a graph checkpoint file.
* Default implementation restores the RDDs from their checkpoint files.
*/
def restore() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 0b6b191dbe..dc88349db5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
*
* @param _ssc Streaming context that will execute this input stream
*/
-abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
+abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
extends DStream[T](_ssc) {
private[streaming] var lastValidTime: Time = null
@@ -90,8 +90,8 @@ abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
} else {
// Time is valid, but check it it is more than lastValidTime
if (lastValidTime != null && time < lastValidTime) {
- logWarning("isTimeValid called with " + time + " where as last valid time is " +
- lastValidTime)
+ logWarning(s"isTimeValid called with $time whereas the last valid time " +
+ s"is $lastValidTime")
}
lastValidTime = time
true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index a9be2f213f..a9e93838b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -87,7 +87,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
logDebug("Window time = " + windowDuration)
logDebug("Slide time = " + slideDuration)
- logDebug("ZeroTime = " + zeroTime)
+ logDebug("Zero time = " + zeroTime)
logDebug("Current window = " + currentWindow)
logDebug("Previous window = " + previousWindow)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 68eff89030..0379957e58 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -70,7 +70,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
- computeUsingPreviousRDD (parentRDD, prevStateRDD)
+ computeUsingPreviousRDD(parentRDD, prevStateRDD)
}
case None => { // If parent RDD does not exist
@@ -98,15 +98,15 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
- updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
+ updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
}
- val groupedRDD = parentRDD.groupByKey (partitioner)
- val sessionRDD = groupedRDD.mapPartitions (finalFunc, preservePartitioning)
+ val groupedRDD = parentRDD.groupByKey(partitioner)
+ val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
// logDebug("Generating state RDD for time " + validTime + " (first)")
- Some (sessionRDD)
+ Some(sessionRDD)
}
- case Some (initialStateRDD) => {
+ case Some(initialStateRDD) => {
computeUsingPreviousRDD(parentRDD, initialStateRDD)
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 9c8e68b03d..5d9a8ac0d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -119,7 +119,7 @@ private[streaming] class ReceivedBlockTracker(
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
- logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
+ logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
}
} else {
// This situation occurs when:
@@ -129,7 +129,7 @@ private[streaming] class ReceivedBlockTracker(
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
- logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
+ logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index d7210f64fc..7b2ef6881d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -21,18 +21,20 @@ import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
/**
- * A component that estimates the rate at wich an InputDStream should ingest
- * elements, based on updates at every batch completion.
+ * A component that estimates the rate at which an `InputDStream` should ingest
+ * records, based on updates at every batch completion.
+ *
+ * @see [[org.apache.spark.streaming.scheduler.RateController]]
*/
private[streaming] trait RateEstimator extends Serializable {
/**
- * Computes the number of elements the stream attached to this `RateEstimator`
+ * Computes the number of records the stream attached to this `RateEstimator`
* should ingest per second, given an update on the size and completion
* times of the latest batch.
*
- * @param time The timetamp of the current batch interval that just finished
- * @param elements The number of elements that were processed in this batch
+ * @param time The timestamp of the current batch interval that just finished
+ * @param elements The number of records that were processed in this batch
* @param processingDelay The time in ms that took for the job to complete
* @param schedulingDelay The time in ms that the job spent in the scheduling queue
*/
@@ -46,13 +48,13 @@ private[streaming] trait RateEstimator extends Serializable {
object RateEstimator {
/**
- * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
+ * Return a new `RateEstimator` based on the value of
+ * `spark.streaming.backpressure.rateEstimator`.
*
- * The only known estimator right now is `pid`.
+ * The only known and acceptable estimator right now is `pid`.
*
* @return An instance of RateEstimator
- * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
- * known estimators.
+ * @throws IllegalArgumentException if the configured RateEstimator is not `pid`.
*/
def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
@@ -64,6 +66,6 @@ object RateEstimator {
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
case estimator =>
- throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
+ throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
}
}