diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-09-28 23:44:19 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-09-28 23:44:19 -0700 |
commit | 37c199bbb098c68efecb4f8bd10b5cb8dfd9da3b (patch) | |
tree | b5cc28279860bbe82667dcd7e73deba2ff36f60b | |
parent | 9f6efbf06a65953c4fcabd439124d71d50c5df6e (diff) | |
download | spark-37c199bbb098c68efecb4f8bd10b5cb8dfd9da3b.tar.gz spark-37c199bbb098c68efecb4f8bd10b5cb8dfd9da3b.tar.bz2 spark-37c199bbb098c68efecb4f8bd10b5cb8dfd9da3b.zip |
Allow controlling number of splits in distinct().
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 3 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaDoubleRDD.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaPairRDD.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDD.scala | 2 | ||||
-rw-r--r-- | docs/scala-programming-guide.md | 4 |
5 files changed, 12 insertions, 1 deletion
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 6883fb70f9..3cf2ff5ea4 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -168,7 +168,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f)) - def distinct(): RDD[T] = map(x => (x, "")).reduceByKey((x, y) => x).map(_._1) + def distinct(numSplits: Int = splits.size): RDD[T] = + map(x => (x, "")).reduceByKey((x, y) => x, numSplits).map(_._1) def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] = new SampledRDD(this, withReplacement, fraction, seed) diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala index 7c0b17c45e..9731bb4eac 100644 --- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala @@ -33,6 +33,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct()) + def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits)) + def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD = fromRDD(srdd.filter(x => f(x).booleanValue())) diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index c28a13b061..84ec386ce4 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -40,6 +40,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct()) + def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits)) + def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] = new JavaPairRDD[K, 
V](rdd.filter(x => f(x).booleanValue())) diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index 541aa1e60b..b3e1977bcb 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -19,6 +19,8 @@ JavaRDDLike[T, JavaRDD[T]] { def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct()) + def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits)) + def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] = wrapRDD(rdd.filter((x => f(x).booleanValue()))) diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md index a370bf3ddc..db761d7df1 100644 --- a/docs/scala-programming-guide.md +++ b/docs/scala-programming-guide.md @@ -148,6 +148,10 @@ The following tables list the transformations and actions currently supported (s <td> Return a new dataset that contains the union of the elements in the source dataset and the argument. </td> </tr> <tr> + <td> <b>distinct</b>([<i>numTasks</i>])) </td> + <td> Return a new dataset that contains the distinct elements of the source dataset.</td> +</tr> +<tr> <td> <b>groupByKey</b>([<i>numTasks</i>]) </td> <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. <br /> <b>Note:</b> By default, this uses only 8 parallel tasks to do the grouping. You can pass an optional <code>numTasks</code> argument to set a different number of tasks. |