diff options
author | eklavya <sr.eklavya@gmail.com> | 2014-01-23 17:40:36 +0530 |
---|---|---|
committer | eklavya <sr.eklavya@gmail.com> | 2014-01-23 17:40:36 +0530 |
commit | 60e7457266eef18f562ef5cb93d62db1af821fdf (patch) | |
tree | 791944bd891c336c5bd0c9b85c433c3cfa9384e7 /core | |
parent | 1442cd5d5099de71747b1cccf463b94fdedcda1f (diff) | |
download | spark-60e7457266eef18f562ef5cb93d62db1af821fdf.tar.gz spark-60e7457266eef18f562ef5cb93d62db1af821fdf.tar.bz2 spark-60e7457266eef18f562ef5cb93d62db1af821fdf.zip |
fixed ClassTag in mapPartitions
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 808c907d37..9680c6f3e1 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -134,13 +134,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to each partition of this RDD. */ + def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType()) + } + + /** + * Return a new RDD by applying a function to each partition of this RDD. + */ def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue())) } /** - * Return a new RDD by applying a function to each partition of this RDD. + * Return a new RDD by applying a function to each partition of this RDD. */ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): JavaPairRDD[K2, V2] = { @@ -148,13 +156,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) } - /** - * Return a new RDD by applying a function to each partition of this RDD. - */ - def mapPartitions[U]( - f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = { - rdd.mapPartitions[U]((x => f(asJavaIterator(x)).iterator), preservesPartitioning) - } /** * Return a new RDD by applying a function to each partition of this RDD. |