diff options
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/DistributedSuite.scala | 77 |
1 files changed, 23 insertions, 54 deletions
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 6beae842b0..4ee0e00fde 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -149,61 +149,16 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex sc.parallelize(1 to 10).count() } - test("caching") { + private def testCaching(storageLevel: StorageLevel): Unit = { sc = new SparkContext(clusterUrl, "test") - val data = sc.parallelize(1 to 1000, 10).cache() - assert(data.count() === 1000) - assert(data.count() === 1000) - assert(data.count() === 1000) - } - - test("caching on disk") { - sc = new SparkContext(clusterUrl, "test") - val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY) - assert(data.count() === 1000) - assert(data.count() === 1000) - assert(data.count() === 1000) - } - - test("caching in memory, replicated") { - sc = new SparkContext(clusterUrl, "test") - val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2) - assert(data.count() === 1000) - assert(data.count() === 1000) - assert(data.count() === 1000) - } - - test("caching in memory, serialized, replicated") { - sc = new SparkContext(clusterUrl, "test") - val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2) - assert(data.count() === 1000) - assert(data.count() === 1000) - assert(data.count() === 1000) - } - - test("caching on disk, replicated") { - sc = new SparkContext(clusterUrl, "test") - val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2) - assert(data.count() === 1000) - assert(data.count() === 1000) - assert(data.count() === 1000) - } - - test("caching in memory and disk, replicated") { - sc = new SparkContext(clusterUrl, "test") - val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2) - assert(data.count() === 1000) - assert(data.count() === 1000) - assert(data.count() === 1000) - } - - test("caching in memory and disk, serialized, replicated") { - sc = new SparkContext(clusterUrl, "test") - val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2) - - assert(data.count() === 1000) - assert(data.count() === 1000) - assert(data.count() === 1000) + sc.jobProgressListener.waitUntilExecutorsUp(2, 30000) + val data = sc.parallelize(1 to 1000, 10) + val cachedData = data.persist(storageLevel) + assert(cachedData.count === 1000) + assert(sc.getExecutorStorageStatus.map(_.rddBlocksById(cachedData.id).size).sum === + storageLevel.replication * data.getNumPartitions) + assert(cachedData.count === 1000) + assert(cachedData.count === 1000) // Get all the locations of the first partition and try to fetch the partitions // from those locations. @@ -221,6 +176,20 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex } } + Seq( + "caching" -> StorageLevel.MEMORY_ONLY, + "caching on disk" -> StorageLevel.DISK_ONLY, + "caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2, + "caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2, + "caching on disk, replicated" -> StorageLevel.DISK_ONLY_2, + "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2, + "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2 + ).foreach { case (testName, storageLevel) => + test(testName) { + testCaching(storageLevel) + } + } + test("compute without caching when no partitions fit in memory") { val size = 10000 val conf = new SparkConf() |