aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2014-07-19 16:56:22 -0700
committerReynold Xin <rxin@apache.org>2014-07-19 16:56:22 -0700
commit1efb3698b6cf39a80683b37124d2736ebf3c9d9a (patch)
tree3e4648ec06a8f395a15cd0ce363bf7f8bfc1d667 /core/src/test
parent2a732110d46712c535b75dd4f5a73761b6463aa8 (diff)
downloadspark-1efb3698b6cf39a80683b37124d2736ebf3c9d9a.tar.gz
spark-1efb3698b6cf39a80683b37124d2736ebf3c9d9a.tar.bz2
spark-1efb3698b6cf39a80683b37124d2736ebf3c9d9a.zip
Revert "[SPARK-2521] Broadcast RDD object (instead of sending it along with every task)."
This reverts commit 7b8cd175254d42c8e82f0aa8eb4b7f3508d8fde2.
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala62
1 file changed, 25 insertions, 37 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 871f831531..13b415cccb 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,8 +52,9 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
}
+
test("cleanup RDD") {
- val rdd = newRDD().persist()
+ val rdd = newRDD.persist()
val collected = rdd.collect().toList
val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))
@@ -66,7 +67,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("cleanup shuffle") {
- val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
+ val (rdd, shuffleDeps) = newRDDWithShuffleDependencies
val collected = rdd.collect().toList
val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
@@ -79,7 +80,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("cleanup broadcast") {
- val broadcast = newBroadcast()
+ val broadcast = newBroadcast
val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
// Explicit cleanup
@@ -88,7 +89,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup RDD") {
- var rdd = newRDD().persist()
+ var rdd = newRDD.persist()
rdd.count()
// Test that GC does not cause RDD cleanup due to a strong reference
@@ -106,7 +107,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup shuffle") {
- var rdd = newShuffleRDD()
+ var rdd = newShuffleRDD
rdd.count()
// Test that GC does not cause shuffle cleanup due to a strong reference
@@ -124,7 +125,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup broadcast") {
- var broadcast = newBroadcast()
+ var broadcast = newBroadcast
// Test that GC does not cause broadcast cleanup due to a strong reference
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
@@ -140,23 +141,11 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
postGCTester.assertCleanup()
}
- test("automatically cleanup broadcast data for task dispatching") {
- var rdd = newRDDWithShuffleDependencies()._1
- rdd.count() // This triggers an action that broadcasts the RDDs.
-
- // Test that GC causes broadcast task data cleanup after dereferencing the RDD.
- val postGCTester = new CleanerTester(sc,
- broadcastIds = Seq(rdd.broadcasted.id, rdd.firstParent.broadcasted.id))
- rdd = null
- runGC()
- postGCTester.assertCleanup()
- }
-
test("automatically cleanup RDD + shuffle + broadcast") {
val numRdds = 100
val numBroadcasts = 4 // Broadcasts are more costly
- val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
+ val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts
@@ -186,8 +175,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val numRdds = 10
val numBroadcasts = 4 // Broadcasts are more costly
- val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
+ val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts
@@ -208,18 +197,17 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
//------ Helper functions ------
- private def newRDD() = sc.makeRDD(1 to 10)
- private def newPairRDD() = newRDD().map(_ -> 1)
- private def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
- private def newBroadcast() = sc.broadcast(1 to 100)
-
- private def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
+ def newRDD = sc.makeRDD(1 to 10)
+ def newPairRDD = newRDD.map(_ -> 1)
+ def newShuffleRDD = newPairRDD.reduceByKey(_ + _)
+ def newBroadcast = sc.broadcast(1 to 100)
+ def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
getAllDependencies(dep.rdd)
}
}
- val rdd = newShuffleRDD()
+ val rdd = newShuffleRDD
// Get all the shuffle dependencies
val shuffleDeps = getAllDependencies(rdd)
@@ -228,34 +216,34 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
(rdd, shuffleDeps)
}
- private def randomRdd() = {
+ def randomRdd = {
val rdd: RDD[_] = Random.nextInt(3) match {
- case 0 => newRDD()
- case 1 => newShuffleRDD()
- case 2 => newPairRDD.join(newPairRDD())
+ case 0 => newRDD
+ case 1 => newShuffleRDD
+ case 2 => newPairRDD.join(newPairRDD)
}
if (Random.nextBoolean()) rdd.persist()
rdd.count()
rdd
}
- private def randomBroadcast() = {
+ def randomBroadcast = {
sc.broadcast(Random.nextInt(Int.MaxValue))
}
/** Run GC and make sure it actually has run */
- private def runGC() {
+ def runGC() {
val weakRef = new WeakReference(new Object())
val startTime = System.currentTimeMillis
System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
// Wait until a weak reference object has been GCed
- while (System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
+ while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
System.gc()
Thread.sleep(200)
}
}
- private def cleaner = sc.cleaner.get
+ def cleaner = sc.cleaner.get
}