aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Hamstra <markhamstra@gmail.com>2013-03-16 13:31:01 -0700
committerMark Hamstra <markhamstra@gmail.com>2013-03-16 13:31:01 -0700
commitca9f81e8fcf46bceb032f3bd0efb9f6ed2c25656 (patch)
treed9b81d3c6c059c896e48f6813aed0a11e3f0d683 /core
parent1fb192ef404bb7aae126e30ad8a7663e7df6e618 (diff)
downloadspark-ca9f81e8fcf46bceb032f3bd0efb9f6ed2c25656.tar.gz
spark-ca9f81e8fcf46bceb032f3bd0efb9f6ed2c25656.tar.bz2
spark-ca9f81e8fcf46bceb032f3bd0efb9f6ed2c25656.zip
refactor foldByKey to use combineByKey
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala14
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala12
2 files changed, 19 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index ffa6336ceb..e1dde1b497 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -89,22 +89,28 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
- * Merge the values for each key using an associative function and a neutral "zero value".
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
- groupByKey(partitioner).mapValues(seq => seq.fold[V](zeroValue)(func))
+ combineByKey[V]({v: V => func(zeroValue, v)}, func, func, partitioner)
}
/**
- * Merge the values for each key using an associative function and a neutral "zero value".
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}
/**
- * Merge the values for each key using an associative function and a neutral "zero value".
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
foldByKey(zeroValue, defaultPartitioner(self))(func)
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 1dc73a3584..49aaabf835 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -161,19 +161,25 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap)
/**
- * Merge the values for each key using an associative function and a neutral "zero value".
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue, partitioner)(func))
/**
- * Merge the values for each key using an associative function and a neutral "zero value".
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, numPartitions: Int, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue, numPartitions)(func))
/**
- * Merge the values for each key using an associative function and a neutral "zero value".
+ * Merge the values for each key using an associative function and a neutral "zero value" which may
+ * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+ * list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, func: JFunction2[V, V, V]): JavaPairRDD[K, V] =
fromRDD(rdd.foldByKey(zeroValue)(func))