diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-24 12:17:22 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-24 12:17:22 -0800 |
commit | dff53d1b94f718a7a1140e56c72238f03a61ce25 (patch) | |
tree | 503961d09c7c18348856aa86f90028c27b35f897 /core/src/test | |
parent | 68c7934b1a442d7c83fd7d8f70bc03eee1c61c76 (diff) | |
parent | 3b9f929467f3b14e780df459919a4d6c0c7ee772 (diff) | |
download | spark-dff53d1b94f718a7a1140e56c72238f03a61ce25.tar.gz spark-dff53d1b94f718a7a1140e56c72238f03a61ce25.tar.bz2 spark-dff53d1b94f718a7a1140e56c72238f03a61ce25.zip |
Merge branch 'mesos-master' into streaming
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/spark/DistributedSuite.scala | 21 | ||||
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 26 |
2 files changed, 47 insertions, 0 deletions
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 0e2585daa4..caa4ba3a37 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(grouped.collect.size === 1) } } + + test("recover from node failures with replication") { + import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity} + DistributedSuite.amMaster = true + // Using more than two nodes so we don't have a symmetric communication pattern and might + // cache a partially correct list of peers. + sc = new SparkContext("local-cluster[3,1,512]", "test") + for (i <- 1 to 3) { + val data = sc.parallelize(Seq(true, false, false, false), 4) + data.persist(StorageLevel.MEMORY_ONLY_2) + + assert(data.count === 4) + assert(data.map(markNodeIfIdentity).collect.size === 4) + assert(data.map(failOnMarkedIdentity).collect.size === 4) + + // Create a new replicated RDD to make sure that cached peer information doesn't cause + // problems. + val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2) + assert(data2.count === 2) + } + } } object DistributedSuite { diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 92c3f67416..77e0eab829 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -234,6 +234,32 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(rdd.keys.collect().toList === List(1, 2)) assert(rdd.values.collect().toList === List("a", "b")) } + + test("subtract") { + sc = new SparkContext("local", "test") + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + sc = new SparkContext("local", "test") + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList) + // more splits/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + assert(c.partitioner.get === p) + } } object ShuffleSuite { |