Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala  77
1 file changed, 23 insertions(+), 54 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 6beae842b0..4ee0e00fde 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -149,61 +149,16 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
sc.parallelize(1 to 10).count()
}
- test("caching") {
+ private def testCaching(storageLevel: StorageLevel): Unit = {
sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).cache()
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching on disk") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching in memory, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching in memory, serialized, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching on disk, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching in memory and disk, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching in memory and disk, serialized, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
-
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
+ sc.jobProgressListener.waitUntilExecutorsUp(2, 30000)
+ val data = sc.parallelize(1 to 1000, 10)
+ val cachedData = data.persist(storageLevel)
+ assert(cachedData.count === 1000)
+ assert(sc.getExecutorStorageStatus.map(_.rddBlocksById(cachedData.id).size).sum ===
+ storageLevel.replication * data.getNumPartitions)
+ assert(cachedData.count === 1000)
+ assert(cachedData.count === 1000)
// Get all the locations of the first partition and try to fetch the partitions
// from those locations.
@@ -221,6 +176,20 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
}
}
+ Seq(
+ "caching" -> StorageLevel.MEMORY_ONLY,
+ "caching on disk" -> StorageLevel.DISK_ONLY,
+ "caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2,
+ "caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
+ "caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
+ "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
+ "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2
+ ).foreach { case (testName, storageLevel) =>
+ test(testName) {
+ testCaching(storageLevel)
+ }
+ }
+
test("compute without caching when no partitions fit in memory") {
val size = 10000
val conf = new SparkConf()
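
For reference, below is a minimal standalone sketch (not part of the patch) of the data-driven pattern the diff introduces: each (name, StorageLevel) pair registers one named ScalaTest test at suite-construction time, replacing the seven copy-pasted "caching ..." tests. The class name CachingLevelsSketch is hypothetical, the sketch assumes ScalaTest's AnyFunSuite and spark-core are on the classpath, and a trivial assertion stands in for the suite's testCaching body, which requires a live local cluster.

import org.apache.spark.storage.StorageLevel
import org.scalatest.funsuite.AnyFunSuite

// Illustrative sketch only: same Seq(...).foreach { case (testName, storageLevel) => test(testName) { ... } }
// shape as the patch, with one test registered per storage level when the suite is constructed.
class CachingLevelsSketch extends AnyFunSuite {
  Seq(
    "caching" -> StorageLevel.MEMORY_ONLY,
    "caching on disk, replicated" -> StorageLevel.DISK_ONLY_2
  ).foreach { case (testName, storageLevel) =>
    test(testName) {
      // The real suite calls testCaching(storageLevel) against clusterUrl;
      // here we only check the level itself is well-formed.
      assert(storageLevel.isValid)
      assert(storageLevel.replication >= 1)
    }
  }
}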