aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSaurabh Rawat <sr.eklavya@gmail.com>2014-01-14 14:19:02 +0530
committerSaurabh Rawat <sr.eklavya@gmail.com>2014-01-14 14:19:02 +0530
commit1442cd5d5099de71747b1cccf463b94fdedcda1f (patch)
tree819137ed5daa67838d76cbc8e11e1c862e2e3283
parente92297337387b435c9e46f56aa1a403b78647afe (diff)
downloadspark-1442cd5d5099de71747b1cccf463b94fdedcda1f.tar.gz
spark-1442cd5d5099de71747b1cccf463b94fdedcda1f.tar.bz2
spark-1442cd5d5099de71747b1cccf463b94fdedcda1f.zip
Modifications as suggested in PR feedback-
- more variants of mapPartitions added to JavaRDDLike - move setGenerator to JavaRDDLike - clean up
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala22
2 files changed, 23 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index e687bbdd99..7d48ce01cf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,10 +21,8 @@ import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap, VoidFunction}
+import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
-import java.util.{Iterator => JIterator}
-import scala.collection.JavaConversions._
class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
JavaRDDLike[T, JavaRDD[T]] {
@@ -135,11 +133,6 @@ JavaRDDLike[T, JavaRDD[T]] {
rdd.setName(name)
this
}
-
- /** Reset generator*/
- def setGenerator(_generator: String) = {
- rdd.setGenerator(_generator)
- }
}
object JavaRDD {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index eb8e34e240..808c907d37 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -157,6 +157,23 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
+ }
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
+ JavaPairRDD[K2, V2] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+ }
+
+ /**
* Applies a function f to each partition of this RDD.
*/
def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
@@ -476,4 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
def name(): String = rdd.name
+
+ /** Reset generator */
+ def setGenerator(_generator: String) = {
+ rdd.setGenerator(_generator)
+ }
}