about | summary | refs | log | tree | commit | diff
path: root/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala')
-rw-r--r-- core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 59
1 files changed, 55 insertions, 4 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index e94a1e76d4..0e5625b764 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -26,6 +26,8 @@ import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.util.Utils
+import org.apache.spark.rdd.RDDSuiteUtils._
+
class RDDSuite extends FunSuite with SharedSparkContext {
test("basic operations") {
@@ -585,14 +587,63 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
+ test("sortByKey") {
+   // NOTE: despite the "sortByKey" label, every assertion below goes through sortBy.
+   val records = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B"))
+
+   // Sort key = first '|'-separated field of each row.
+   assert(records.sortBy(_.split("\\|")(0)).collect() === Array("4|60|C", "5|50|A", "6|40|B"))
+   // Sort key = second field.
+   assert(records.sortBy(_.split("\\|")(1)).collect() === Array("6|40|B", "5|50|A", "4|60|C"))
+   // Sort key = third field.
+   assert(records.sortBy(_.split("\\|")(2)).collect() === Array("5|50|A", "6|40|B", "4|60|C"))
+ }
+
+ test("sortByKey ascending parameter") {
+   val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B"))
+   // The same rows in ascending and in descending order of the first '|'-separated field.
+   val asc = Array("4|60|C", "5|50|A", "6|40|B")
+   val desc = Array("6|40|B", "5|50|A", "4|60|C")
+
+   assert(data.sortBy(_.split("\\|")(0), ascending = true).collect() === asc)
+   assert(data.sortBy(_.split("\\|")(0), ascending = false).collect() === desc)
+ }
+
+ test("sortByKey with explicit ordering") {
+   val data = sc.parallelize(Seq("Bob|Smith|50",
+     "Jane|Smith|40",
+     "Thomas|Williams|30",
+     "Karen|Williams|60"))
+
+   // Rows in increasing order of the trailing numeric field.
+   val ageOrdered = Array("Thomas|Williams|30",
+     "Jane|Smith|40",
+     "Bob|Smith|50",
+     "Karen|Williams|60")
+
+   // last name, then first name
+   val nameOrdered = Array("Bob|Smith|50",
+     "Jane|Smith|40",
+     "Karen|Williams|60",
+     "Thomas|Williams|30")
+
+   // Builds the sort key from the three '|'-separated fields of a row.
+   // Person is declared outside this view; presumably (first name, last name, age) -- confirm.
+   val parse = (s: String) => {
+     val split = s.split("\\|")
+     Person(split(0), split(1), split(2).toInt)
+   }
+
+   // The Ordering and the ClassTag are passed explicitly in the second parameter list,
+   // so each assertion picks the ordering under test instead of relying on implicit lookup.
+   import scala.reflect.classTag
+   assert(data.sortBy(parse, ascending = true, numPartitions = 2)(AgeOrdering, classTag[Person])
+     .collect() === ageOrdered)
+   assert(data.sortBy(parse, ascending = true, numPartitions = 2)(NameOrdering, classTag[Person])
+     .collect() === nameOrdered)
+ }
+
test("intersection") {
val all = sc.parallelize(1 to 10)
val evens = sc.parallelize(2 to 10 by 2)
val intersection = Array(2, 4, 6, 8, 10)
// intersection is commutative, so the same expected array is asserted with
// the receiver and the argument swapped; results are sorted before comparing
- assert(all.intersection(evens).collect.sorted === intersection)
- assert(evens.intersection(all).collect.sorted === intersection)
+ assert(all.intersection(evens).collect().sorted === intersection)
+ assert(evens.intersection(all).collect().sorted === intersection)
}
test("intersection strips duplicates in an input") {
@@ -600,8 +651,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val b = sc.parallelize(Seq(1,1,2,3))
val intersection = Array(1,2,3)
- assert(a.intersection(b).collect.sorted === intersection)
- assert(b.intersection(a).collect.sorted === intersection)
+ assert(a.intersection(b).collect().sorted === intersection)
+ assert(b.intersection(a).collect().sorted === intersection)
}
test("zipWithIndex") {