aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/api/java/JavaRDDLike.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/api/java/JavaRDDLike.scala')
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala50
1 files changed, 49 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index a6555081b3..3fe2011f4c 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -1,10 +1,11 @@
package spark.api.java
-import java.util.{List => JList}
+import java.util.{List => JList, Comparator}
import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
+import org.apache.hadoop.io.compress.CompressionCodec
import spark.{SparkContext, Partition, RDD, TaskContext}
import spark.api.java.JavaPairRDD._
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
@@ -183,6 +184,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
}
+ /**
+ * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
+ * applying a function to the zipped partitions. Assumes that all the RDDs have the
+ * *same number of partitions*, but does *not* require them to have the same number
+ * of elements in each partition.
+ */
+ def zipPartitions[U, V](
+ f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V],
+ other: JavaRDDLike[U, _]): JavaRDD[V] = {
+ def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
+ f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
+ JavaRDD.fromRDD(
+ rdd.zipPartitions(fn, other.rdd)(other.classTag, f.elementType()))(f.elementType())
+ }
+
// Actions (launch a job to return a value to the user program)
/**
@@ -296,6 +312,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
+
+ /**
+ * Save this RDD as a compressed text file, using string representations of elements.
+ */
+ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
+ rdd.saveAsTextFile(path, codec)
+
/**
* Save this RDD as a SequenceFile of serialized objects.
*/
@@ -337,4 +360,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def toDebugString(): String = {
rdd.toDebugString
}
+
+ /**
+ * Returns the top K elements from this RDD as defined by
+ * the specified Comparator[T].
+ * @param num the number of top elements to return
+ * @param comp the comparator that defines the order
+ * @return an array of top elements
+ */
+ def top(num: Int, comp: Comparator[T]): JList[T] = {
+ import scala.collection.JavaConversions._
+ val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
+ val arr: java.util.Collection[T] = topElems.toSeq
+ new java.util.ArrayList(arr)
+ }
+
+ /**
+ * Returns the top K elements from this RDD using the
+ * natural ordering for T.
+ * @param num the number of top elements to return
+ * @return an array of top elements
+ */
+ def top(num: Int): JList[T] = {
+ val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
+ top(num, comp)
+ }
}