aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGuoQiang Li <witgo@qq.com>2015-04-19 09:37:09 +0100
committerSean Owen <sowen@cloudera.com>2015-04-19 09:37:09 +0100
commit0424da68d4c81dc3a9944d8485feb1233c6633c4 (patch)
tree88b8c9a59f2b0457f11202974ac53947919a9f8b
parent8fbd45c74e762dd6b071ea58a60f5bb649f74042 (diff)
downloadspark-0424da68d4c81dc3a9944d8485feb1233c6633c4.tar.gz
spark-0424da68d4c81dc3a9944d8485feb1233c6633c4.tar.bz2
spark-0424da68d4c81dc3a9944d8485feb1233c6633c4.zip
[SPARK-6963][CORE]Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint
cc andrewor14 Author: GuoQiang Li <witgo@qq.com> Closes #5548 from witgo/SPARK-6963 and squashes the following commits: 964aea7 [GuoQiang Li] review commits b08b3c9 [GuoQiang Li] Flaky test: o.a.s.ContextCleanerSuite automatically cleanup checkpoint
-rw-r--r--core/src/main/scala/org/apache/spark/ContextCleaner.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala21
2 files changed, 17 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 715b259057..37198d887b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -236,6 +236,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
+ listeners.foreach(_.checkpointCleaned(rddId))
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
@@ -260,4 +261,5 @@ private[spark] trait CleanerListener {
def shuffleCleaned(shuffleId: Int)
def broadcastCleaned(broadcastId: Long)
def accumCleaned(accId: Long)
+ def checkpointCleaned(rddId: Long)
}
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 097e7076e5..c7868ddcf7 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -224,7 +224,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
assert(fs.exists(path))
// the checkpoint is not cleaned by default (without the configuration set)
- var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
+ var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
@@ -245,7 +245,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
// Test that GC causes checkpoint data cleanup after dereferencing the RDD
- postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil)
+ postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
rdd = null // Make RDD out of scope
runGC()
postGCTester.assertCleanup()
@@ -406,12 +406,14 @@ class CleanerTester(
sc: SparkContext,
rddIds: Seq[Int] = Seq.empty,
shuffleIds: Seq[Int] = Seq.empty,
- broadcastIds: Seq[Long] = Seq.empty)
+ broadcastIds: Seq[Long] = Seq.empty,
+ checkpointIds: Seq[Long] = Seq.empty)
extends Logging {
val toBeCleanedRDDIds = new HashSet[Int] with SynchronizedSet[Int] ++= rddIds
val toBeCleanedShuffleIds = new HashSet[Int] with SynchronizedSet[Int] ++= shuffleIds
val toBeCleanedBroadcstIds = new HashSet[Long] with SynchronizedSet[Long] ++= broadcastIds
+ val toBeCheckpointIds = new HashSet[Long] with SynchronizedSet[Long] ++= checkpointIds
val isDistributed = !sc.isLocal
val cleanerListener = new CleanerListener {
@@ -427,12 +429,17 @@ class CleanerTester(
def broadcastCleaned(broadcastId: Long): Unit = {
toBeCleanedBroadcstIds -= broadcastId
- logInfo("Broadcast" + broadcastId + " cleaned")
+ logInfo("Broadcast " + broadcastId + " cleaned")
}
def accumCleaned(accId: Long): Unit = {
logInfo("Cleaned accId " + accId + " cleaned")
}
+
+ def checkpointCleaned(rddId: Long): Unit = {
+ toBeCheckpointIds -= rddId
+ logInfo("checkpoint " + rddId + " cleaned")
+ }
}
val MAX_VALIDATION_ATTEMPTS = 10
@@ -456,7 +463,8 @@ class CleanerTester(
/** Verify that RDDs, shuffles, etc. occupy resources */
private def preCleanupValidate() {
- assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, "Nothing to cleanup")
+ assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty ||
+ checkpointIds.nonEmpty, "Nothing to cleanup")
// Verify the RDDs have been persisted and blocks are present
rddIds.foreach { rddId =>
@@ -547,7 +555,8 @@ class CleanerTester(
private def isAllCleanedUp =
toBeCleanedRDDIds.isEmpty &&
toBeCleanedShuffleIds.isEmpty &&
- toBeCleanedBroadcstIds.isEmpty
+ toBeCleanedBroadcstIds.isEmpty &&
+ toBeCheckpointIds.isEmpty
private def getRDDBlocks(rddId: Int): Seq[BlockId] = {
blockManager.master.getMatchingBlockIds( _ match {