aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2012-09-28 23:44:19 -0700
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2012-09-28 23:44:19 -0700
commit37c199bbb098c68efecb4f8bd10b5cb8dfd9da3b (patch)
treeb5cc28279860bbe82667dcd7e73deba2ff36f60b
parent9f6efbf06a65953c4fcabd439124d71d50c5df6e (diff)
downloadspark-37c199bbb098c68efecb4f8bd10b5cb8dfd9da3b.tar.gz
spark-37c199bbb098c68efecb4f8bd10b5cb8dfd9da3b.tar.bz2
spark-37c199bbb098c68efecb4f8bd10b5cb8dfd9da3b.zip
Allow controlling number of splits in distinct().
-rw-r--r--core/src/main/scala/spark/RDD.scala3
-rw-r--r--core/src/main/scala/spark/api/java/JavaDoubleRDD.scala2
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala2
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDD.scala2
-rw-r--r--docs/scala-programming-guide.md4
5 files changed, 12 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6883fb70f9..3cf2ff5ea4 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -168,7 +168,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
- def distinct(): RDD[T] = map(x => (x, "")).reduceByKey((x, y) => x).map(_._1)
+ def distinct(numSplits: Int = splits.size): RDD[T] =
+ map(x => (x, "")).reduceByKey((x, y) => x, numSplits).map(_._1)
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, fraction, seed)
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 7c0b17c45e..9731bb4eac 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -33,6 +33,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct())
+ def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits))
+
def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
fromRDD(srdd.filter(x => f(x).booleanValue()))
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index c28a13b061..84ec386ce4 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -40,6 +40,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct())
+ def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits))
+
def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue()))
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 541aa1e60b..b3e1977bcb 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -19,6 +19,8 @@ JavaRDDLike[T, JavaRDD[T]] {
def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct())
+ def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits))
+
def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] =
wrapRDD(rdd.filter((x => f(x).booleanValue())))
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index a370bf3ddc..db761d7df1 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -148,6 +148,10 @@ The following tables list the transformations and actions currently supported (s
<td> Return a new dataset that contains the union of the elements in the source dataset and the argument. </td>
</tr>
<tr>
+ <td> <b>distinct</b>([<i>numTasks</i>])) </td>
+ <td> Return a new dataset that contains the distinct elements of the source dataset.</td>
+</tr>
+<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. <br />
<b>Note:</b> By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks.