author    Reynold Xin <rxin@apache.org>  2014-07-18 23:52:47 -0700
committer Reynold Xin <rxin@apache.org>  2014-07-18 23:52:47 -0700
commit    7b8cd175254d42c8e82f0aa8eb4b7f3508d8fde2 (patch)
tree      616eedb38efffeaffca1d2d583e07f2bdf3947ca /core/src/test
parent    a243364b225da9a91813234027eafedffc495ecc (diff)
[SPARK-2521] Broadcast RDD object (instead of sending it along with every task).
Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains closures) to the executors using Akka, along with the task itself. This is inefficient because all tasks in the same stage use the same RDD object, yet we send that object to the executors once per task. It is especially bad when a closure references a very large variable; the current design forced users to explicitly broadcast large variables themselves.

This patch uses broadcast to send the RDD object and the closures to executors, and uses Akka to send only a reference to the broadcast RDD/closure along with the partition-specific information for each task. For those of you who know more about the internals: Spark already relies on broadcast to send the Hadoop JobConf every time it uses Hadoop input, because the JobConf is large.

The user-facing impact of the change includes:

1. Users won't need to decide what to broadcast anymore, unless they want to reuse a large object across different operations.
2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput.

In addition, the change simplifies some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast the JobConf (which also led to a deadlock recently).

A simple way to test this:

```scala
val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
```

Numbers on 3 r3.8xlarge instances on EC2:

```
master branch:    5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```

Author: Reynold Xin <rxin@apache.org>

Closes #1452 from rxin/broadcast-task and squashes the following commits:

762e0be [Reynold Xin] Warn large broadcasts.
ade6eac [Reynold Xin] Log broadcast size.
c3b6f11 [Reynold Xin] Added a unit test for clean up.
754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).
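As a rough sketch of the scheme the commit message describes (the names `BroadcastTaskSketch`, `PartitionTask`, `taskBinary`, and `submitStage` below are illustrative assumptions, not Spark's actual scheduler internals): a stage serializes the RDD and closure once, broadcasts the bytes, and hands each task only the broadcast handle plus its partition id.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Hypothetical sketch only; these are not Spark's real task types.
object BroadcastTaskSketch {
  // Each task carries a shared broadcast handle plus partition-specific
  // information, instead of its own serialized copy of the RDD and closure.
  case class PartitionTask(taskBinary: Broadcast[Array[Byte]], partitionId: Int)

  def submitStage(
      sc: SparkContext,
      serializedRddAndClosure: Array[Byte],
      numPartitions: Int): Seq[PartitionTask] = {
    // Broadcast the serialized RDD + closure once per stage...
    val taskBinary = sc.broadcast(serializedRddAndClosure)
    // ...so the per-task payload shrinks to a reference and a partition id.
    (0 until numPartitions).map(p => PartitionTask(taskBinary, p))
  }
}
```

On the executor side, each task would read `taskBinary.value` once and deserialize the RDD and closure from it, which is why the broadcast data must stay alive until the stage is done, and why the new cleanup test below matters.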
Diffstat (limited to 'core/src/test')
-rw-r--r-- core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala | 62
1 file changed, 37 insertions(+), 25 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 13b415cccb..871f831531 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,9 +52,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
}
-
test("cleanup RDD") {
- val rdd = newRDD.persist()
+ val rdd = newRDD().persist()
val collected = rdd.collect().toList
val tester = new CleanerTester(sc, rddIds = Seq(rdd.id))
@@ -67,7 +66,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("cleanup shuffle") {
- val (rdd, shuffleDeps) = newRDDWithShuffleDependencies
+ val (rdd, shuffleDeps) = newRDDWithShuffleDependencies()
val collected = rdd.collect().toList
val tester = new CleanerTester(sc, shuffleIds = shuffleDeps.map(_.shuffleId))
@@ -80,7 +79,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("cleanup broadcast") {
- val broadcast = newBroadcast
+ val broadcast = newBroadcast()
val tester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
// Explicit cleanup
@@ -89,7 +88,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup RDD") {
- var rdd = newRDD.persist()
+ var rdd = newRDD().persist()
rdd.count()
// Test that GC does not cause RDD cleanup due to a strong reference
@@ -107,7 +106,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup shuffle") {
- var rdd = newShuffleRDD
+ var rdd = newShuffleRDD()
rdd.count()
// Test that GC does not cause shuffle cleanup due to a strong reference
@@ -125,7 +124,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
}
test("automatically cleanup broadcast") {
- var broadcast = newBroadcast
+ var broadcast = newBroadcast()
// Test that GC does not cause broadcast cleanup due to a strong reference
val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
@@ -141,11 +140,23 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
postGCTester.assertCleanup()
}
+ test("automatically cleanup broadcast data for task dispatching") {
+ var rdd = newRDDWithShuffleDependencies()._1
+ rdd.count() // This triggers an action that broadcasts the RDDs.
+
+ // Test that GC causes broadcast task data cleanup after dereferencing the RDD.
+ val postGCTester = new CleanerTester(sc,
+ broadcastIds = Seq(rdd.broadcasted.id, rdd.firstParent.broadcasted.id))
+ rdd = null
+ runGC()
+ postGCTester.assertCleanup()
+ }
+
test("automatically cleanup RDD + shuffle + broadcast") {
val numRdds = 100
val numBroadcasts = 4 // Broadcasts are more costly
- val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
+ val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts
@@ -175,8 +186,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
val numRdds = 10
val numBroadcasts = 4 // Broadcasts are more costly
- val rddBuffer = (1 to numRdds).map(i => randomRdd).toBuffer
- val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast).toBuffer
+ val rddBuffer = (1 to numRdds).map(i => randomRdd()).toBuffer
+ val broadcastBuffer = (1 to numBroadcasts).map(i => randomBroadcast()).toBuffer
val rddIds = sc.persistentRdds.keys.toSeq
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = 0L until numBroadcasts
@@ -197,17 +208,18 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
//------ Helper functions ------
- def newRDD = sc.makeRDD(1 to 10)
- def newPairRDD = newRDD.map(_ -> 1)
- def newShuffleRDD = newPairRDD.reduceByKey(_ + _)
- def newBroadcast = sc.broadcast(1 to 100)
- def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
+ private def newRDD() = sc.makeRDD(1 to 10)
+ private def newPairRDD() = newRDD().map(_ -> 1)
+ private def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
+ private def newBroadcast() = sc.broadcast(1 to 100)
+
+ private def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
getAllDependencies(dep.rdd)
}
}
- val rdd = newShuffleRDD
+ val rdd = newShuffleRDD()
// Get all the shuffle dependencies
val shuffleDeps = getAllDependencies(rdd)
@@ -216,34 +228,34 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
(rdd, shuffleDeps)
}
- def randomRdd = {
+ private def randomRdd() = {
val rdd: RDD[_] = Random.nextInt(3) match {
- case 0 => newRDD
- case 1 => newShuffleRDD
- case 2 => newPairRDD.join(newPairRDD)
+ case 0 => newRDD()
+ case 1 => newShuffleRDD()
+ case 2 => newPairRDD().join(newPairRDD())
}
if (Random.nextBoolean()) rdd.persist()
rdd.count()
rdd
}
- def randomBroadcast = {
+ private def randomBroadcast() = {
sc.broadcast(Random.nextInt(Int.MaxValue))
}
/** Run GC and make sure it actually has run */
- def runGC() {
+ private def runGC() {
val weakRef = new WeakReference(new Object())
val startTime = System.currentTimeMillis
System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC.
// Wait until a weak reference object has been GCed
- while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
+ while (System.currentTimeMillis - startTime < 10000 && weakRef.get != null) {
System.gc()
Thread.sleep(200)
}
}
- def cleaner = sc.cleaner.get
+ private def cleaner = sc.cleaner.get
}