aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Josh Rosen <joshrosen@apache.org> 2014-01-23 11:14:15 -0800
committer Josh Rosen <joshrosen@apache.org> 2014-01-23 11:14:15 -0800
commit fad6aacfb0a2ac3766417e4a0e3933277ce99d98 (patch)
tree 809aacfe14da5a2b69f6c7546a29623973bb8e67 /core
parent a2b47dae66a437f02bc053e9bde5c1472cff0fc6 (diff)
parent 60e7457266eef18f562ef5cb93d62db1af821fdf (diff)
download spark-fad6aacfb0a2ac3766417e4a0e3933277ce99d98.tar.gz
spark-fad6aacfb0a2ac3766417e4a0e3933277ce99d98.tar.bz2
spark-fad6aacfb0a2ac3766417e4a0e3933277ce99d98.zip
Merge pull request #406 from eklavya/master
Extending Java API coverage. Hi, I have added three new methods to JavaRDD. Please review and merge.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala 40
1 files changed, 39 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index ebbbbd8806..9680c6f3e1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -134,13 +134,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
+ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
+ }
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
}
/**
- * Return a new RDD by applying a function to each partition of this RDD.
+ * Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
@@ -148,6 +156,31 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
}
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
+ }
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
+ JavaPairRDD[K2, V2] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+ }
+
+ /**
+ * Applies a function f to each partition of this RDD.
+ */
+ def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+ rdd.foreachPartition((x => f(asJavaIterator(x))))
+ }
+
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
@@ -461,4 +494,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
def name(): String = rdd.name
+
+ /** Reset generator */
+ def setGenerator(_generator: String) = {
+ rdd.setGenerator(_generator)
+ }
}