aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authoreklavya <sr.eklavya@gmail.com>2014-01-23 17:40:36 +0530
committereklavya <sr.eklavya@gmail.com>2014-01-23 17:40:36 +0530
commit60e7457266eef18f562ef5cb93d62db1af821fdf (patch)
tree791944bd891c336c5bd0c9b85c433c3cfa9384e7 /core
parent1442cd5d5099de71747b1cccf463b94fdedcda1f (diff)
downloadspark-60e7457266eef18f562ef5cb93d62db1af821fdf.tar.gz
spark-60e7457266eef18f562ef5cb93d62db1af821fdf.tar.bz2
spark-60e7457266eef18f562ef5cb93d62db1af821fdf.zip
fixed ClassTag in mapPartitions
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala17
1 files changed, 9 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 808c907d37..9680c6f3e1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -134,13 +134,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
+ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
+ }
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
}
/**
- * Return a new RDD by applying a function to each partition of this RDD.
+ * Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
@@ -148,13 +156,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
}
- /**
- * Return a new RDD by applying a function to each partition of this RDD.
- */
- def mapPartitions[U](
- f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
- rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning)
- }
/**
* Return a new RDD by applying a function to each partition of this RDD.