author    Jacek Laskowski <jacek@japila.pl>  2016-04-02 08:12:04 -0700
committer Sean Owen <sowen@cloudera.com>     2016-04-02 08:12:04 -0700
commit    06694f1c68cb752ea311144f0dbe50e92e1393cf (patch)
tree      3e4de2ebd92f4909b3245f6686a400aefe56eae1 /streaming
parent    67d753516da9b6318cd4001bb7ae91703aaf098d (diff)
[MINOR] Typo fixes
## What changes were proposed in this pull request?

Typo fixes. No functional changes.

## How was this patch tested?

Built the sources and ran with samples.

Author: Jacek Laskowski <jacek@japila.pl>

Closes #11802 from jaceklaskowski/typo-fixes.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala            | 13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala              |  8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala |  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala         |  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala         | 12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala |  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala | 22
9 files changed, 39 insertions(+), 36 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 3a664c4f5c..c1e151d08b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -132,7 +132,7 @@ class StreamingContext private[streaming] (
"both SparkContext and checkpoint as null")
}
- private[streaming] val isCheckpointPresent = (_cp != null)
+ private[streaming] val isCheckpointPresent: Boolean = _cp != null
private[streaming] val sc: SparkContext = {
if (_sc != null) {
@@ -213,8 +213,8 @@ class StreamingContext private[streaming] (
def sparkContext: SparkContext = sc
/**
- * Set each DStreams in this context to remember RDDs it generated in the last given duration.
- * DStreams remember RDDs only for a limited duration of time and releases them for garbage
+ * Set each DStream in this context to remember RDDs it generated in the last given duration.
+ * DStreams remember RDDs only for a limited duration of time and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
@@ -282,13 +282,14 @@ class StreamingContext private[streaming] (
}
/**
- * Create a input stream from TCP source hostname:port. Data is received using
+ * Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @see [[socketStream]]
*/
def socketTextStream(
hostname: String,
@@ -299,7 +300,7 @@ class StreamingContext private[streaming] (
}
/**
- * Create a input stream from TCP source hostname:port. Data is received using
+ * Creates an input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interpreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
@@ -860,7 +861,7 @@ private class StreamingContextPythonHelper {
*/
def tryRecoverFromCheckpoint(checkpointPath: String): Option[StreamingContext] = {
val checkpointOption = CheckpointReader.read(
- checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, false)
+ checkpointPath, new SparkConf(), SparkHadoopUtil.get.conf, ignoreReadError = false)
checkpointOption.map(new StreamingContext(null, _, null))
}
}
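[Editor's note] For context, a minimal sketch (not part of this patch) of the `socketTextStream` API whose scaladoc is corrected above, together with `remember`; the app name, master, host, and port are illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setAppName("SocketTextStreamSketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Receive UTF-8 encoded, `\n`-delimited lines over a TCP socket,
// as the fixed scaladoc describes; host and port are illustrative.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
lines.print()

// Ask every DStream in this context to remember its RDDs for 5 minutes,
// e.g. to query old data outside the DStream computation.
ssc.remember(Minutes(5))

ssc.start()
ssc.awaitTermination()
```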
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
index b5f86fe779..995470ec8d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ConstantInputDStream.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
/**
- * An input stream that always returns the same RDD on each timestep. Useful for testing.
+ * An input stream that always returns the same RDD on each time step. Useful for testing.
*/
class ConstantInputDStream[T: ClassTag](_ssc: StreamingContext, rdd: RDD[T])
extends InputDStream[T](_ssc) {
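[Editor's note] A brief usage sketch, not from the patch: `ConstantInputDStream` is handy in tests precisely because it yields the same RDD at every time step. Setup values are illustrative.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

val conf = new SparkConf().setAppName("ConstantStreamSketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// The same ten-element RDD is returned at every time step.
val rdd = ssc.sparkContext.parallelize(1 to 10)
val constantStream = new ConstantInputDStream(ssc, rdd)
constantStream.print()
```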
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index eb7b64eaf4..c40beeff97 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -83,7 +83,7 @@ abstract class DStream[T: ClassTag] (
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
- private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
+ private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
// Time zero for the DStream
private[streaming] var zeroTime: Time = null
@@ -269,7 +269,7 @@ abstract class DStream[T: ClassTag] (
checkpointDuration == null || rememberDuration > checkpointDuration,
s"The remember duration for ${this.getClass.getSimpleName} has been set to " +
s" $rememberDuration which is not more than the checkpoint interval" +
- s" ($checkpointDuration). Please set it to higher than $checkpointDuration."
+ s" ($checkpointDuration). Please set it to a value higher than $checkpointDuration."
)
dependencies.foreach(_.validateAtStart())
@@ -277,7 +277,7 @@ abstract class DStream[T: ClassTag] (
logInfo(s"Slide time = $slideDuration")
logInfo(s"Storage level = ${storageLevel.description}")
logInfo(s"Checkpoint interval = $checkpointDuration")
- logInfo(s"Remember duration = $rememberDuration")
+ logInfo(s"Remember interval = $rememberDuration")
logInfo(s"Initialized and validated $this")
}
@@ -535,7 +535,7 @@ abstract class DStream[T: ClassTag] (
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(s"${this.getClass().getSimpleName}.readObject used")
ois.defaultReadObject()
- generatedRDDs = new HashMap[Time, RDD[T]] ()
+ generatedRDDs = new HashMap[Time, RDD[T]]()
}
// =======================================================================
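[Editor's note] The validation message reworded above enforces that a DStream's remember duration exceeds its checkpoint interval. A minimal sketch of a configuration that satisfies it, reusing the `ssc` and `lines` from the first sketch; the durations and path are illustrative:

```scala
import org.apache.spark.streaming.{Minutes, Seconds}

ssc.checkpoint("/tmp/checkpoints") // checkpoint directory (illustrative path)
lines.checkpoint(Seconds(30))      // checkpoint interval for this stream
ssc.remember(Minutes(2))           // remember duration, higher than 30 seconds
```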
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 365a6bc417..431c9dbe2c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils
private[streaming]
-class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
+class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()
@@ -45,7 +45,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
/**
* Updates the checkpoint data of the DStream. This gets called every time
* the graph checkpoint is initiated. Default implementation records the
- * checkpoint files to which the generate RDDs of the DStream has been saved.
+ * checkpoint files at which the generated RDDs of the DStream have been saved.
*/
def update(time: Time) {
@@ -103,7 +103,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
/**
* Restore the checkpoint data. This gets called once when the DStream graph
- * (along with its DStreams) are being restored from a graph checkpoint file.
+ * (along with its output DStreams) is being restored from a graph checkpoint file.
* Default implementation restores the RDDs from their checkpoint files.
*/
def restore() {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index 0b6b191dbe..dc88349db5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -39,7 +39,7 @@ import org.apache.spark.util.Utils
*
* @param _ssc Streaming context that will execute this input stream
*/
-abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
+abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
extends DStream[T](_ssc) {
private[streaming] var lastValidTime: Time = null
@@ -90,8 +90,8 @@ abstract class InputDStream[T: ClassTag] (_ssc: StreamingContext)
} else {
// Time is valid, but check it it is more than lastValidTime
if (lastValidTime != null && time < lastValidTime) {
- logWarning("isTimeValid called with " + time + " where as last valid time is " +
- lastValidTime)
+ logWarning(s"isTimeValid called with $time whereas the last valid time " +
+ s"is $lastValidTime")
}
lastValidTime = time
true
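[Editor's note] For illustration only, a hypothetical no-op `InputDStream` subclass showing the contract the class above defines: `start`/`stop` manage the source's lifecycle and `compute` produces at most one RDD per batch.

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical input stream, not part of this patch: it satisfies the
// InputDStream contract by emitting an empty RDD at every batch interval.
class EmptyInputDStream[T: ClassTag](_ssc: StreamingContext)
  extends InputDStream[T](_ssc) {

  override def start(): Unit = {} // no receiver or resource to start
  override def stop(): Unit = {}  // nothing to clean up
  override def compute(validTime: Time): Option[RDD[T]] =
    Some(_ssc.sparkContext.emptyRDD[T]) // an empty RDD per batch
}
```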
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index a9be2f213f..a9e93838b8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -87,7 +87,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
logDebug("Window time = " + windowDuration)
logDebug("Slide time = " + slideDuration)
- logDebug("ZeroTime = " + zeroTime)
+ logDebug("Zero time = " + zeroTime)
logDebug("Current window = " + currentWindow)
logDebug("Previous window = " + previousWindow)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 68eff89030..0379957e58 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -70,7 +70,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// Try to get the parent RDD
parent.getOrCompute(validTime) match {
case Some(parentRDD) => { // If parent RDD exists, then compute as usual
- computeUsingPreviousRDD (parentRDD, prevStateRDD)
+ computeUsingPreviousRDD(parentRDD, prevStateRDD)
}
case None => { // If parent RDD does not exist
@@ -98,15 +98,15 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
- updateFuncLocal (iterator.map (tuple => (tuple._1, tuple._2.toSeq, None)))
+ updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
}
- val groupedRDD = parentRDD.groupByKey (partitioner)
- val sessionRDD = groupedRDD.mapPartitions (finalFunc, preservePartitioning)
+ val groupedRDD = parentRDD.groupByKey(partitioner)
+ val sessionRDD = groupedRDD.mapPartitions(finalFunc, preservePartitioning)
// logDebug("Generating state RDD for time " + validTime + " (first)")
- Some (sessionRDD)
+ Some(sessionRDD)
}
- case Some (initialStateRDD) => {
+ case Some(initialStateRDD) => {
computeUsingPreviousRDD(parentRDD, initialStateRDD)
}
}
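[Editor's note] `StateDStream` is the engine behind the public `updateStateByKey` API; for context, a small sketch, again assuming an illustrative `pairs: DStream[(String, Int)]` and a configured checkpoint directory:

```scala
// Keep a running count per key across batches.
val runningCounts = pairs.updateStateByKey[Int] {
  (newValues: Seq[Int], state: Option[Int]) =>
    Some(newValues.sum + state.getOrElse(0))
}
```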
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 9c8e68b03d..5d9a8ac0d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -119,7 +119,7 @@ private[streaming] class ReceivedBlockTracker(
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
- logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
+ logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
}
} else {
// This situation occurs when:
@@ -129,7 +129,7 @@ private[streaming] class ReceivedBlockTracker(
// 2. Slow checkpointing makes recovered batch time older than WAL recovered
// lastAllocatedBatchTime.
// This situation will only occurs in recovery time.
- logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
+ logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
index d7210f64fc..7b2ef6881d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -21,18 +21,20 @@ import org.apache.spark.SparkConf
import org.apache.spark.streaming.Duration
/**
- * A component that estimates the rate at wich an InputDStream should ingest
- * elements, based on updates at every batch completion.
+ * A component that estimates the rate at which an `InputDStream` should ingest
+ * records, based on updates at every batch completion.
+ *
+ * @see [[org.apache.spark.streaming.scheduler.RateController]]
*/
private[streaming] trait RateEstimator extends Serializable {
/**
- * Computes the number of elements the stream attached to this `RateEstimator`
+ * Computes the number of records the stream attached to this `RateEstimator`
* should ingest per second, given an update on the size and completion
* times of the latest batch.
*
- * @param time The timetamp of the current batch interval that just finished
- * @param elements The number of elements that were processed in this batch
+ * @param time The timestamp of the current batch interval that just finished
+ * @param elements The number of records that were processed in this batch
* @param processingDelay The time in ms that took for the job to complete
* @param schedulingDelay The time in ms that the job spent in the scheduling queue
*/
@@ -46,13 +48,13 @@ private[streaming] trait RateEstimator extends Serializable {
object RateEstimator {
/**
- * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
+ * Return a new `RateEstimator` based on the value of
+ * `spark.streaming.backpressure.rateEstimator`.
*
- * The only known estimator right now is `pid`.
+ * The only known and acceptable estimator right now is `pid`.
*
* @return An instance of RateEstimator
- * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
- * known estimators.
+ * @throws IllegalArgumentException if the configured RateEstimator is not `pid`.
*/
def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
@@ -64,6 +66,6 @@ object RateEstimator {
new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
case estimator =>
- throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
+ throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
}
}
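[Editor's note] Finally, a sketch of the configuration the corrected scaladoc refers to. Per the code above, `pid` is both the default and the only accepted estimator; the backpressure toggle shown here is the standard companion setting.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Enable backpressure so the rate estimator is actually consulted.
  .set("spark.streaming.backpressure.enabled", "true")
  // Explicitly pick the PID estimator; "pid" is also the default.
  .set("spark.streaming.backpressure.rateEstimator", "pid")
```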