From 61be8566e24c664442780154debfea884d81f46b Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Mon, 24 Dec 2012 02:26:11 -0800 Subject: Allow distinct() to be called without parentheses when using the default number of splits. --- core/src/main/scala/spark/RDD.scala | 4 +++- core/src/test/scala/spark/RDDSuite.scala | 12 ++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index bb4c13c494..d15c6f7396 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -185,9 +185,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int = splits.size): RDD[T] = + def distinct(numSplits: Int): RDD[T] = map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1) + def distinct(): RDD[T] = distinct(splits.size) + /** * Return a sampled subset of this RDD. */ diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index b3c820ed94..08da9a1c4d 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -8,9 +8,9 @@ import spark.rdd.CoalescedRDD import SparkContext._ class RDDSuite extends FunSuite with BeforeAndAfter { - + var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() @@ -19,11 +19,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port") } - + test("basic operations") { sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(nums.collect().toList === List(1, 2, 3, 4)) + val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2) + assert(dups.distinct.count === 4) + assert(dups.distinct().collect === dups.distinct.collect) + assert(dups.distinct(2).collect === dups.distinct.collect) assert(nums.reduce(_ + _) === 10) assert(nums.fold(0)(_ + _) === 10) assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4")) @@ -121,7 +125,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter { val zipped = nums.zip(nums.map(_ + 1.0)) assert(zipped.glom().map(_.toList).collect().toList === List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0)))) - + intercept[IllegalArgumentException] { nums.zip(sc.parallelize(1 to 4, 1)).collect() } -- cgit v1.2.3