From 6069446356d1daf28054b87ff1a3bf724a22df03 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 14 Jan 2013 10:34:13 -0800 Subject: Making comments consistent w/ Spark style --- .../src/main/scala/spark/streaming/DStream.scala | 52 +++---- .../spark/streaming/PairDStreamFunctions.scala | 170 +++++++++------------ .../spark/streaming/api/java/JavaDStream.scala | 14 +- .../spark/streaming/api/java/JavaDStreamLike.scala | 26 ++-- .../spark/streaming/api/java/JavaPairDStream.scala | 166 +++++++++----------- 5 files changed, 196 insertions(+), 232 deletions(-) diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index fbe3cebd6d..036763fe2f 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -98,10 +98,10 @@ abstract class DStream[T: ClassManifest] ( this } - /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) - /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def cache(): DStream[T] = persist() /** @@ -119,7 +119,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * This method initializes the DStream by setting the "zero" time, based on which + * Initialize the DStream by setting the "zero" time, based on which * the validity of future times is calculated. This method also recursively initializes * its parent DStreams. */ @@ -244,7 +244,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Retrieves a precomputed RDD of this DStream, or computes the RDD. This is an internal + * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal * method that should not be called directly. */ protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { @@ -283,7 +283,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Generates a SparkStreaming job for the given time. This is an internal method that + * Generate a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this * (eg. ForEachDStream). @@ -302,7 +302,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Dereferences RDDs that are older than rememberDuration. + * Dereference RDDs that are older than rememberDuration. */ protected[streaming] def forgetOldRDDs(time: Time) { val keys = generatedRDDs.keys @@ -328,7 +328,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Refreshes the list of checkpointed RDDs that will be saved along with checkpoint of + * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of * this stream. This is an internal method that should not be called directly. This is * a default implementation that saves only the file names of the checkpointed RDDs to * checkpointData. Subclasses of DStream (especially those of InputDStream) may override @@ -373,7 +373,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Restores the RDDs in generatedRDDs from the checkpointData. This is an internal method + * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method * that should not be called directly. This is a default implementation that recreates RDDs * from the checkpoint file names stored in checkpointData. Subclasses of DStream that * override the updateCheckpointData() method would also need to override this method. @@ -425,20 +425,20 @@ abstract class DStream[T: ClassManifest] ( // DStream operations // ======================================================================= - /** Returns a new DStream by applying a function to all elements of this DStream. */ + /** Return a new DStream by applying a function to all elements of this DStream. */ def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { new MappedDStream(this, ssc.sc.clean(mapFunc)) } /** - * Returns a new DStream by applying a function to all elements of this DStream, + * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc)) } - /** Returns a new DStream containing only the elements that satisfy a predicate. */ + /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) /** @@ -461,20 +461,20 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream in which each RDD has a single element generated by reducing each RDD + * Return a new DStream in which each RDD has a single element generated by reducing each RDD * of this DStream. */ def reduce(reduceFunc: (T, T) => T): DStream[T] = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) /** - * Returns a new DStream in which each RDD has a single element generated by counting each RDD + * Return a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. */ def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _) /** - * Applies a function to each RDD in this DStream. This is an output operator, so + * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: RDD[T] => Unit) { @@ -482,7 +482,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Applies a function to each RDD in this DStream. This is an output operator, so + * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: (RDD[T], Time) => Unit) { @@ -492,7 +492,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream in which each RDD is generated by applying a function + * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { @@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream in which each RDD is generated by applying a function + * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { @@ -508,7 +508,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Prints the first ten elements of each RDD generated in this DStream. This is an output + * Print the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ def print() { @@ -545,7 +545,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream which computed based on tumbling window on this DStream. + * Return a new DStream which computed based on tumbling window on this DStream. * This is equivalent to window(batchTime, batchTime). * @param batchDuration tumbling window duration; must be a multiple of this DStream's * batching interval @@ -553,7 +553,7 @@ abstract class DStream[T: ClassManifest] ( def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration) /** - * Returns a new DStream in which each RDD has a single element generated by reducing all + * Return a new DStream in which each RDD has a single element generated by reducing all * elements in a window over this DStream. windowDuration and slideDuration are as defined * in the window() operation. This is equivalent to * window(windowDuration, slideDuration).reduce(reduceFunc) @@ -578,7 +578,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream in which each RDD has a single element generated by counting the number + * Return a new DStream in which each RDD has a single element generated by counting the number * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the * window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ @@ -587,20 +587,20 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream by unifying data of another DStream with this DStream. + * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same slideDuration as this DStream. */ def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) /** - * Returns all the RDDs defined by the Interval object (both end times included) + * Return all the RDDs defined by the Interval object (both end times included) */ protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = { slice(interval.beginTime, interval.endTime) } /** - * Returns all the RDDs between 'fromTime' to 'toTime' (both included) + * Return all the RDDs between 'fromTime' to 'toTime' (both included) */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { val rdds = new ArrayBuffer[RDD[T]]() @@ -616,7 +616,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Saves each RDD in this DStream as a Sequence file of serialized objects. + * Save each RDD in this DStream as a Sequence file of serialized objects. * The file name at each batch interval is generated based on `prefix` and * `suffix`: "prefix-TIME_IN_MS.suffix". */ @@ -629,7 +629,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Saves each RDD in this DStream as at text file, using string representation + * Save each RDD in this DStream as at text file, using string representation * of elements. The file name at each batch interval is generated based on * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 3952457339..f63279512b 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -26,29 +26,23 @@ extends Serializable { } /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. Hash partitioning is - * used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. */ def groupByKey(): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner()) } /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. Hash partitioning is - * used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. */ def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner(numPartitions)) } /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]] + * Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]] * is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { @@ -60,30 +54,27 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. */ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { reduceByKey(reduceFunc, defaultPartitioner()) } /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. */ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) } /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * [[spark.Partitioner]] is used to control the partitioning of each RDD. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the + * partitioning of each RDD. */ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) @@ -91,9 +82,9 @@ extends Serializable { } /** - * Generic function to combine elements of each key in DStream's RDDs using custom function. - * This is similar to the combineByKey for RDDs. Please refer to combineByKey in - * [[spark.PairRDDFunctions]] for more information. + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more + * information. */ def combineByKey[C: ClassManifest]( createCombiner: V => C, @@ -104,19 +95,18 @@ extends Serializable { } /** - * Creates a new DStream by counting the number of values of each key in each RDD - * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's - * `numPartitions` partitions. + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions. */ def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = { self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. - * The new DStream generates RDDs with the same interval as this DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ @@ -125,9 +115,9 @@ extends Serializable { } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -139,8 +129,8 @@ extends Serializable { } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -158,8 +148,8 @@ extends Serializable { } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -176,10 +166,10 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * The new DStream generates RDDs with the same interval as this DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -192,9 +182,9 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -211,9 +201,9 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -232,8 +222,8 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. + * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -255,9 +245,8 @@ extends Serializable { } /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -283,9 +272,8 @@ extends Serializable { } /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -313,9 +301,8 @@ extends Serializable { } /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -344,7 +331,7 @@ extends Serializable { } /** - * Creates a new DStream by counting the number of values for each key over a window. + * Create a new DStream by counting the number of values for each key over a window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -369,10 +356,9 @@ extends Serializable { } /** - * Creates a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key from - * `this` DStream. Hash partitioning is used to generate the RDDs with Spark's default - * number of partitions. + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @tparam S State type @@ -384,9 +370,9 @@ extends Serializable { } /** - * Creates a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key from - * `this` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param numPartitions Number of partitions of each RDD in the new DStream. @@ -400,9 +386,9 @@ extends Serializable { } /** - * Creates a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key from - * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD. + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * [[spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. @@ -419,9 +405,9 @@ extends Serializable { } /** - * Creates a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key from - * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD. + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * [[spark.Paxrtitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. Note, that * this function may generate a different a tuple with a different key @@ -451,22 +437,19 @@ extends Serializable { } /** - * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for - * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD - * will contains a tuple with the list of values for that key in both RDDs. - * HashPartitioner is used to partition each generated RDD into default number of partitions. + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * of partitions. */ def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner()) } /** - * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for - * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD - * will contains a tuple with the list of values for that key in both RDDs. - * Partitioner is used to partition each generated RDD. + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. Partitioner is used to partition each generated RDD. */ def cogroup[W: ClassManifest]( other: DStream[(K, W)], @@ -488,8 +471,7 @@ extends Serializable { } /** - * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used + * Join `this` DStream with `other` DStream. HashPartitioner is used * to partition each generated RDD into default number of partitions. */ def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { @@ -497,7 +479,7 @@ extends Serializable { } /** - * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will * be generated by joining RDDs from `this` and other DStream. Uses the given * Partitioner to partition each generated RDD. */ @@ -513,7 +495,7 @@ extends Serializable { } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles[F <: OutputFormat[K, V]]( @@ -524,7 +506,7 @@ extends Serializable { } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles( @@ -543,8 +525,8 @@ extends Serializable { } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( prefix: String, @@ -554,8 +536,8 @@ extends Serializable { } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsNewAPIHadoopFiles( prefix: String, diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index f85864df5d..32faef5670 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -9,20 +9,20 @@ import spark.storage.StorageLevel class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) extends JavaDStreamLike[T, JavaDStream[T]] { - /** Returns a new DStream containing only the elements that satisfy a predicate. */ + /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = dstream.filter((x => f(x).booleanValue())) - /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def cache(): JavaDStream[T] = dstream.cache() - /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def persist(): JavaDStream[T] = dstream.cache() - /** Persists the RDDs of this DStream with the given storage level */ + /** Persist the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel) - /** Method that generates a RDD for the given duration */ + /** Generate an RDD for the given duration */ def compute(validTime: Time): JavaRDD[T] = { dstream.compute(validTime) match { case Some(rdd) => new JavaRDD(rdd) @@ -51,7 +51,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM dstream.window(windowDuration, slideDuration) /** - * Returns a new DStream which computed based on tumbling window on this DStream. + * Return a new DStream which computed based on tumbling window on this DStream. * This is equivalent to window(batchDuration, batchDuration). * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval */ @@ -59,7 +59,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM dstream.tumble(batchDuration) /** - * Returns a new DStream by unifying data of another DStream with this DStream. + * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. */ def union(that: JavaDStream[T]): JavaDStream[T] = diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 4257ecd583..32df665a98 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -22,19 +22,19 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** - * Prints the first ten elements of each RDD generated in this DStream. This is an output + * Print the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ def print() = dstream.print() /** - * Returns a new DStream in which each RDD has a single element generated by counting each RDD + * Return a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. */ def count(): JavaDStream[JLong] = dstream.count() /** - * Returns a new DStream in which each RDD has a single element generated by counting the number + * Return a new DStream in which each RDD has a single element generated by counting the number * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the * window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ @@ -50,15 +50,15 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable def glom(): JavaDStream[JList[T]] = new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) - /** Returns the StreamingContext associated with this DStream */ + /** Return the StreamingContext associated with this DStream */ def context(): StreamingContext = dstream.context() - /** Returns a new DStream by applying a function to all elements of this DStream. */ + /** Return a new DStream by applying a function to all elements of this DStream. */ def map[R](f: JFunction[T, R]): JavaDStream[R] = { new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) } - /** Returns a new DStream by applying a function to all elements of this DStream. */ + /** Return a new DStream by applying a function to all elements of this DStream. */ def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = { def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) @@ -86,13 +86,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** - * Returns a new DStream in which each RDD has a single element generated by reducing each RDD + * Return a new DStream in which each RDD has a single element generated by reducing each RDD * of this DStream. */ def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) /** - * Returns a new DStream in which each RDD has a single element generated by reducing all + * Return a new DStream in which each RDD has a single element generated by reducing all * elements in a window over this DStream. windowDuration and slideDuration are as defined in the * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) */ @@ -106,14 +106,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** - * Returns all the RDDs between 'fromDuration' to 'toDuration' (both included) + * Return all the RDDs between 'fromDuration' to 'toDuration' (both included) */ def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = { new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq) } /** - * Applies a function to each RDD in this DStream. This is an output operator, so + * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) { @@ -121,7 +121,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** - * Applies a function to each RDD in this DStream. This is an output operator, so + * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) { @@ -129,7 +129,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** - * Returns a new DStream in which each RDD is generated by applying a function + * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { @@ -141,7 +141,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable } /** - * Returns a new DStream in which each RDD is generated by applying a function + * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = { diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index c761fdd3bd..16b476ec90 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -86,19 +86,15 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( // ======================================================================= /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. Hash partitioning is - * used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. */ def groupByKey(): JavaPairDStream[K, JList[V]] = dstream.groupByKey().mapValues(seqAsJavaList _) /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. Hash partitioning is - * used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. */ def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) @@ -113,37 +109,34 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. */ def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = dstream.reduceByKey(func) /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. */ def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = dstream.reduceByKey(func, numPartitions) /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * [[spark.Partitioner]] is used to control the partitioning of each RDD. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the + * partitioning of each RDD. */ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { dstream.reduceByKey(func, partitioner) } /** - * Generic function to combine elements of each key in DStream's RDDs using custom function. - * This is similar to the combineByKey for RDDs. Please refer to combineByKey in - * [[spark.PairRDDFunctions]] for more information. + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more + * information. */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], @@ -156,8 +149,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by counting the number of values of each key in each RDD - * of `this` DStream. Hash partitioning is used to generate the RDDs. + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions. */ def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = { JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions)); @@ -165,19 +158,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** - * Creates a new DStream by counting the number of values of each key in each RDD - * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's - * `numPartitions` partitions. + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with the default number of partitions. */ def countByKey(): JavaPairDStream[K, JLong] = { JavaPairDStream.scalaToJavaLong(dstream.countByKey()); } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. - * The new DStream generates RDDs with the same interval as this DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ @@ -186,9 +178,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -201,8 +193,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -218,8 +210,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -237,10 +229,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * The new DStream generates RDDs with the same interval as this DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -251,9 +243,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -270,9 +262,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -291,8 +283,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. + * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -310,11 +302,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner) } - /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -338,9 +328,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -371,9 +360,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -403,7 +391,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by counting the number of values for each key over a window. + * Create a new DStream by counting the number of values for each key over a window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -417,7 +405,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Creates a new DStream by counting the number of values for each key over a window. + * Create a new DStream by counting the number of values for each key over a window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -449,24 +437,19 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for - * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD - * will contains a tuple with the list of values for that key in both RDDs. - * HashPartitioner is used to partition each generated RDD into default number of partitions. + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * of partitions. */ - def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { - implicit val cm: ClassManifest[W] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] - dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner()) } /** - * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for - * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD - * will contains a tuple with the list of values for that key in both RDDs. - * Partitioner is used to partition each generated RDD. + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. Partitioner is used to partition each generated RDD. */ def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) : JavaPairDStream[K, (JList[V], JList[W])] = { @@ -477,8 +460,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used + * Join `this` DStream with `other` DStream. HashPartitioner is used * to partition each generated RDD into default number of partitions. */ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { @@ -488,7 +470,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will * be generated by joining RDDs from `this` and other DStream. Uses the given * Partitioner to partition each generated RDD. */ @@ -500,16 +482,16 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) { dstream.saveAsHadoopFiles(prefix, suffix) } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsHadoopFiles( prefix: String, @@ -521,8 +503,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsHadoopFiles( prefix: String, @@ -535,16 +517,16 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix) } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsNewAPIHadoopFiles( prefix: String, @@ -556,8 +538,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsNewAPIHadoopFiles( prefix: String, -- cgit v1.2.3