aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorStephen Haberman <stephen@exigencecorp.com>2013-02-24 00:27:53 -0600
committerStephen Haberman <stephen@exigencecorp.com>2013-02-24 00:27:53 -0600
commit37c7a71f9cc574e0a17579e9cf1651daa92553c7 (patch)
tree54c23a8cd9e64d89df746e9da55fa8c6b0445123 /core/src
parentf442e7d83c93c894215427f5ef86c96d61160e0e (diff)
downloadspark-37c7a71f9cc574e0a17579e9cf1651daa92553c7.tar.gz
spark-37c7a71f9cc574e0a17579e9cf1651daa92553c7.tar.bz2
spark-37c7a71f9cc574e0a17579e9cf1651daa92553c7.zip
Add subtract to JavaRDD, JavaDoubleRDD, and JavaPairRDD.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/api/java/JavaDoubleRDD.scala23
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala21
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDD.scala20
3 files changed, 63 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index da3cb2cd31..ba00b6a844 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -6,8 +6,8 @@ import spark.api.java.function.{Function => JFunction}
import spark.util.StatCounter
import spark.partial.{BoundedDouble, PartialResult}
import spark.storage.StorageLevel
-
import java.lang.Double
+import spark.Partitioner
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
@@ -58,6 +58,27 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaDoubleRDD): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaDoubleRDD, numPartitions: Int): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaDoubleRDD, p: Partitioner): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other, p))
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index c41207773e..cfbdda88c0 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -182,6 +182,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
/**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaPairRDD[K, V], numPartitions: Int): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaPairRDD[K, V], p: Partitioner): JavaPairRDD[K, V] =
+ fromRDD(rdd.subtract(other, p))
+
+ /**
* Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
* is true, Spark will group values of the same key together on the map side before the
* repartitioning, to only send each key over the network once. If a large number of
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 3ccd6f055e..3016888898 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -55,6 +55,26 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd))
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaRDD[T], numPartitions: Int): JavaRDD[T] =
+ wrapRDD(rdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
+ wrapRDD(rdd.subtract(other, p))
+
}
object JavaRDD {