author     Imran Rashid <imran@quantifind.com>    2013-03-03 15:20:46 -0800
committer  Imran Rashid <imran@quantifind.com>    2013-03-03 15:20:46 -0800
commit     d36abdb053be0dae2225b09c747830ac7b64387d (patch)
tree       25b88a9446b4f86c740322575e60404f9bb325b5 /core/src/test
parent     8f17387d9723c1359c86ff1773cf6613f02bb9c6 (diff)
parent     6bfc7cad6bf58aa40975c0984b98debb33df080d (diff)
Merge branch 'master' into stageInfo
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala   | 10
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala  | 37
-rw-r--r--  core/src/test/scala/spark/PartitioningSuite.scala |  8
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala      | 45
4 files changed, 95 insertions(+), 5 deletions(-)
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 3e5ffa81d6..ca385972fb 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -162,6 +162,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
}
+ test("CheckpointRDD with zero partitions") {
+ val rdd = new BlockRDD[Int](sc, Array[String]())
+ assert(rdd.partitions.size === 0)
+ assert(rdd.isCheckpointed === false)
+ rdd.checkpoint()
+ assert(rdd.count() === 0)
+ assert(rdd.isCheckpointed === true)
+ assert(rdd.partitions.size === 0)
+ }
+
/**
* Test checkpointing of the final RDD generated by the given operation. By default,
* this method tests whether the size of serialized RDD has reduced after checkpointing or not.
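The zero-partition test added above exercises the usual checkpoint lifecycle: mark the RDD, run an action, then verify. A minimal sketch of that same lifecycle outside the test harness, assuming a local master and an illustrative checkpoint directory (neither value comes from this change):

import spark.SparkContext

// Sketch only: "local" and "/tmp/spark-checkpoints" are assumed values.
val sc = new SparkContext("local", "checkpoint-example")
sc.setCheckpointDir("/tmp/spark-checkpoints")

val rdd = sc.parallelize(1 to 10, 2)
rdd.checkpoint()                      // only marks the RDD; nothing is written yet
assert(rdd.isCheckpointed === false)
rdd.count()                           // the first action materializes the checkpoint
assert(rdd.isCheckpointed === true)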
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 0e2585daa4..4104b33c8b 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -1,5 +1,6 @@
package spark
+import network.ConnectionManagerId
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers
@@ -13,7 +14,7 @@ import com.google.common.io.Files
import scala.collection.mutable.ArrayBuffer
import SparkContext._
-import storage.StorageLevel
+import storage.{GetBlock, BlockManagerWorker, StorageLevel}
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
@@ -140,9 +141,22 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
test("caching in memory and disk, serialized, replicated") {
sc = new SparkContext(clusterUrl, "test")
val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
+
assert(data.count() === 1000)
assert(data.count() === 1000)
assert(data.count() === 1000)
+
+ // Get all the locations of the first partition and try to fetch the partitions
+ // from those locations.
+ val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray
+ val blockId = blockIds(0)
+ val blockManager = SparkEnv.get.blockManager
+ blockManager.master.getLocations(blockId).foreach(id => {
+ val bytes = BlockManagerWorker.syncGetBlock(
+ GetBlock(blockId), ConnectionManagerId(id.ip, id.port))
+ val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
+ assert(deserialized === (1 to 100).toList)
+ })
}
test("compute without caching when no partitions fit in memory") {
@@ -217,6 +231,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(grouped.collect.size === 1)
}
}
+
+ test("recover from node failures with replication") {
+ import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+ DistributedSuite.amMaster = true
+ // Using more than two nodes so we don't have a symmetric communication pattern and might
+ // cache a partially correct list of peers.
+ sc = new SparkContext("local-cluster[3,1,512]", "test")
+ for (i <- 1 to 3) {
+ val data = sc.parallelize(Seq(true, false, false, false), 4)
+ data.persist(StorageLevel.MEMORY_ONLY_2)
+
+ assert(data.count === 4)
+ assert(data.map(markNodeIfIdentity).collect.size === 4)
+ assert(data.map(failOnMarkedIdentity).collect.size === 4)
+
+ // Create a new replicated RDD to make sure that cached peer information doesn't cause
+ // problems.
+ val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
+ assert(data2.count === 2)
+ }
+ }
}
object DistributedSuite {
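The replication tests above rely on the "_2" storage levels, which keep a second copy of each cached block on another node so that a single executor failure does not force recomputation. A minimal sketch of requesting replication from application code (the cluster URL mirrors the one used in the test; the RDD contents are illustrative):

import spark.SparkContext
import spark.storage.StorageLevel

// Sketch only: MEMORY_ONLY_2 keeps two in-memory replicas of every cached block.
val sc = new SparkContext("local-cluster[3,1,512]", "replication-example")
val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
data.count()   // materializes the blocks and replicates each one to a peer node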
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index af1107cd19..60db759c25 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -84,10 +84,10 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner)
assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
- assert(grouped2.join(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
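The corrected assertions above pin down the rule that a join or cogroup of two pre-partitioned RDDs adopts the partitioner with the larger number of partitions. A short sketch of the same rule, assuming an already-created SparkContext named sc; the variable names mirror those in the test:

import spark.SparkContext._  // pair RDD functions

val pairs = sc.parallelize(1 to 100).map(x => (x, x))
val grouped2 = pairs.groupByKey(2)   // HashPartitioner with 2 partitions
val grouped4 = pairs.groupByKey(4)   // HashPartitioner with 4 partitions
val joined = grouped2.join(grouped4)
assert(joined.partitioner === grouped4.partitioner)  // the larger partitioner wins
assert(joined.partitions.size === 4)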
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 92c3f67416..8411291b2c 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -234,6 +234,51 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
assert(rdd.keys.collect().toList === List(1, 2))
assert(rdd.values.collect().toList === List("a", "b"))
}
+
+ test("default partitioner uses partition size") {
+ sc = new SparkContext("local", "test")
+ // specify 2000 partitions
+ val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
+ // do a map, which loses the partitioner
+ val b = a.map(a => (a, (a * 2).toString))
+ // then a group by, and see we didn't revert to 2 partitions
+ val c = b.groupByKey()
+ assert(c.partitions.size === 2000)
+ }
+
+ test("default partitioner uses largest partitioner") {
+ sc = new SparkContext("local", "test")
+ val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
+ val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
+ val c = a.join(b)
+ assert(c.partitions.size === 2000)
+ }
+
+ test("subtract") {
+ sc = new SparkContext("local", "test")
+ val a = sc.parallelize(Array(1, 2, 3), 2)
+ val b = sc.parallelize(Array(2, 3, 4), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set(1))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ test("subtract with narrow dependency") {
+ sc = new SparkContext("local", "test")
+ // use a deterministic partitioner
+ val p = new Partitioner() {
+ def numPartitions = 5
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ // partitionBy so we have a narrow dependency
+ val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+ println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList)
+ // more partitions/no partitioner so a shuffle dependency
+ val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+ assert(c.partitioner.get === p)
+ }
}
object ShuffleSuite {
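The default-partitioner tests added above show that, when no partitioner is given, a shuffle inherits the largest upstream partition count. A sketch of how an explicit partition count or Partitioner overrides that default, again assuming an already-created SparkContext named sc:

import spark.HashPartitioner
import spark.SparkContext._

// Sketch only: illustrates overriding the inherited partition count.
val big = sc.makeRDD(1 to 4, 2000).map(x => (x, x))
assert(big.groupByKey().partitions.size === 2000)                     // inherits 2000
assert(big.groupByKey(10).partitions.size === 10)                     // explicit count
assert(big.groupByKey(new HashPartitioner(10)).partitions.size === 10)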