aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala10
1 files changed, 8 insertions, 2 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index cb30e1f4e6..0922a2c359 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -165,6 +165,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
}
// Test that GC causes RDD cleanup after dereferencing the RDD
+ // Note rdd is used after previous GC to avoid early collection by the JVM
val postGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
rdd = null // Make RDD out of scope
runGC()
@@ -181,9 +182,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
}
+ rdd.count() // Defeat early collection by the JVM
// Test that GC causes shuffle cleanup after dereferencing the RDD
- rdd.count() // Defeat any early collection of rdd variable by the JVM
val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
runGC()
@@ -201,6 +202,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
}
// Test that GC causes broadcast cleanup after dereferencing the broadcast variable
+ // Note broadcast is used after previous GC to avoid early collection by the JVM
val postGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
broadcast = null // Make broadcast variable out of scope
runGC()
@@ -226,7 +228,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
// the checkpoint is not cleaned by default (without the configuration set)
var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
- rdd = null // Make RDD out of scope
+ rdd = null // Make RDD out of scope, ok if collected earlier
runGC()
postGCTester.assertCleanup()
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
@@ -245,6 +247,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
// Confirm the checkpoint directory exists
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
+ // Reference rdd to defeat any early collection by the JVM
+ rdd.count()
+
// Test that GC causes checkpoint data cleanup after dereferencing the RDD
postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
rdd = null // Make RDD out of scope
@@ -352,6 +357,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1000 millis))
}
+ rdd.count() // Defeat early collection by the JVM
// Test that GC causes shuffle cleanup after dereferencing the RDD
val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))