author    Mark Hamstra <markhamstra@gmail.com>  2012-12-24 02:26:11 -0800
committer Mark Hamstra <markhamstra@gmail.com>  2012-12-24 02:36:47 -0800
commit    61be8566e24c664442780154debfea884d81f46b
tree      707038b8b9e31241474c6a0a546e3c700a1c8861
parent    a6bb41c6d389f1b98d5542000a7a9705ba282273
Allow distinct() to be called without parentheses when using the default number of splits.
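
Background: in Scala, a method whose parameter carries a default value still cannot be called without an argument list, so rdd.distinct failed to compile even though rdd.distinct() worked. The fix below drops the default and adds an explicit zero-argument overload. A minimal standalone sketch of the pattern (the Nums class here is hypothetical, for illustration only; it is not Spark code):

    // Sketch of the overload pattern used in this commit (hypothetical Nums class).
    class Nums(elems: Seq[Int]) {
      // Before: a single method with a default argument, e.g.
      //   def distinct(numSplits: Int = 2): Seq[Int] = ...
      // compiles, but the bare call nums.distinct is rejected with
      // "missing argument list".

      // After: the default is replaced by a zero-argument overload, so both
      // nums.distinct and nums.distinct() resolve to distinct().
      def distinct(numSplits: Int): Seq[Int] = elems.distinct // numSplits unused in this toy version
      def distinct(): Seq[Int] = distinct(2)
    }

    object Demo extends App {
      val nums = new Nums(Seq(1, 1, 2, 3))
      println(nums.distinct)    // List(1, 2, 3) -- no parentheses
      println(nums.distinct())  // same result, explicit empty argument list
      println(nums.distinct(4)) // same result, explicit split count
    }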
 core/src/main/scala/spark/RDD.scala      |  4 +++-
 core/src/test/scala/spark/RDDSuite.scala | 12 ++++++++----
 2 files changed, 11 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index bb4c13c494..d15c6f7396 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -185,9 +185,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   /**
    * Return a new RDD containing the distinct elements in this RDD.
    */
-  def distinct(numSplits: Int = splits.size): RDD[T] =
+  def distinct(numSplits: Int): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
 
+  def distinct(): RDD[T] = distinct(splits.size)
+
   /**
    * Return a sampled subset of this RDD.
    */
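
The one-line body above implements distinct as a shuffle: each element is paired with a null placeholder, reduceByKey keeps one pair per key, and the final map drops the placeholder. A rough plain-collections analogue of that pipeline (illustration only, not the RDD machinery):

    // Plain-Scala analogue of map(...).reduceByKey(...).map(_._1) above.
    val xs = Seq(1, 1, 2, 2, 3)
    val deduped = xs
      .map(x => (x, null))                           // pair each element with a placeholder
      .groupBy(_._1).map { case (_, vs) => vs.head } // one pair per key, like reduceByKey((x, y) => x)
      .map(_._1)                                     // drop the placeholder
    // deduped contains 1, 2, 3 (order not guaranteed)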
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index b3c820ed94..08da9a1c4d 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -8,9 +8,9 @@ import spark.rdd.CoalescedRDD
 import SparkContext._
 
 class RDDSuite extends FunSuite with BeforeAndAfter {
-  
+
   var sc: SparkContext = _
-  
+
   after {
     if (sc != null) {
       sc.stop()
@@ -19,11 +19,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.master.port")
   }
-  
+
   test("basic operations") {
     sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
+    val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
+    assert(dups.distinct.count === 4)
+    assert(dups.distinct().collect === dups.distinct.collect)
+    assert(dups.distinct(2).collect === dups.distinct.collect)
     assert(nums.reduce(_ + _) === 10)
     assert(nums.fold(0)(_ + _) === 10)
     assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4"))
@@ -121,7 +125,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
     val zipped = nums.zip(nums.map(_ + 1.0))
     assert(zipped.glom().map(_.toList).collect().toList ===
       List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0))))
-    
+
     intercept[IllegalArgumentException] {
       nums.zip(sc.parallelize(1 to 4, 1)).collect()
     }
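
With the overloads in place, the three call forms exercised by the new test are interchangeable. Roughly, in a spark-shell session with a local SparkContext named sc (mirroring the suite above):

    val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
    dups.distinct.count     // 4 -- no parentheses, uses the default splits.size
    dups.distinct().count   // 4 -- explicit empty argument list
    dups.distinct(2).count  // 4 -- explicit number of splits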