aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-14 10:34:13 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 10:42:05 -0800
commit6069446356d1daf28054b87ff1a3bf724a22df03 (patch)
tree758bbc526fc32d785508b236bbc25db53a23fcc7 /streaming
parentd182a57cae6455804773db23d9498d2dcdd02172 (diff)
downloadspark-6069446356d1daf28054b87ff1a3bf724a22df03.tar.gz
spark-6069446356d1daf28054b87ff1a3bf724a22df03.tar.bz2
spark-6069446356d1daf28054b87ff1a3bf724a22df03.zip
Making comments consistent w/ Spark style
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala52
-rw-r--r--streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala170
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala14
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala26
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala166
5 files changed, 196 insertions, 232 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index fbe3cebd6d..036763fe2f 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -98,10 +98,10 @@ abstract class DStream[T: ClassManifest] (
this
}
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()
/**
@@ -119,7 +119,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * This method initializes the DStream by setting the "zero" time, based on which
+ * Initialize the DStream by setting the "zero" time, based on which
* the validity of future times is calculated. This method also recursively initializes
* its parent DStreams.
*/
@@ -244,7 +244,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Retrieves a precomputed RDD of this DStream, or computes the RDD. This is an internal
+ * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
* method that should not be called directly.
*/
protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
@@ -283,7 +283,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Generates a SparkStreaming job for the given time. This is an internal method that
+ * Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
* (eg. ForEachDStream).
@@ -302,7 +302,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Dereferences RDDs that are older than rememberDuration.
+ * Dereference RDDs that are older than rememberDuration.
*/
protected[streaming] def forgetOldRDDs(time: Time) {
val keys = generatedRDDs.keys
@@ -328,7 +328,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Refreshes the list of checkpointed RDDs that will be saved along with checkpoint of
+ * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
* a default implementation that saves only the file names of the checkpointed RDDs to
* checkpointData. Subclasses of DStream (especially those of InputDStream) may override
@@ -373,7 +373,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Restores the RDDs in generatedRDDs from the checkpointData. This is an internal method
+ * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
* that should not be called directly. This is a default implementation that recreates RDDs
* from the checkpoint file names stored in checkpointData. Subclasses of DStream that
* override the updateCheckpointData() method would also need to override this method.
@@ -425,20 +425,20 @@ abstract class DStream[T: ClassManifest] (
// DStream operations
// =======================================================================
- /** Returns a new DStream by applying a function to all elements of this DStream. */
+ /** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, ssc.sc.clean(mapFunc))
}
/**
- * Returns a new DStream by applying a function to all elements of this DStream,
+ * Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
}
- /** Returns a new DStream containing only the elements that satisfy a predicate. */
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
/**
@@ -461,20 +461,20 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+ * Return a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
def reduce(reduceFunc: (T, T) => T): DStream[T] =
this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
/**
- * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+ * Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
/**
- * Applies a function to each RDD in this DStream. This is an output operator, so
+ * Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: RDD[T] => Unit) {
@@ -482,7 +482,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Applies a function to each RDD in this DStream. This is an output operator, so
+ * Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
@@ -492,7 +492,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream in which each RDD is generated by applying a function
+ * Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
@@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream in which each RDD is generated by applying a function
+ * Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
@@ -508,7 +508,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Prints the first ten elements of each RDD generated in this DStream. This is an output
+ * Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print() {
@@ -545,7 +545,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream which computed based on tumbling window on this DStream.
+ * Return a new DStream which computed based on tumbling window on this DStream.
* This is equivalent to window(batchTime, batchTime).
* @param batchDuration tumbling window duration; must be a multiple of this DStream's
* batching interval
@@ -553,7 +553,7 @@ abstract class DStream[T: ClassManifest] (
def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
/**
- * Returns a new DStream in which each RDD has a single element generated by reducing all
+ * Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a window over this DStream. windowDuration and slideDuration are as defined
* in the window() operation. This is equivalent to
* window(windowDuration, slideDuration).reduce(reduceFunc)
@@ -578,7 +578,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream in which each RDD has a single element generated by counting the number
+ * Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
@@ -587,20 +587,20 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream by unifying data of another DStream with this DStream.
+ * Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
/**
- * Returns all the RDDs defined by the Interval object (both end times included)
+ * Return all the RDDs defined by the Interval object (both end times included)
*/
protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = {
slice(interval.beginTime, interval.endTime)
}
/**
- * Returns all the RDDs between 'fromTime' to 'toTime' (both included)
+ * Return all the RDDs between 'fromTime' to 'toTime' (both included)
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
@@ -616,7 +616,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Saves each RDD in this DStream as a Sequence file of serialized objects.
+ * Save each RDD in this DStream as a Sequence file of serialized objects.
* The file name at each batch interval is generated based on `prefix` and
* `suffix`: "prefix-TIME_IN_MS.suffix".
*/
@@ -629,7 +629,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Saves each RDD in this DStream as at text file, using string representation
+ * Save each RDD in this DStream as at text file, using string representation
* of elements. The file name at each batch interval is generated based on
* `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 3952457339..f63279512b 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -26,29 +26,23 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. Hash partitioning is
- * used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner())
}
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. Hash partitioning is
- * used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
+ * Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
@@ -60,30 +54,27 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
*/
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner())
}
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
*/
def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
*/
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@@ -91,9 +82,9 @@ extends Serializable {
}
/**
- * Generic function to combine elements of each key in DStream's RDDs using custom function.
- * This is similar to the combineByKey for RDDs. Please refer to combineByKey in
- * [[spark.PairRDDFunctions]] for more information.
+ * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
+ * information.
*/
def combineByKey[C: ClassManifest](
createCombiner: V => C,
@@ -104,19 +95,18 @@ extends Serializable {
}
/**
- * Creates a new DStream by counting the number of values of each key in each RDD
- * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's
- * `numPartitions` partitions.
+ * Create a new DStream by counting the number of values of each key in each RDD. Hash
+ * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
*/
def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
- * The new DStream generates RDDs with the same interval as this DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
+ * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
@@ -125,9 +115,9 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -139,8 +129,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -158,8 +148,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -176,10 +166,10 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * The new DStream generates RDDs with the same interval as this DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
+ * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -192,9 +182,9 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -211,9 +201,9 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -232,8 +222,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -255,9 +245,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a using incremental computation.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
@@ -283,9 +272,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a using incremental computation.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
@@ -313,9 +301,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a using incremental computation.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
@@ -344,7 +331,7 @@ extends Serializable {
}
/**
- * Creates a new DStream by counting the number of values for each key over a window.
+ * Create a new DStream by counting the number of values for each key over a window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -369,10 +356,9 @@ extends Serializable {
}
/**
- * Creates a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of the key from
- * `this` DStream. Hash partitioning is used to generate the RDDs with Spark's default
- * number of partitions.
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @tparam S State type
@@ -384,9 +370,9 @@ extends Serializable {
}
/**
- * Creates a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of the key from
- * `this` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param numPartitions Number of partitions of each RDD in the new DStream.
@@ -400,9 +386,9 @@ extends Serializable {
}
/**
- * Creates a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of the key from
- * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key.
+ * [[spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
@@ -419,9 +405,9 @@ extends Serializable {
}
/**
- * Creates a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of the key from
- * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * [[spark.Paxrtitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated. Note, that
* this function may generate a different a tuple with a different key
@@ -451,22 +437,19 @@ extends Serializable {
}
/**
- * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
- * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
- * will contains a tuple with the list of values for that key in both RDDs.
- * HashPartitioner is used to partition each generated RDD into default number of partitions.
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
+ * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * of partitions.
*/
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
/**
- * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
- * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
- * will contains a tuple with the list of values for that key in both RDDs.
- * Partitioner is used to partition each generated RDD.
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
+ * key in both RDDs. Partitioner is used to partition each generated RDD.
*/
def cogroup[W: ClassManifest](
other: DStream[(K, W)],
@@ -488,8 +471,7 @@ extends Serializable {
}
/**
- * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used
+ * Join `this` DStream with `other` DStream. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
@@ -497,7 +479,7 @@ extends Serializable {
}
/**
- * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
* be generated by joining RDDs from `this` and other DStream. Uses the given
* Partitioner to partition each generated RDD.
*/
@@ -513,7 +495,7 @@ extends Serializable {
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
@@ -524,7 +506,7 @@ extends Serializable {
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles(
@@ -543,8 +525,8 @@ extends Serializable {
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
@@ -554,8 +536,8 @@ extends Serializable {
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index f85864df5d..32faef5670 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -9,20 +9,20 @@ import spark.storage.StorageLevel
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
extends JavaDStreamLike[T, JavaDStream[T]] {
- /** Returns a new DStream containing only the elements that satisfy a predicate. */
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
dstream.filter((x => f(x).booleanValue()))
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaDStream[T] = dstream.cache()
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): JavaDStream[T] = dstream.cache()
- /** Persists the RDDs of this DStream with the given storage level */
+ /** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
- /** Method that generates a RDD for the given duration */
+ /** Generate an RDD for the given duration */
def compute(validTime: Time): JavaRDD[T] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaRDD(rdd)
@@ -51,7 +51,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
dstream.window(windowDuration, slideDuration)
/**
- * Returns a new DStream which computed based on tumbling window on this DStream.
+ * Return a new DStream which computed based on tumbling window on this DStream.
* This is equivalent to window(batchDuration, batchDuration).
* @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
*/
@@ -59,7 +59,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
dstream.tumble(batchDuration)
/**
- * Returns a new DStream by unifying data of another DStream with this DStream.
+ * Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaDStream[T]): JavaDStream[T] =
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 4257ecd583..32df665a98 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -22,19 +22,19 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
- * Prints the first ten elements of each RDD generated in this DStream. This is an output
+ * Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print() = dstream.print()
/**
- * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+ * Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
def count(): JavaDStream[JLong] = dstream.count()
/**
- * Returns a new DStream in which each RDD has a single element generated by counting the number
+ * Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
@@ -50,15 +50,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def glom(): JavaDStream[JList[T]] =
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
- /** Returns the StreamingContext associated with this DStream */
+ /** Return the StreamingContext associated with this DStream */
def context(): StreamingContext = dstream.context()
- /** Returns a new DStream by applying a function to all elements of this DStream. */
+ /** Return a new DStream by applying a function to all elements of this DStream. */
def map[R](f: JFunction[T, R]): JavaDStream[R] = {
new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
}
- /** Returns a new DStream by applying a function to all elements of this DStream. */
+ /** Return a new DStream by applying a function to all elements of this DStream. */
def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
@@ -86,13 +86,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
- * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+ * Return a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
/**
- * Returns a new DStream in which each RDD has a single element generated by reducing all
+ * Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
*/
@@ -106,14 +106,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
- * Returns all the RDDs between 'fromDuration' to 'toDuration' (both included)
+ * Return all the RDDs between 'fromDuration' to 'toDuration' (both included)
*/
def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = {
new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq)
}
/**
- * Applies a function to each RDD in this DStream. This is an output operator, so
+ * Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
@@ -121,7 +121,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
- * Applies a function to each RDD in this DStream. This is an output operator, so
+ * Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
@@ -129,7 +129,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
- * Returns a new DStream in which each RDD is generated by applying a function
+ * Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
@@ -141,7 +141,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
}
/**
- * Returns a new DStream in which each RDD is generated by applying a function
+ * Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index c761fdd3bd..16b476ec90 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -86,19 +86,15 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
// =======================================================================
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. Hash partitioning is
- * used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. Hash partitioning is
- * used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
@@ -113,37 +109,34 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
*/
def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
dstream.reduceByKey(func)
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
*/
def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
dstream.reduceByKey(func, numPartitions)
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
*/
def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
dstream.reduceByKey(func, partitioner)
}
/**
- * Generic function to combine elements of each key in DStream's RDDs using custom function.
- * This is similar to the combineByKey for RDDs. Please refer to combineByKey in
- * [[spark.PairRDDFunctions]] for more information.
+ * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
+ * information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
@@ -156,8 +149,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by counting the number of values of each key in each RDD
- * of `this` DStream. Hash partitioning is used to generate the RDDs.
+ * Create a new DStream by counting the number of values of each key in each RDD. Hash
+ * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions.
*/
def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions));
@@ -165,19 +158,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
- * Creates a new DStream by counting the number of values of each key in each RDD
- * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's
- * `numPartitions` partitions.
+ * Create a new DStream by counting the number of values of each key in each RDD. Hash
+ * partitioning is used to generate the RDDs with the default number of partitions.
*/
def countByKey(): JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKey());
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
- * The new DStream generates RDDs with the same interval as this DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
+ * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
@@ -186,9 +178,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -201,8 +193,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -218,8 +210,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -237,10 +229,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * The new DStream generates RDDs with the same interval as this DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
+ * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -251,9 +243,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -270,9 +262,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -291,8 +283,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -310,11 +302,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
}
-
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a using incremental computation.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
@@ -338,9 +328,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a using incremental computation.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
@@ -371,9 +360,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a using incremental computation.
+ * The reduced value of over a new window is calculated using the old window's reduce value :
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
@@ -403,7 +391,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by counting the number of values for each key over a window.
+ * Create a new DStream by counting the number of values for each key over a window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -417,7 +405,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Creates a new DStream by counting the number of values for each key over a window.
+ * Create a new DStream by counting the number of values for each key over a window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -449,24 +437,19 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
- * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
- * will contains a tuple with the list of values for that key in both RDDs.
- * HashPartitioner is used to partition each generated RDD into default number of partitions.
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
+ * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
+ * of partitions.
*/
- def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
- implicit val cm: ClassManifest[W] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
- dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+ cogroup(other, defaultPartitioner())
}
/**
- * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
- * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
- * will contains a tuple with the list of values for that key in both RDDs.
- * Partitioner is used to partition each generated RDD.
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
+ * key in both RDDs. Partitioner is used to partition each generated RDD.
*/
def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (JList[V], JList[W])] = {
@@ -477,8 +460,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used
+ * Join `this` DStream with `other` DStream. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
@@ -488,7 +470,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
* be generated by joining RDDs from `this` and other DStream. Uses the given
* Partitioner to partition each generated RDD.
*/
@@ -500,16 +482,16 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
dstream.saveAsHadoopFiles(prefix, suffix)
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsHadoopFiles(
prefix: String,
@@ -521,8 +503,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsHadoopFiles(
prefix: String,
@@ -535,16 +517,16 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,
@@ -556,8 +538,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,