author    Mark Hamstra <markhamstra@gmail.com>  2013-03-16 12:17:13 -0700
committer Mark Hamstra <markhamstra@gmail.com>  2013-03-16 12:17:13 -0700
commit    1fb192ef404bb7aae126e30ad8a7663e7df6e618 (patch)
tree      e514067804d10ebc5e3189e35264941ae8211006 /core/src/test
parent    ef75be3bf72327f38148faae99cf429edcb3a80d (diff)
parent    c1e9cdc49f89222b366a14a20ffd937ca0fb9adc (diff)
Merge branch 'master' of https://github.com/mesos/spark into foldByKey
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala | 28
1 file changed, 27 insertions(+), 1 deletion(-)
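The merged tests cover PairRDDFunctions.subtractByKey, which keeps every (key, value) pair on the left side whose key does not appear in the other RDD; duplicate keys on the left survive, and the other RDD's value type is never consulted. A minimal standalone sketch of that behavior against the Spark 0.7-era API (SubtractByKeyExample is a hypothetical driver name, not part of this commit):

import spark.SparkContext
import spark.SparkContext._  // implicit conversion to PairRDDFunctions

object SubtractByKeyExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "subtractByKeyExample")
    // Left side: the duplicate key 1 appears twice and both pairs survive.
    val a = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c")))
    // Right side: only its keys matter; the Int values are ignored.
    val b = sc.parallelize(Seq((2, 20), (3, 30), (4, 40)))
    // Keep pairs from a whose keys are absent from b.
    val c = a.subtractByKey(b)
    c.collect().foreach(println)  // prints (1,a) twice
    sc.stop()
  }
}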
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 8411291b2c..2b2a90defa 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -272,13 +272,39 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     }
     // partitionBy so we have a narrow dependency
     val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
-    println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList)
     // more partitions/no partitioner so a shuffle dependency
     val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
     val c = a.subtract(b)
     assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+    // Ideally we could keep the original partitioner...
+    assert(c.partitioner === None)
+  }
+
+  test("subtractByKey") {
+    sc = new SparkContext("local", "test")
+    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
+    val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
+    val c = a.subtractByKey(b)
+    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
+    assert(c.partitions.size === a.partitions.size)
+  }
+
+  test("subtractByKey with narrow dependency") {
+    sc = new SparkContext("local", "test")
+    // use a deterministic partitioner
+    val p = new Partitioner() {
+      def numPartitions = 5
+      def getPartition(key: Any) = key.asInstanceOf[Int]
+    }
+    // partitionBy so we have a narrow dependency
+    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+    // more partitions/no partitioner so a shuffle dependency
+    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+    val c = a.subtractByKey(b)
+    assert(c.collect().toSet === Set((1, "a"), (1, "a")))
     assert(c.partitioner.get === p)
   }
+
 }
 object ShuffleSuite {
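As the narrow-dependency test asserts, subtractByKey can keep the left side's partitioner because the result is still keyed the same way, whereas plain subtract returns None: its elements are whole (key, value) tuples, so a key-based partitioner no longer describes their placement. A sketch of a reusable version of the test's anonymous partitioner (IdentityPartitioner is a hypothetical name; the equals override is what would let two separate instances compare equal in assertions like c.partitioner.get === p):

import spark.Partitioner

// Hypothetical helper: routes each Int key to the partition with the
// same index. Keys must fall in [0, numPartitions).
class IdentityPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, "need at least one partition")
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = key.asInstanceOf[Int]
  override def equals(other: Any): Boolean = other match {
    case p: IdentityPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
  override def hashCode: Int = numPartitions
}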