diff options
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala | 62 |
1 files changed, 25 insertions, 37 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 871f831531..13b415cccb 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -52,8 +52,9 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo } } + test("cleanup RDD") { - val rdd = newRDD().persist() + val rdd = newRDD.persist() val collected = rdd.collect().toList val tester = new CleanerTester(sc, rddIds = Seq(rdd.id)) @@ -66,7 +67,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo } test("cleanup shuffle") { - val (rdd, shuffleDeps) = newRDDWithShuffleDependencies() + val (rdd, shuffleDeps) = newRDDWithShuffleDependencies val collected = rdd.collect().toList val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId)) @@ -79,7 +80,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo } test("cleanup broadcast") { - val broadcast = newBroadcast() + val broadcast = newBroadcast val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id)) // Explicit cleanup @@ -88,7 +89,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo } test("automatically cleanup RDD") { - var rdd = newRDD().persist() + var rdd = newRDD.persist() rdd.count() // Test that GC does not cause RDD cleanup due to a strong reference @@ -106,7 +107,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo } test("automatically cleanup shuffle") { - var rdd = newShuffleRDD() + var rdd = newShuffleRDD rdd.count() // Test that GC does not cause shuffle cleanup due to a strong reference @@ -124,7 +125,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo } test("automatically cleanup broadcast") { - var broadcast = newBroadcast() + var broadcast = newBroadcast // Test that GC does not cause broadcast cleanup due to a strong reference val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id)) @@ -140,23 +141,11 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo postGCTester.assertCleanup() } - test("automatically cleanup broadcast data for task dispatching") { - var rdd = newRDDWithShuffleDependencies()._1 - rdd.count() // This triggers an action that broadcasts the RDDs. - - // Test that GC causes broadcast task data cleanup after dereferencing the RDD. - val postGCTester = new CleanerTester(sc, - broadcastIds = Seq(rdd.broadcasted.id, rdd.firstParent.broadcasted.id)) - rdd = null - runGC() - postGCTester.assertCleanup() - } - test("automatically cleanup RDD + shuffle + broadcast") { val numRdds = 100 val numBroadcasts = 4 // Broadcasts are more costly - val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer - val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer + val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer + val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer val rddIds = sc.persistentRdds.keys.toSeq val shuffleIds = 0 until sc.newShuffleId val broadcastIds = 0L until numBroadcasts @@ -186,8 +175,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo val numRdds = 10 val numBroadcasts = 4 // Broadcasts are more costly - val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer - val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer + val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer + val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer val rddIds = sc.persistentRdds.keys.toSeq val shuffleIds = 0 until sc.newShuffleId val broadcastIds = 0L until numBroadcasts @@ -208,18 +197,17 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo //------ Helper functions ------ - private def newRDD() = sc.makeRDD(1 to 10) - private def newPairRDD() = newRDD().map(_ -> 1) - private def newShuffleRDD() = newPairRDD().reduceByKey(_ + _) - private def newBroadcast() = sc.broadcast(1 to 100) - - private def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = { + def newRDD = sc.makeRDD(1 to 10) + def newPairRDD = newRDD.map(_ -> 1) + def newShuffleRDD = newPairRDD.reduceByKey(_ + _) + def newBroadcast = sc.broadcast(1 to 100) + def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _, _]]) = { def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = { rdd.dependencies ++ rdd.dependencies.flatMap { dep => getAllDependencies(dep.rdd) } } - val rdd = newShuffleRDD() + val rdd = newShuffleRDD // Get all the shuffle dependencies val shuffleDeps = getAllDependencies(rdd) @@ -228,34 +216,34 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo (rdd, shuffleDeps) } - private def randomRdd() = { + def randomRdd = { val rdd: RDD[_] = Random.nextInt(3) match { - case 0 => newRDD() - case 1 => newShuffleRDD() - case 2 => newPairRDD.join(newPairRDD()) + case 0 => newRDD + case 1 => newShuffleRDD + case 2 => newPairRDD.join(newPairRDD) } if (Random.nextBoolean()) rdd.persist() rdd.count() rdd } - private def randomBroadcast() = { + def randomBroadcast = { sc.broadcast(Random.nextInt(Int.MaxValue)) } /** Run GC and make sure it actually has run */ - private def runGC() { + def runGC() { val weakRef = new WeakReference(new Object()) val startTime = System.currentTimeMillis System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC. // Wait until a weak reference object has been GCed - while (System.currentTimeMillis - startTime < 10000 && weakRef.get != null) { + while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) { System.gc() Thread.sleep(200) } } - private def cleaner = sc.cleaner.get + def cleaner = sc.cleaner.get } |