From fb7e21797ed618d9754545a44f8f95f75b66757a Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Fri, 19 Feb 2016 10:26:38 +0000
Subject: [SPARK-13339][DOCS] Clarify commutative / associative operator
 requirements for reduce, fold

Clarify that reduce functions need to be commutative, and fold functions do not

See https://github.com/apache/spark/pull/11091

Author: Sean Owen

Closes #11217 from srowen/SPARK-13339.
---
 .../spark/streaming/api/java/JavaDStreamLike.scala     |  6 +++---
 .../spark/streaming/api/java/JavaPairDStream.scala     | 18 +++++++++---------
 .../org/apache/spark/streaming/dstream/DStream.scala   |  4 ++--
 .../spark/streaming/dstream/PairDStreamFunctions.scala | 16 ++++++++--------
 4 files changed, 22 insertions(+), 22 deletions(-)

(limited to 'streaming/src')

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index f10de485d0..9931a46d33 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -214,7 +214,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 /**
  * Return a new DStream in which each RDD has a single element generated by reducing all
  * elements in a sliding window over this DStream.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
  * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -234,7 +234,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 /**
  * Return a new DStream in which each RDD has a single element generated by reducing all
  * elements in a sliding window over this DStream.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
  * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -257,7 +257,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
  * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
  * This is more efficient than reduceByWindow without "inverse reduce" function.
  * However, it is applicable to only "invertible reduce functions".
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param invReduceFunc inverse reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index d718f1d6fc..aad9a12c15 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -138,8 +138,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 /**
  * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
- * with Spark's default number of partitions.
+ * merged using the associative and commutative reduce function. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
  */
 def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = dstream.reduceByKey(func)
@@ -257,7 +257,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
  * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
  * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
  * the RDDs with Spark's default number of partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
  */
@@ -270,7 +270,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
  * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
  * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
  * generate the RDDs with Spark's default number of partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
  * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -289,7 +289,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
  * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
  * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
  * generate the RDDs with `numPartitions` partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
  * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -309,7 +309,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 /**
  * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
  * `DStream.reduceByKey()`, but applies it over a sliding window.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
  * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -335,7 +335,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
  * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
  * However, it is applicable to only "invertible reduce functions".
  * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param invReduceFunc inverse function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
@@ -360,7 +360,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
  * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
  * However, it is applicable to only "invertible reduce functions".
 * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
@@ -397,7 +397,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index db79eeab9c..70e1d8abde 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -791,7 +791,7 @@ abstract class DStream[T: ClassTag] (
 /**
  * Return a new DStream in which each RDD has a single element generated by reducing all
  * elements in a sliding window over this DStream.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
  * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -814,7 +814,7 @@ abstract class DStream[T: ClassTag] (
  * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
  * This is more efficient than reduceByWindow without "inverse reduce" function.
  * However, it is applicable to only "invertible reduce functions".
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
  * @param invReduceFunc inverse reduce function
  * @param windowDuration width of the window; must be a multiple of this DStream's
  *                       batching interval
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index babc722709..1dcdb64e28 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -75,8 +75,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 /**
  * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
- * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
- * with Spark's default number of partitions.
+ * merged using the associative and commutative reduce function. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
  */
 def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = ssc.withScope {
   reduceByKey(reduceFunc, defaultPartitioner())
 }
@@ -204,7 +204,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
 * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
 * the RDDs with Spark's default number of partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 */
@@ -219,7 +219,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 * generate the RDDs with Spark's default number of partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -238,7 +238,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to
 * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
 * generate the RDDs with `numPartitions` partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -259,7 +259,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 /**
 * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to
 * `DStream.reduceByKey()`, but applies it over a sliding window.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
 * @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -289,7 +289,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
@@ -320,7 +320,7 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
 * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
 * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
 * However, it is applicable to only "invertible reduce functions".
- * @param reduceFunc associative reduce function
+ * @param reduceFunc associative and commutative reduce function
 * @param invReduceFunc inverse reduce function
 * @param windowDuration width of the window; must be a multiple of this DStream's
 *                       batching interval
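[Editor's note] The contract this patch documents is easiest to see with an operator that is associative but not commutative. The sketch below is an illustration, not part of the patch; `CommutativityDemo`, the local master, and the sample data are all made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CommutativityDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("commutativity-demo").setMaster("local[4]"))

    val words = sc.parallelize(Seq("a", "b", "c", "d"), 4)

    // String concatenation is associative but NOT commutative. reduce may
    // combine partition results in an unspecified order, so "abcd" is not
    // guaranteed; hence the docs now say "associative and commutative".
    val concatenated = words.reduce(_ + _)

    // Per this patch's commit message, fold is documented as needing only an
    // associative function, since each partition folds from the zero value.
    val folded = words.fold("")(_ + _)

    // Integer addition is both commutative and associative, so it is safe
    // under any combination order.
    val total = sc.parallelize(1 to 100, 4).reduce(_ + _)

    println(s"reduce: $concatenated, fold: $folded, sum: $total")
    sc.stop()
  }
}
```

Whether a particular Spark version actually reorders the merge is an implementation detail; the point of the wording change is that correctness must not depend on it.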
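The windowed hunks all describe the same pair of operations, so one hedged sketch covers them. It assumes a hypothetical socket source on localhost:9999, a 5-second batch interval, and a throwaway checkpoint directory; the window and slide durations are multiples of the batch interval, as the `@param windowDuration` notes require.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowedCountsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("windowed-counts").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // The inverse-reduce variant below requires checkpointing.
    ssc.checkpoint("/tmp/windowed-counts-checkpoint")

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Plain windowed reduce: re-reduces every value in the 30s window on each
    // 10s slide. Addition is associative and commutative, as now documented.
    val counts = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

    // "Invertible" variant: reduces the new values that entered the window and
    // "inverse reduces" the old values that left it (subtracting old counts).
    // Valid only because subtraction exactly undoes addition.
    val incremental = pairs.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,
      (a: Int, b: Int) => a - b,
      Seconds(30), Seconds(10))

    counts.print()
    incremental.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

As the touched comments note, the second form is more efficient because each slide touches only the batches entering and leaving the window, but it applies only to invertible reduce functions.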