aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authoreklavya <sr.eklavya@gmail.com>2014-01-13 17:56:10 +0530
committereklavya <sr.eklavya@gmail.com>2014-01-13 17:56:10 +0530
commitdbadc6b994ff54f86b726c71fa08837a6b1e7238 (patch)
tree7c4b5fd05975057ce3fad894a9fb395b870d6214 /core
parentaae8a014259b0ae71afd7052c8e22797cfebc82e (diff)
downloadspark-dbadc6b994ff54f86b726c71fa08837a6b1e7238.tar.gz
spark-dbadc6b994ff54f86b726c71fa08837a6b1e7238.tar.bz2
spark-dbadc6b994ff54f86b726c71fa08837a6b1e7238.zip
Added mapPartitions method to JavaRDD.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala13
1 files changed, 12 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 6c91edaf5c..568ae1575b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -21,8 +21,10 @@ import scala.reflect.ClassTag
import org.apache.spark._
import org.apache.spark.rdd.RDD
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, FlatMapFunction => JFMap}
import org.apache.spark.storage.StorageLevel
+import java.util.{Iterator => JIterator}
+import scala.collection.JavaConversions._
class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
JavaRDDLike[T, JavaRDD[T]] {
@@ -138,6 +140,15 @@ JavaRDDLike[T, JavaRDD[T]] {
def setGenerator(_generator: String) = {
rdd.generator = _generator
}
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions[U: ClassTag](
+ f: JFMap[JIterator[T], U], preservesPartitioning: Boolean = false): JavaRDD[U] = {
+ rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
+ }
+
}
object JavaRDD {