diff options
author | Saurabh Rawat <sr.eklavya@gmail.com> | 2014-01-14 14:19:02 +0530 |
---|---|---|
committer | Saurabh Rawat <sr.eklavya@gmail.com> | 2014-01-14 14:19:02 +0530 |
commit | 1442cd5d5099de71747b1cccf463b94fdedcda1f (patch) | |
tree | 819137ed5daa67838d76cbc8e11e1c862e2e3283 /core | |
parent | e92297337387b435c9e46f56aa1a403b78647afe (diff) | |
download | spark-1442cd5d5099de71747b1cccf463b94fdedcda1f.tar.gz spark-1442cd5d5099de71747b1cccf463b94fdedcda1f.tar.bz2 spark-1442cd5d5099de71747b1cccf463b94fdedcda1f.zip |
Modifications as suggested in PR feedback:
- more variants of mapPartitions added to JavaRDDLike
- move setGenerator to JavaRDDLike
- clean up
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala | 9 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 22 |
2 files changed, 23 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index e687bbdd99..7d48ce01cf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -21,10 +21,8 @@ import scala.reflect.ClassTag import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap, VoidFunction} +import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel -import java.util.{Iterator => JIterator} -import scala.collection.JavaConversions._ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends JavaRDDLike[T, JavaRDD[T]] { @@ -135,11 +133,6 @@ JavaRDDLike[T, JavaRDD[T]] { rdd.setName(name) this } - - /** Reset generator*/ - def setGenerator(_generator: String) = { - rdd.setGenerator(_generator) - } } object JavaRDD { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index eb8e34e240..808c907d37 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -157,6 +157,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { } /** + * Return a new RDD by applying a function to each partition of this RDD. + */ + def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue())) + } + + /** + * Return a new RDD by applying a function to each partition of this RDD. 
+ */ + def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean): + JavaPairRDD[K2, V2] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType()) + } + + /** * Applies a function f to each partition of this RDD. */ def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) { @@ -476,4 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) def name(): String = rdd.name + + /** Reset generator */ + def setGenerator(_generator: String) = { + rdd.setGenerator(_generator) + } } |