commit    d0bae072eac528a343617f0a2eaa85d05d8040a9
tree      aa87fc5669f832731a8f44783242e3edc453bd65
parent    a37adfa67bac51b2630c6e1673f8607a87273402
parent    8ac0f35be42765fcd6f02dcf0f070f2ef2377a85
author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-08 22:24:03 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-01-08 22:24:03 -0800
Merge pull request #353 from stephenh/tupleBy
Add RDD.tupleBy.
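The method merged here is keyBy (the pull-request branch was named tupleBy): it pairs each element of an RDD with a key computed by `f`, i.e. it is `map(x => (f(x), x))`. A minimal usage sketch, assuming a running SparkContext named `sc`; the variable names are illustrative only, not part of this change:

    // Sketch only: assumes a live SparkContext `sc`; `words` and `byLength` are made-up names.
    val words = sc.parallelize(Seq("apple", "banana", "cherry"))
    // keyBy(f) is map(x => (f(x), x)): each element becomes (key, element).
    val byLength = words.keyBy(_.length)   // RDD[(Int, String)]
    byLength.collect()                     // Array((5,apple), (6,banana), (6,cherry))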
 core/src/main/scala/spark/RDD.scala                  |  7 +++++++
 core/src/main/scala/spark/api/java/JavaRDDLike.scala |  8 ++++++++
 core/src/test/scala/spark/JavaAPISuite.java          | 12 ++++++++++++
 core/src/test/scala/spark/RDDSuite.scala             |  1 +
 4 files changed, 28 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 5163c80134..3b9ced1946 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -517,6 +517,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
       .saveAsSequenceFile(path)
   }
 
+  /**
+   * Creates tuples of the elements in this RDD by applying `f`.
+   */
+  def keyBy[K](f: T => K): RDD[(K, T)] = {
+    map(x => (f(x), x))
+  }
+
   /** A private method for tests, to look at the contents of each partition */
   private[spark] def collectPartitions(): Array[Array[T]] = {
     sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 81d3a94466..d15f6dd02f 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -298,4 +298,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Save this RDD as a SequenceFile of serialized objects.
    */
   def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
+
+  /**
+   * Creates tuples of the elements in this RDD by applying `f`.
+   */
+  def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
+    implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+    JavaPairRDD.fromRDD(rdd.keyBy(f))
+  }
 }
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 0817d1146c..c61913fc82 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -629,4 +629,16 @@ public class JavaAPISuite implements Serializable {
     floatAccum.setValue(5.0f);
     Assert.assertEquals((Float) 5.0f, floatAccum.value());
   }
+
+  @Test
+  public void keyBy() {
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
+    List<Tuple2<String, Integer>> s = rdd.keyBy(new Function<Integer, String>() {
+      public String call(Integer t) throws Exception {
+        return t.toString();
+      }
+    }).collect();
+    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
+    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
+  }
 }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 872b06fd08..d74e9786c3 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -36,6 +36,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
+    assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
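Because keyBy returns an RDD[(K, T)], its main payoff is that the keyed result can feed straight into the existing pair-RDD operations. A rough sketch of that pattern, assuming the PairRDDFunctions implicits of this era are brought in via `import spark.SparkContext._` and a running SparkContext `sc`; the names and data are illustrative, not from this change:

    // Sketch only: `nums` and `byParity` are made-up names.
    import spark.SparkContext._   // assumed: supplies the implicit conversion to PairRDDFunctions at this revision

    val nums = sc.parallelize(1 to 6)
    // Key each number by its parity, then group the values that share a key.
    val byParity = nums.keyBy(_ % 2).groupByKey()
    byParity.collect().foreach(println)   // e.g. (0,ArrayBuffer(2, 4, 6)) and (1,ArrayBuffer(1, 3, 5))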