about summary refs log tree commit diff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala 62
1 file changed, 37 insertions, 25 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 13b415cccb..871f831531 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,9 +52,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
}
-
test("cleanup RDD") {
- val rdd = newRDD.persist()
+ val rdd = newRDD().persist()
val collected = rdd.collect().toList
val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))
@@ -67,7 +66,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("cleanup shuffle") {
- val (rdd, shuffleDeps) = newRDDWithShuffleDependencies
+ val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
val collected = rdd.collect().toList
val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
@@ -80,7 +79,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("cleanup broadcast") {
- val broadcast = newBroadcast
+ val broadcast = newBroadcast()
val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
// Explicit cleanup
@@ -89,7 +88,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup RDD") {
- var rdd = newRDD.persist()
+ var rdd = newRDD().persist()
rdd.count()
// Test that GC does not cause RDD cleanup due to a strong reference
@@ -107,7 +106,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup shuffle") {
- var rdd = newShuffleRDD
+ var rdd = newShuffleRDD()
rdd.count()
// Test that GC does not cause shuffle cleanup due to a strong reference
@@ -125,7 +124,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup broadcast") {
- var broadcast = newBroadcast
+ var broadcast = newBroadcast()
// Test that GC does not cause broadcast cleanup due to a strong reference
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
@@ -141,11 +140,23 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
postGCTester.assertCleanup()
}
+ test("automatically cleanup broadcast data for task dispatching") {
+ var rdd = newRDDWithShuffleDependencies()._1
+ rdd.count() // This triggers an action that broadcasts the RDDs.
+
+ // Test that GC causes broadcast task data cleanup after dereferencing the RDD.
+ val postGCTester = new CleanerTester(sc,
+ broadcastIds = Seq(rdd.broadcasted.id, rdd.firstParent.broadcasted.id))
+ rdd = null
+ runGC()
+ postGCTester.assertCleanup()
+ }
+
test("automatically cleanup RDD + shuffle + broadcast") {
val numRdds = 100
val numBroadcasts = 4 // Broadcasts are more costly
- val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
+ val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts
@@ -175,8 +186,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val numRdds = 10
val numBroadcasts = 4 // Broadcasts are more costly
- val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
+ val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts
@@ -197,17 +208,18 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
//------ Helper functions ------
- def newRDD = sc.makeRDD(1 to 10)
- def newPairRDD = newRDD.map(_ -> 1)
- def newShuffleRDD = newPairRDD.reduceByKey(_ + _)
- def newBroadcast = sc.broadcast(1 to 100)
- def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
+ private def newRDD() = sc.makeRDD(1 to 10)
+ private def newPairRDD() = newRDD().map(_ -> 1)
+ private def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
+ private def newBroadcast() = sc.broadcast(1 to 100)
+
+ private def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
getAllDependencies(dep.rdd)
}
}
- val rdd = newShuffleRDD
+ val rdd = newShuffleRDD()
// Get all the shuffle dependencies
val shuffleDeps = getAllDependencies(rdd)
@@ -216,34 +228,34 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
(rdd, shuffleDeps)
}
- def randomRdd = {
+ private def randomRdd() = {
val rdd: RDD[_] = Random.nextInt(3) match {
- case 0 => newRDD
- case 1 => newShuffleRDD
- case 2 => newPairRDD.join(newPairRDD)
+ case 0 => newRDD()
+ case 1 => newShuffleRDD()
+ case 2 => newPairRDD().join(newPairRDD())
}
if (Random.nextBoolean()) rdd.persist()
rdd.count()
rdd
}
- def randomBroadcast = {
+ private def randomBroadcast() = {
sc.broadcast(Random.nextInt(Int.MaxValue))
}
/** Run GC and make sure it actually has run */
- def runGC() {
+ private def runGC() {
val weakRef = new WeakReference(new Object())
val startTime = System.currentTimeMillis
System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
// Wait until a weak reference object has been GCed
- while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
+ while (System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
System.gc()
Thread.sleep(200)
}
}
- def cleaner = sc.cleaner.get
+ private def cleaner = sc.cleaner.get
}