aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-08-22 16:32:14 -0700
committerReynold Xin <rxin@databricks.com>2016-08-22 16:32:14 -0700
commit8e223ea67acf5aa730ccf688802f17f6fc10907c (patch)
treeb60988fb0166c27148e44636326ed20399327d94 /core/src/test/scala/org
parent71afeeea4ec8e67edc95b5d504c557c88a2598b9 (diff)
downloadspark-8e223ea67acf5aa730ccf688802f17f6fc10907c.tar.gz
spark-8e223ea67acf5aa730ccf688802f17f6fc10907c.tar.bz2
spark-8e223ea67acf5aa730ccf688802f17f6fc10907c.zip
[SPARK-16550][SPARK-17042][CORE] Certain classes fail to deserialize in block manager replication
## What changes were proposed in this pull request? This is a straightforward clone of JoshRosen's original patch. I have follow-up changes to fix block replication for repl-defined classes as well, but those appear to be flaking tests so I'm going to leave that for SPARK-17042 ## How was this patch tested? End-to-end test in ReplSuite (also more tests in DistributedSuite from the original patch). Author: Eric Liang <ekl@databricks.com> Closes #14311 from ericl/spark-16550.
Diffstat (limited to 'core/src/test/scala/org')
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala77
1 file changed, 23 insertions, 54 deletions
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 6beae842b0..4ee0e00fde 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -149,61 +149,16 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
sc.parallelize(1 to 10).count()
}
- test("caching") {
+ private def testCaching(storageLevel: StorageLevel): Unit = {
sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).cache()
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching on disk") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching in memory, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching in memory, serialized, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching on disk, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching in memory and disk, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- }
-
- test("caching in memory and disk, serialized, replicated") {
- sc = new SparkContext(clusterUrl, "test")
- val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
-
- assert(data.count() === 1000)
- assert(data.count() === 1000)
- assert(data.count() === 1000)
+ sc.jobProgressListener.waitUntilExecutorsUp(2, 30000)
+ val data = sc.parallelize(1 to 1000, 10)
+ val cachedData = data.persist(storageLevel)
+ assert(cachedData.count === 1000)
+ assert(sc.getExecutorStorageStatus.map(_.rddBlocksById(cachedData.id).size).sum ===
+ storageLevel.replication * data.getNumPartitions)
+ assert(cachedData.count === 1000)
+ assert(cachedData.count === 1000)
// Get all the locations of the first partition and try to fetch the partitions
// from those locations.
@@ -221,6 +176,20 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
}
+ Seq(
+ "caching" -> StorageLevel.MEMORY_ONLY,
+ "caching on disk" -> StorageLevel.DISK_ONLY,
+ "caching in memory, replicated" -> StorageLevel.MEMORY_ONLY_2,
+ "caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
+ "caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
+ "caching in memory and disk, replicated" -> StorageLevel.MEMORY_AND_DISK_2,
+ "caching in memory and disk, serialized, replicated" -> StorageLevel.MEMORY_AND_DISK_SER_2
+ ).foreach { case (testName, storageLevel) =>
+ test(testName) {
+ testCaching(storageLevel)
+ }
+ }
+
test("compute without caching when no partitions fit in memory") {
val size = 10000
val conf = new SparkConf()