author     Matei Zaharia <matei@eecs.berkeley.edu>   2012-09-28 23:57:24 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2012-09-28 23:57:24 -0700
commit     2f11e3c285499880b9d800fdd65ea9ad1c82b4af (patch)
tree       f6c36434b4a7517c1e5bf9eb64de0aa36b6ff87c /core/src/main
parent     56dcad593641ef8de211fcb4303574a9f4509f89 (diff)
parent     8654165e692d881c38e7d7e342974ba766452741 (diff)
Merge pull request #227 from JoshRosen/fix/distinct_numsplits
Allow controlling number of splits in distinct().
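This merge adds an optional numSplits parameter to RDD.distinct() in the Scala API, defaulting to the RDD's current number of splits, plus a matching distinct(numSplits) overload on each Java wrapper. A minimal Scala sketch of calling the new overload (the context setup and data below are illustrative, not part of the patch):

    import spark.SparkContext

    // Illustrative local-mode job exercising the new overload.
    val sc  = new SparkContext("local", "DistinctExample")
    val rdd = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))
    val a = rdd.distinct()     // as before: uses this RDD's current number of splits
    val b = rdd.distinct(4)    // new: perform the de-duplication shuffle across 4 splits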
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/RDD.scala                     | 3
-rw-r--r--  core/src/main/scala/spark/api/java/JavaDoubleRDD.scala  | 2
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala    | 2
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDD.scala        | 2
4 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index b5f67d1253..784f25086e 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -136,7 +136,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
-  def distinct(): RDD[T] = map(x => (x, "")).reduceByKey((x, y) => x).map(_._1)
+  def distinct(numSplits: Int = splits.size): RDD[T] =
+    map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
   def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
     new SampledRDD(this, withReplacement, fraction, seed)
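The new definition keeps the same map/reduceByKey strategy: it defaults numSplits to splits.size so a plain distinct() behaves as before, forwards numSplits to reduceByKey to control the width of the shuffle, and pairs each element with null rather than "" as the placeholder value. A standalone sketch of the equivalent expression, assuming the usual import spark.SparkContext._ for the pair-RDD implicits (the helper name is hypothetical):

    import spark.RDD
    import spark.SparkContext._   // implicit conversion that provides reduceByKey on RDD[(K, V)]

    // Hypothetical helper mirroring the patched RDD.distinct(numSplits).
    def distinctViaReduceByKey[T: ClassManifest](rdd: RDD[T], numSplits: Int): RDD[T] =
      rdd.map(x => (x, null))                   // key each element; the value is a throwaway placeholder
         .reduceByKey((x, y) => x, numSplits)   // collapse duplicate keys across numSplits splits
         .map(_._1)                             // keep one representative per key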
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 7c0b17c45e..9731bb4eac 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -33,6 +33,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
   def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct())
+  def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits))
+
   def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
     fromRDD(srdd.filter(x => f(x).booleanValue()))
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index c28a13b061..84ec386ce4 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -40,6 +40,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
   def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())
+  def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits))
+
   def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 541aa1e60b..b3e1977bcb 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -19,6 +19,8 @@ JavaRDDLike[T, JavaRDD[T]] {
   def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct())
+  def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits))
+
   def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
     wrapRDD(rdd.filter((x => f(x).booleanValue())))
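Each of the three Java wrappers follows the same pattern: the new overload forwards numSplits to the wrapped Scala RDD's distinct and re-wraps the result, so the Java API gains the same control over shuffle width. A sketch, in Scala, of driving the JavaRDD overload directly (wrapping an existing Scala RDD this way is illustrative; a Java program would normally obtain its JavaRDD from JavaSparkContext):

    import spark.api.java.JavaRDD

    // Assumes `rdd` is the RDD[Int] from the sketch near the top of this page.
    val javaRdd = new JavaRDD(rdd)                    // wrap the Scala RDD for the Java API
    val deduped: JavaRDD[Int] = javaRdd.distinct(4)   // forwards to rdd.distinct(4), then re-wraps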