diff options
author | Imran Rashid <imran@quantifind.com> | 2013-03-03 15:20:46 -0800 |
---|---|---|
committer | Imran Rashid <imran@quantifind.com> | 2013-03-03 15:20:46 -0800 |
commit | d36abdb053be0dae2225b09c747830ac7b64387d (patch) | |
tree | 25b88a9446b4f86c740322575e60404f9bb325b5 /core/src/test | |
parent | 8f17387d9723c1359c86ff1773cf6613f02bb9c6 (diff) | |
parent | 6bfc7cad6bf58aa40975c0984b98debb33df080d (diff) | |
download | spark-d36abdb053be0dae2225b09c747830ac7b64387d.tar.gz spark-d36abdb053be0dae2225b09c747830ac7b64387d.tar.bz2 spark-d36abdb053be0dae2225b09c747830ac7b64387d.zip |
Merge branch 'master' into stageInfo
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/spark/CheckpointSuite.scala | 10 | ||||
-rw-r--r-- | core/src/test/scala/spark/DistributedSuite.scala | 37 | ||||
-rw-r--r-- | core/src/test/scala/spark/PartitioningSuite.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 45 |
4 files changed, 95 insertions, 5 deletions
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 3e5ffa81d6..ca385972fb 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -162,6 +162,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) } + test("CheckpointRDD with zero partitions") { + val rdd = new BlockRDD[Int](sc, Array[String]()) + assert(rdd.partitions.size === 0) + assert(rdd.isCheckpointed === false) + rdd.checkpoint() + assert(rdd.count() === 0) + assert(rdd.isCheckpointed === true) + assert(rdd.partitions.size === 0) + } + /** * Test checkpointing of the final RDD generated by the given operation. By default, * this method tests whether the size of serialized RDD has reduced after checkpointing or not. diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 0e2585daa4..4104b33c8b 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -1,5 +1,6 @@ package spark +import network.ConnectionManagerId import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter import org.scalatest.matchers.ShouldMatchers @@ -13,7 +14,7 @@ import com.google.common.io.Files import scala.collection.mutable.ArrayBuffer import SparkContext._ -import storage.StorageLevel +import storage.{GetBlock, BlockManagerWorker, StorageLevel} class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext { @@ -140,9 +141,22 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter test("caching in memory and disk, serialized, replicated") { sc = new SparkContext(clusterUrl, "test") val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2) + assert(data.count() === 1000) assert(data.count() === 1000) assert(data.count() === 1000) + + // Get all the locations of the first partition and try to fetch the partitions + // from those locations. + val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray + val blockId = blockIds(0) + val blockManager = SparkEnv.get.blockManager + blockManager.master.getLocations(blockId).foreach(id => { + val bytes = BlockManagerWorker.syncGetBlock( + GetBlock(blockId), ConnectionManagerId(id.ip, id.port)) + val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList + assert(deserialized === (1 to 100).toList) + }) } test("compute without caching when no partitions fit in memory") { @@ -217,6 +231,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(grouped.collect.size === 1) } } + + test("recover from node failures with replication") { + import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity} + DistributedSuite.amMaster = true + // Using more than two nodes so we don't have a symmetric communication pattern and might + // cache a partially correct list of peers. + sc = new SparkContext("local-cluster[3,1,512]", "test") + for (i <- 1 to 3) { + val data = sc.parallelize(Seq(true, false, false, false), 4) + data.persist(StorageLevel.MEMORY_ONLY_2) + + assert(data.count === 4) + assert(data.map(markNodeIfIdentity).collect.size === 4) + assert(data.map(failOnMarkedIdentity).collect.size === 4) + + // Create a new replicated RDD to make sure that cached peer information doesn't cause + // problems. + val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2) + assert(data2.count === 2) + } + } } object DistributedSuite { diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index af1107cd19..60db759c25 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -84,10 +84,10 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner) assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner) - assert(grouped2.join(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner) + assert(grouped2.join(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner) assert(grouped2.join(reduced2).partitioner === grouped2.partitioner) assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner) diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 92c3f67416..8411291b2c 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -234,6 +234,51 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(rdd.keys.collect().toList === List(1, 2)) assert(rdd.values.collect().toList === List("a", "b")) } + + test("default partitioner uses partition size") { + sc = new SparkContext("local", "test") + // specify 2000 partitions + val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) + // do a map, which loses the partitioner + val b = a.map(a => (a, (a * 2).toString)) + // then a group by, and see we didn't revert to 2 partitions + val c = b.groupByKey() + assert(c.partitions.size === 2000) + } + + test("default partitioner uses largest partitioner") { + sc = new SparkContext("local", "test") + val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2) + val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000) + val c = a.join(b) + assert(c.partitions.size === 2000) + } + + test("subtract") { + sc = new SparkContext("local", "test") + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + sc = new SparkContext("local", "test") + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + assert(c.partitioner.get === p) + } } object ShuffleSuite { |