diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2012-12-24 02:26:11 -0800 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2012-12-24 02:36:47 -0800 |
commit | 61be8566e24c664442780154debfea884d81f46b (patch) | |
tree | 707038b8b9e31241474c6a0a546e3c700a1c8861 | |
parent | a6bb41c6d389f1b98d5542000a7a9705ba282273 (diff) | |
download | spark-61be8566e24c664442780154debfea884d81f46b.tar.gz spark-61be8566e24c664442780154debfea884d81f46b.tar.bz2 spark-61be8566e24c664442780154debfea884d81f46b.zip |
Allow distinct() to be called without parentheses when using the default number of splits.
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 4 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 12 |
2 files changed, 11 insertions, 5 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index bb4c13c494..d15c6f7396 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -185,9 +185,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int = splits.size): RDD[T] = + def distinct(numSplits: Int): RDD[T] = map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1) + def distinct(): RDD[T] = distinct(splits.size) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index b3c820ed94..08da9a1c4d 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -8,9 +8,9 @@ import spark.rdd.CoalescedRDD import SparkContext._ class RDDSuite extends FunSuite with BeforeAndAfter { - + var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() @@ -19,11 +19,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port") } - + test("basic operations") { sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(nums.collect().toList === List(1, 2, 3, 4)) + val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2) + assert(dups.distinct.count === 4) + assert(dups.distinct().collect === dups.distinct.collect) + assert(dups.distinct(2).collect === dups.distinct.collect) assert(nums.reduce(_ + _) === 10) assert(nums.fold(0)(_ + _) === 10) assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4")) @@ -121,7 +125,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter { val zipped = nums.zip(nums.map(_ + 1.0)) assert(zipped.glom().map(_.toList).collect().toList === List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0)))) - + intercept[IllegalArgumentException] { nums.zip(sc.parallelize(1 to 4, 1)).collect() } |