aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHolden Karau <holden@pigscanfly.ca>2013-09-14 15:53:42 -0700
committerHolden Karau <holden@pigscanfly.ca>2013-09-14 15:53:42 -0700
commitbfcddf4700023f53d5eed92ef8ef75c072af3ced (patch)
treee99b10ddd860f4b550a720eb4a54e89dec472f3b /core
parent74f710f6cda31c1489e8f0cc130021ce4e9e60c6 (diff)
downloadspark-bfcddf4700023f53d5eed92ef8ef75c072af3ced.tar.gz
spark-bfcddf4700023f53d5eed92ef8ef75c072af3ced.tar.bz2
spark-bfcddf4700023f53d5eed92ef8ef75c072af3ced.zip
Make mapPartitionsWithIndex work with JavaRDD's
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala5
1 files changed, 3 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9ad175ec19..264c4bc3de 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -71,9 +71,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- def mapPartitionsWithIndex(f: JFunction2[Int, T, R],
+ def mapPartitionsWithIndex[R: ClassManifest](f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
- new JavaRDD(MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning))
+ new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
+ preservesPartitioning))
/**
* Return a new RDD by applying a function to all elements of this RDD.