-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala  6
-rw-r--r--  docs/streaming-programming-guide.md  45
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala  35
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala  293
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala  2
5 files changed, 331 insertions, 50 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 07ae2d647c..d95b66ad78 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -199,9 +199,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
- * Merge the values for each key using an associative reduce function. This will also perform
- * the merging locally on each mapper before sending results to a reducer, similarly to a
- * "combiner" in MapReduce.
+ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+ * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+ * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues {
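To make the revised `join` scaladoc concrete, here is a minimal illustrative sketch of the documented semantics (not part of the commit). It assumes an existing SparkContext named `sc`, for example in the Spark shell, and purely made-up sample data.

```scala
import spark.SparkContext._  // pair-RDD operations such as join come in via these implicits

// Two small pair RDDs with one matching key.
val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
val right = sc.parallelize(Seq(("a", "x"), ("a", "y")))

// Pairs with matching keys become (k, (v1, v2)) tuples: ("a", (1, "x")) and
// ("a", (1, "y")); "b" has no match in `right` and is dropped.
left.join(right).collect().foreach(println)
```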
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 05a88ce7bd..b6da7af654 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -43,7 +43,7 @@ A complete list of input sources is available in the [StreamingContext API docum
# DStream Operations
-Once an input stream has been created, you can transform it using _stream operators_. Most of these operators return new DStreams which you can further transform. Eventually, you'll need to call an _output operator_, which forces evaluation of the stream by writing data out to an external source.
+Once an input DStream has been created, you can transform it using _DStream operators_. Most of these operators return new DStreams which you can further transform. Eventually, you'll need to call an _output operator_, which forces evaluation of the DStream by writing data out to an external source.
## Transformations
@@ -53,11 +53,11 @@ DStreams support many of the transformations available on normal Spark RDD's:
<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
<tr>
<td> <b>map</b>(<i>func</i>) </td>
- <td> Return a new stream formed by passing each element of the source through a function <i>func</i>. </td>
+ <td> Returns a new DStream formed by passing each element of the source through a function <i>func</i>. </td>
</tr>
<tr>
<td> <b>filter</b>(<i>func</i>) </td>
- <td> Return a new stream formed by selecting those elements of the source on which <i>func</i> returns true. </td>
+ <td> Returns a new DStream formed by selecting those elements of the source on which <i>func</i> returns true. </td>
</tr>
<tr>
<td> <b>flatMap</b>(<i>func</i>) </td>
@@ -88,55 +88,60 @@ DStreams support many of the transformations available on normal Spark RDD's:
</tr>
<tr>
<td> <b>cogroup</b>(<i>otherStream</i>, [<i>numTasks</i>]) </td>
- <td> When called on streams of type (K, V) and (K, W), returns a stream of (K, Seq[V], Seq[W]) tuples. This operation is also called <code>groupWith</code>. </td>
+ <td> When called on DStreams of type (K, V) and (K, W), returns a DStream of (K, Seq[V], Seq[W]) tuples.</td>
</tr>
<tr>
<td> <b>reduce</b>(<i>func</i>) </td>
- <td> Create a new single-element stream by aggregating the elements of the stream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed correctly in parallel. </td>
+ <td> Returns a new DStream of single-element RDDs by aggregating the elements of the stream using a function <i>func</i> (which takes two arguments and returns one). The function should be associative so that it can be computed correctly in parallel. </td>
+</tr>
+<tr>
+ <td> <b>transform</b>(<i>func</i>) </td>
+ <td> Returns a new DStream by applying <i>func</i> (an RDD-to-RDD function) to every RDD of the stream. This can be used to do arbitrary RDD operations on the DStream. </td>
</tr>
</table>
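The `transform` row added above is easiest to see with a short example. This is only a sketch: it assumes an input DStream named `lines` has already been created from a StreamingContext, and the filtering logic is arbitrary.

```scala
// Apply an arbitrary RDD-to-RDD function to every RDD of the stream.
val nonEmptyLines = lines.transform { rdd =>
  // Any RDD operation is allowed here, not just the ones exposed on DStream.
  rdd.filter(line => !line.isEmpty)
}
```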
-Spark Streaming features windowed computations, which allow you to report statistics over a sliding window of data. All window functions take a <i>windowTime</i>, which represents the width of the window and a <i>slideTime</i>, which represents the frequency during which the window is calculated.
+Spark Streaming features windowed computations, which allow you to report statistics over a sliding window of data. All window functions take a <i>windowDuration</i>, which represents the width of the window, and a <i>slideDuration</i>, which represents the frequency at which the window is calculated.
<table class="table">
<tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
<tr>
- <td> <b>window</b>(<i>windowTime</i>, </i>slideTime</i>) </td>
- <td> Return a new stream which is computed based on windowed batches of the source stream. <i>windowTime</i> is the width of the window and <i>slideTime</i> is the frequency during which the window is calculated. Both times must be multiples of the batch interval.
+ <td> <b>window</b>(<i>windowDuration</i>, <i>slideDuration</i>) </td>
+ <td> Returns a new DStream which is computed based on windowed batches of the source DStream. <i>windowDuration</i> is the width of the window and <i>slideDuration</i> is the frequency at which the window is calculated. Both must be multiples of the batch interval.
</td>
</tr>
<tr>
- <td> <b>countByWindow</b>(<i>windowTime</i>, </i>slideTime</i>) </td>
- <td> Return a sliding count of elements in the stream. <i>windowTime</i> and <i>slideTime</i> are exactly as defined in <code>window()</code>.
+ <td> <b>countByWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>) </td>
+ <td> Returns a sliding count of elements in the stream. <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
<tr>
- <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowTime</i>, </i>slideTime</i>) </td>
- <td> Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using <i>func</i>. The function should be associative so that it can be computed correctly in parallel. <i>windowTime</i> and <i>slideTime</i> are exactly as defined in <code>window()</code>.
+ <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowDuration</i>, <i>slideDuration</i>) </td>
+ <td> Returns a new DStream of single-element RDDs, created by aggregating elements in the stream over a sliding window using <i>func</i>. The function should be associative so that it can be computed correctly in parallel. <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
<tr>
- <td> <b>groupByKeyAndWindow</b>(windowTime, slideTime, [<i>numTasks</i>])
+ <td> <b>groupByKeyAndWindow</b>(<i>windowDuration</i>, <i>slideDuration</i>, [<i>numTasks</i>])
</td>
<td> When called on a stream of (K, V) pairs, returns a stream of (K, Seq[V]) pairs over a sliding window. <br />
-<b>Note:</b> By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks. <i>windowTime</i> and <i>slideTime</i> are exactly as defined in <code>window()</code>.
+<b>Note:</b> By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks. <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
<tr>
<td> <b>reduceByKeyAndWindow</b>(<i>func</i>, [<i>numTasks</i>]) </td>
<td> When called on a stream of (K, V) pairs, returns a stream of (K, V) pairs where the values for each key are aggregated using the given reduce function over batches within a sliding window. Like in <code>groupByKeyAndWindow</code>, the number of reduce tasks is configurable through an optional second argument.
- <i>windowTime</i> and <i>slideTime</i> are exactly as defined in <code>window()</code>.
+ <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
<tr>
<td> <b>countByKeyAndWindow</b>([<i>numTasks</i>]) </td>
<td> When called on a stream of (K, V) pairs, returns a stream of (K, Int) pairs where the values for each key are the count within a sliding window. Like in <code>groupByKeyAndWindow</code>, the number of reduce tasks is configurable through an optional second argument.
- <i>windowTime</i> and <i>slideTime</i> are exactly as defined in <code>window()</code>.
+ <i>windowDuration</i> and <i>slideDuration</i> are exactly as defined in <code>window()</code>.
</td>
</tr>
</table>
+A complete list of DStream operations is available in the API documentation of [DStream](api/streaming/index.html#spark.streaming.DStream) and [PairDStreamFunctions](api/streaming/index.html#spark.streaming.PairDStreamFunctions).
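As an illustration of the windowed transformations above, the following sketch counts words over a sliding window. It assumes (these are assumptions, not part of the commit) a DStream of (word, 1) pairs named `pairs`, the `Seconds` duration helper, and the implicits from `StreamingContext` that provide the pair operations.

```scala
import spark.streaming.Seconds
import spark.streaming.StreamingContext._  // brings pair-DStream operations into scope

// Count word occurrences over the last 30 seconds of data, every 10 seconds.
val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))

// Equivalent formulation: window the stream first, then reduce each windowed RDD.
val windowedCounts2 = pairs.window(Seconds(30), Seconds(10)).reduceByKey(_ + _)
```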
## Output Operations
When an output operator is called, it triggers the computation of a stream. Currently the following output operators are defined:
@@ -144,7 +149,7 @@ When an output operator is called, it triggers the computation of a stream. Curr
<table class="table">
<tr><th style="width:25%">Operator</th><th>Meaning</th></tr>
<tr>
- <td> <b>foreachRDD</b>(<i>func</i>) </td>
+ <td> <b>foreach</b>(<i>func</i>) </td>
<td> The fundamental output operator. Applies a function, <i>func</i>, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. </td>
</tr>
@@ -155,18 +160,18 @@ When an output operator is called, it triggers the computation of a stream. Curr
<tr>
<td> <b>saveAsObjectFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
- <td> Save this DStream's contents as a <code>SequenceFile</code> of serialized objects. The file name at each batch interval is calculated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>.
+ <td> Save this DStream's contents as a <code>SequenceFile</code> of serialized objects. The file name at each batch interval is generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>.
</td>
</tr>
<tr>
<td> <b>saveAsTextFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
- <td> Save this DStream's contents as a text files. The file name at each batch interval is calculated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
+ <td> Save this DStream's contents as text files. The file name at each batch interval is generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
</tr>
<tr>
<td> <b>saveAsHadoopFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
- <td> Save this DStream's contents as a Hadoop file. The file name at each batch interval is calculated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
+ <td> Save this DStream's contents as a Hadoop file. The file name at each batch interval is generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
</tr>
</table>
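For reference, a small sketch of how these output operators are typically used, assuming a DStream named `wordCounts` and a writable HDFS path (both assumptions for illustration only):

```scala
// The fundamental output operator: run an arbitrary action on each generated RDD.
wordCounts.foreach { rdd =>
  println("Records in this batch: " + rdd.count())
}

// Persist each batch as text files named "prefix-TIME_IN_MS.suffix".
wordCounts.saveAsTextFiles("hdfs:///streaming/wordCounts", "txt")
```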
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index c89fb7723e..d94548a4f3 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -471,7 +471,7 @@ abstract class DStream[T: ClassManifest] (
* Returns a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): DStream[Int] = this.map(_ => 1).reduce(_ + _)
+ def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
/**
* Applies a function to each RDD in this DStream. This is an output operator, so
@@ -529,17 +529,16 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
- * @return
*/
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowDuration duration (i.e., width) of the window;
- * must be a multiple of this DStream's interval
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's interval
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
new WindowedDStream(this, windowDuration, slideDuration)
@@ -548,16 +547,22 @@ abstract class DStream[T: ClassManifest] (
/**
* Returns a new DStream which is computed based on a tumbling window on this DStream.
* This is equivalent to window(batchTime, batchTime).
- * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ * @param batchTime tumbling window duration; must be a multiple of this DStream's
+ * batching interval
*/
def tumble(batchTime: Duration): DStream[T] = window(batchTime, batchTime)
/**
* Returns a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a window over this DStream. windowDuration and slideDuration are as defined
+ * in the window() operation. This is equivalent to
+ * window(windowDuration, slideDuration).reduce(reduceFunc)
*/
- def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
this.window(windowDuration, slideDuration).reduce(reduceFunc)
}
@@ -577,8 +582,8 @@ abstract class DStream[T: ClassManifest] (
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Int] = {
- this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
+ def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
+ this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
/**
@@ -612,6 +617,8 @@ abstract class DStream[T: ClassManifest] (
/**
* Saves each RDD in this DStream as a Sequence file of serialized objects.
+ * The file name at each batch interval is generated based on `prefix` and
+ * `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsObjectFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
@@ -622,7 +629,9 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Saves each RDD in this DStream as at text file, using string representation of elements.
+ * Saves each RDD in this DStream as a text file, using string representation
+ * of elements. The file name at each batch interval is generated based on
+ * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
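A brief usage sketch of the operators touched in this file: `count` and `countByWindow` now return `DStream[Long]`, and the save operators use the prefix/suffix naming spelled out in the new scaladoc. The sketch assumes an existing `DStream[String]` named `lines`, the `Seconds` helper, and a writable HDFS path.

```scala
import spark.streaming.{DStream, Seconds}

val perBatchCount: DStream[Long]  = lines.count()
val perWindowCount: DStream[Long] = lines.countByWindow(Seconds(30), Seconds(10))

// Files are written once per batch as "prefix-TIME_IN_MS.suffix".
lines.saveAsTextFiles("hdfs:///streaming/lines", "txt")
```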
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 482d01300d..3952457339 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -25,34 +25,76 @@ extends Serializable {
new HashPartitioner(numPartitions)
}
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. Hash partitioning is
+ * used to generate the RDDs with Spark's default number of partitions.
+ */
def groupByKey(): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner())
}
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. Hash partitioning is
+ * used to generate the RDDs with `numPartitions` partitions.
+ */
def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
+ * is used to control the partitioning of each RDD.
+ */
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
- combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner).asInstanceOf[DStream[(K, Seq[V])]]
+ combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
+ .asInstanceOf[DStream[(K, Seq[V])]]
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ */
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner())
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ */
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
}
+ /**
+ * Generic function to combine the elements of each key in the DStream's RDDs using custom functions.
+ * This is similar to the combineByKey for RDDs. Please refer to combineByKey in
+ * [[spark.PairRDDFunctions]] for more information.
+ */
def combineByKey[C: ClassManifest](
createCombiner: V => C,
mergeValue: (C, V) => C,
@@ -61,14 +103,52 @@ extends Serializable {
new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
}
+ /**
+ * Creates a new DStream by counting the number of values of each key in each RDD
+ * of `this` DStream. Hash partitioning is used to generate the RDDs with
+ * `numPartitions` partitions.
+ */
def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
@@ -77,6 +157,16 @@ extends Serializable {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
@@ -85,6 +175,15 @@ extends Serializable {
self.window(windowDuration, slideDuration).groupByKey(partitioner)
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration
@@ -92,6 +191,17 @@ extends Serializable {
reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
@@ -100,6 +210,18 @@ extends Serializable {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
@@ -109,6 +231,17 @@ extends Serializable {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
@@ -121,12 +254,23 @@ extends Serializable {
.reduceByKey(cleanedReduceFunc, partitioner)
}
- // This method is the efficient sliding window reduce operation,
- // which requires the specification of an inverse reduce function,
- // so that new elements introduced in the window can be "added" using
- // reduceFunc to the previous window's result and old elements can be
- // "subtracted using invReduceFunc.
-
+ /**
+ * Creates a new DStream by reducing over a window in a smarter way.
+ * The reduced value of a new window is calculated incrementally by using the
+ * old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+ * However, it is applicable only to "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@@ -138,6 +282,24 @@ extends Serializable {
reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
+ /**
+ * Creates a new DStream by reducing over a window in a smarter way.
+ * The reduced value of a new window is calculated incrementally by using the
+ * old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+ * However, it is applicable only to "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@@ -150,6 +312,23 @@ extends Serializable {
reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by reducing over a window in a smarter way.
+ * The reduced value of a new window is calculated incrementally by using the
+ * old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+ * However, it is applicable only to "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@@ -164,6 +343,16 @@ extends Serializable {
self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
}
+ /**
+ * Creates a new DStream by counting the number of values for each key over a window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def countByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
@@ -179,17 +368,30 @@ extends Serializable {
)
}
- // TODO:
- //
- //
- //
- //
+ /**
+ * Creates a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key from
+ * `this` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @tparam S State type
+ */
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner())
}
+ /**
+ * Creates a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key from
+ * `this` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @tparam S State type
+ */
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S],
numPartitions: Int
@@ -197,6 +399,15 @@ extends Serializable {
updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key from
+ * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @tparam S State type
+ */
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
@@ -207,6 +418,19 @@ extends Serializable {
updateStateByKey(newUpdateFunc, partitioner, true)
}
+ /**
+ * Creates a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key from
+ * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated. Note that
+ * this function may generate a tuple with a different key
+ * than the input key. It is up to the developer to decide whether to
+ * remember the partitioner despite the key being changed.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
+ * @tparam S State type
+ */
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
@@ -226,10 +450,24 @@ extends Serializable {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
+ /**
+ * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by cogrouping RDDs from `this` and `other` DStreams. Therefore, for
+ * each key k in the corresponding RDDs of `this` or `other` DStreams, the generated RDD
+ * will contain a tuple with the list of values for that key in both RDDs.
+ * HashPartitioner is used to partition each generated RDD into the default number of partitions.
+ */
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
+ /**
+ * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by cogrouping RDDs from `this` and `other` DStreams. Therefore, for
+ * each key k in the corresponding RDDs of `this` or `other` DStreams, the generated RDD
+ * will contain a tuple with the list of values for that key in both RDDs.
+ * Partitioner is used to partition each generated RDD.
+ */
def cogroup[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
@@ -249,11 +487,24 @@ extends Serializable {
}
}
+ /**
+ * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used
+ * to partition each generated RDD into the default number of partitions.
+ */
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
- def join[W: ClassManifest](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))] = {
+ /**
+ * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and `other` DStreams. Uses the given
+ * Partitioner to partition each generated RDD.
+ */
+ def join[W: ClassManifest](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (V, W))] = {
this.cogroup(other, partitioner)
.flatMapValues{
case (vs, ws) =>
@@ -261,6 +512,10 @@ extends Serializable {
}
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String
@@ -268,6 +523,10 @@ extends Serializable {
saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
def saveAsHadoopFiles(
prefix: String,
suffix: String,
@@ -283,6 +542,10 @@ extends Serializable {
self.foreach(saveFunc)
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
suffix: String
@@ -290,6 +553,10 @@ extends Serializable {
saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,
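Pulling the newly documented pair-DStream operations together, here is an illustrative sketch (not part of the commit) that maintains a windowed sum incrementally with an inverse reduce function, computes a windowed per-key count, and joins the two. It assumes a DStream of (String, Long) pairs named `pairs`, the `Seconds` helper, the `StreamingContext` implicits, and that checkpointing has been enabled, which the invertible-reduce variant generally requires.

```scala
import spark.streaming.Seconds
import spark.streaming.StreamingContext._  // pair-DStream operations via implicits

// Incremental windowed sum: values entering the window are added with `_ + _`
// and values leaving it are removed with the inverse function `_ - _`.
val windowedSums = pairs.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))

// Per-key counts over the same window, using the default parallelism.
val windowedCounts = pairs.countByKeyAndWindow(Seconds(60), Seconds(10))

// Join the two windowed streams: each generated RDD holds (key, (sum, count)) tuples.
val sumsAndCounts = windowedSums.join(windowedCounts)
```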
diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
index f31ae39a16..03749d4a94 100644
--- a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
@@ -81,7 +81,7 @@ object RawTextHelper {
* before real workload starts.
*/
def warmUp(sc: SparkContext) {
- for(i <- 0 to 4) {
+ for(i <- 0 to 1) {
sc.parallelize(1 to 200000, 1000)
.map(_ % 1331).map(_.toString)
.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)