aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-09-29 20:22:31 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-09-29 20:22:31 -0700
commitebd52347b550cf608a5876f4ed95007023bc132a (patch)
tree9b93f999e54cd5815aa828849a99f04e4e56e0c7 /core/src/main
parent9b326d01e9a9ec4a4a9abf293cf039c07d426293 (diff)
parent2f11e3c285499880b9d800fdd65ea9ad1c82b4af (diff)
downloadspark-ebd52347b550cf608a5876f4ed95007023bc132a.tar.gz
spark-ebd52347b550cf608a5876f4ed95007023bc132a.tar.bz2
spark-ebd52347b550cf608a5876f4ed95007023bc132a.zip
Merge branch 'dev' of github.com:mesos/spark into dev
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/RDD.scala3
-rw-r--r--core/src/main/scala/spark/api/java/JavaDoubleRDD.scala2
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala2
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDD.scala2
4 files changed, 8 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index b5f67d1253..784f25086e 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -136,7 +136,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
- def distinct(): RDD[T] = map(x => (x, "")).reduceByKey((x, y) => x).map(_._1)
+ def distinct(numSplits: Int = splits.size): RDD[T] =
+ map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, fraction, seed)
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 7c0b17c45e..9731bb4eac 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -33,6 +33,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct())
+ def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits))
+
def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
fromRDD(srdd.filter(x => f(x).booleanValue()))
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index c28a13b061..84ec386ce4 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -40,6 +40,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())
+ def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits))
+
def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 541aa1e60b..b3e1977bcb 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -19,6 +19,8 @@ JavaRDDLike[T, JavaRDD[T]] {
def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct())
+ def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits))
+
def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
wrapRDD(rdd.filter((x => f(x).booleanValue())))