diff options
author | Mark Hamstra <markhamstra@gmail.com> | 2013-03-16 12:17:13 -0700 |
---|---|---|
committer | Mark Hamstra <markhamstra@gmail.com> | 2013-03-16 12:17:13 -0700 |
commit | 1fb192ef404bb7aae126e30ad8a7663e7df6e618 (patch) | |
tree | e514067804d10ebc5e3189e35264941ae8211006 /core/src/test | |
parent | ef75be3bf72327f38148faae99cf429edcb3a80d (diff) | |
parent | c1e9cdc49f89222b366a14a20ffd937ca0fb9adc (diff) | |
download | spark-1fb192ef404bb7aae126e30ad8a7663e7df6e618.tar.gz spark-1fb192ef404bb7aae126e30ad8a7663e7df6e618.tar.bz2 spark-1fb192ef404bb7aae126e30ad8a7663e7df6e618.zip |
Merge branch 'master' of https://github.com/mesos/spark into foldByKey
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 28 |
1 file changed, 27 insertions, 1 deletion
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 8411291b2c..2b2a90defa 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -272,13 +272,39 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { } // partitionBy so we have a narrow dependency val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) - println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList) // more partitions/no partitioner so a shuffle dependency val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) val c = a.subtract(b) assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + // Ideally we could keep the original partitioner... + assert(c.partitioner === None) + } + + test("subtractByKey") { + sc = new SparkContext("local", "test") + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2) + val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitions.size === a.partitions.size) + } + + test("subtractByKey with narrow dependency") { + sc = new SparkContext("local", "test") + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) assert(c.partitioner.get === p) } + } object ShuffleSuite { |