diff options
author | Josh Rosen <rosenville@gmail.com> | 2012-07-22 15:10:01 -0700 |
---|---|---|
committer | Josh Rosen <rosenville@gmail.com> | 2012-07-22 15:10:01 -0700 |
commit | e23938c3beaefe7d373c1f82d7b475352bd9c9bb (patch) | |
tree | afd706b2436cdcaba49ec27ab27d858b0dbb8bd1 | |
parent | 460da878fcefc861e77b40719c6329cc2a960de8 (diff) | |
download | spark-e23938c3beaefe7d373c1f82d7b475352bd9c9bb.tar.gz spark-e23938c3beaefe7d373c1f82d7b475352bd9c9bb.tar.bz2 spark-e23938c3beaefe7d373c1f82d7b475352bd9c9bb.zip |
Use mapValues() in JavaPairRDD.cogroupResultToJava().
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaPairRDD.scala | 23 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDDLike.scala | 5 |
3 files changed, 14 insertions, 22 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index bf94773214..429e9c936f 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -361,14 +361,6 @@ class MappedRDD[U: ClassManifest, T: ClassManifest]( override def compute(split: Split) = prev.iterator(split).map(f) } -class PartitioningPreservingMappedRDD[U: ClassManifest, T: ClassManifest]( - prev: RDD[T], - f: T => U) - extends MappedRDD[U, T](prev, f) { - - override val partitioner = prev.partitioner -} - class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: T => TraversableOnce[U]) diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index dbd6ce526c..3db7dd8b8f 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -253,17 +253,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif object JavaPairRDD { def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassManifest[K], vcm: ClassManifest[T]): RDD[(K, JList[T])] = - new PartitioningPreservingMappedRDD(rdd, (x: (K, Seq[T])) => (x._1, seqAsJavaList(x._2))) - - def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))]) - : RDD[(K, (JList[V], JList[W]))] = new PartitioningPreservingMappedRDD(rdd, - (x: (K, (Seq[V], Seq[W]))) => (x._1, (seqAsJavaList(x._2._1), seqAsJavaList(x._2._2)))) - - def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]) - : RDD[(K, (JList[V], JList[W1], JList[W2]))] = new PartitioningPreservingMappedRDD(rdd, - (x: (K, (Seq[V], Seq[W1], Seq[W2]))) => (x._1, (seqAsJavaList(x._2._1), - seqAsJavaList(x._2._2), - seqAsJavaList(x._2._3)))) + rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _) + + def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassManifest[K], + vcm: ClassManifest[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd).mapValues((x: (Seq[V], + Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2))) + + def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1], + Seq[W2]))])(implicit kcm: ClassManifest[K]) : RDD[(K, (JList[V], JList[W1], + JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues( + (x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1), + seqAsJavaList(x._2), + seqAsJavaList(x._3))) def fromRDD[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd) diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 9f6674df56..5e0cb15042 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -1,6 +1,6 @@ package spark.api.java -import spark.{PartitioningPreservingMappedRDD, Split, RDD} +import spark.{Split, RDD} import spark.api.java.JavaPairRDD._ import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} import spark.partial.{PartialResult, BoundedDouble} @@ -48,8 +48,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala - new JavaDoubleRDD(new PartitioningPreservingMappedRDD(rdd.flatMap(fn), - ((x: java.lang.Double) => x.doubleValue()))) + new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue())) } def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = { |