| author    | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-16 14:03:45 -0700 |
| --------- | --------------------------------- | ------------------------- |
| committer | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-20 16:16:04 -0700 |
| commit    | b69e7166ba76d35d75b98015b0d39a8a004a7436 (patch) | |
| tree      | a38fe0b85522ce620f7858eb3fc339e3d280e559 | |
| parent    | 3b5bb8a4ae1ebc0bbfa34c908a99274c343fe883 (diff) | |
Coalescer now uses current preferred locations for derived RDDs. Made run() in DAGScheduler thread-safe and added a method to query it for preferred locations. Added a wrapper for that method on SparkContext.
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 9
-rw-r--r-- | core/src/main/scala/spark/rdd/CoalescedRDD.scala | 30
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 40
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 14
4 files changed, 59 insertions, 34 deletions
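
Before the per-file diffs, here is a minimal usage sketch of the new public entry point, `SparkContext.getPreferredLocs`. Only the method's name and signature come from the patch below; the `LocalityProbe` object, the local master URL, and the sample RDD are illustrative assumptions, and on a local cluster the returned lists may simply be empty.

```scala
// Illustrative only -- not part of the commit. Assumes the 0.8-era `spark` package
// and an existing SparkContext; only getPreferredLocs itself comes from this patch.
import spark.SparkContext

object LocalityProbe {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "locality-probe")

    // A derived RDD: its partitions carry no static preferred locations of their own,
    // so any locality has to come from the DAGScheduler's current view (e.g. cached blocks).
    val derived = sc.makeRDD(1 to 1000, 8).map(_ * 2).coalesce(4)

    // New in this commit: SparkContext.getPreferredLocs delegates to the (now
    // synchronized) DAGScheduler.getPreferredLocs, so a user thread can call it safely.
    derived.partitions.foreach { p =>
      val locs: List[String] = sc.getPreferredLocs(derived, p.index)
      println("partition " + p.index + " -> " + locs.mkString(", "))
    }

    sc.stop()
  }
}
```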
```diff
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index fdd2dfa810..544c971efc 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -615,6 +615,15 @@ class SparkContext(
   }
 
   /**
+   * Gets the locality information associated with the partition in a particular rdd
+   * @param rdd of interest
+   * @param partition to be looked up for locality
+   * @return list of preferred locations for the partition
+   */
+  def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] =
+    dagScheduler.getPreferredLocs(rdd, partition)
+
+  /**
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), or an HTTP, HTTPS or FTP URI.
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index e6fe8ec8ee..01d4bcadc2 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -17,9 +17,11 @@
 
 package spark.rdd
 
-import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Partition, TaskContext}
+import spark._
 import java.io.{ObjectOutputStream, IOException}
 import scala.collection.mutable
+import scala.Some
+import spark.rdd.CoalescedRDDPartition
 
 private[spark] case class CoalescedRDDPartition(
     index: Int,
@@ -42,7 +44,8 @@ private[spark] case class CoalescedRDDPartition(
    * @return locality of this coalesced partition between 0 and 1
    */
   def localFraction: Double = {
-    val loc = parents.count(p => rdd.preferredLocations(p).contains(preferredLocation))
+    val loc = parents.count(p => rdd.context.getPreferredLocs(rdd, p.index)
+      .contains(preferredLocation))
     if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
   }
 }
@@ -126,8 +129,8 @@ class CoalescedRDD[T: ClassManifest](
 
 }
 
-private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
-
+private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
+    balanceSlack: Double) {
   protected def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
   protected def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
     if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
@@ -149,6 +152,10 @@ private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balan
 
   protected var noLocality = true  // if true if no preferredLocations exists for parent RDD
 
+  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+  protected def currentPreferredLocations(rdd: RDD[_], part: Partition) : Seq[String] =
+    rdd.context.getPreferredLocs(rdd, part.index)
+
   // this class just keeps iterating and rotating infinitely over the partitions of the RDD
   // next() returns the next preferred machine that a partition is replicated on
   // the rotator first goes through the first replica copy of each partition, then second, third
@@ -161,14 +168,14 @@ private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balan
     // initializes/resets to start iterating from the beginning
     protected def resetIterator() = {
       val i1 = prev.partitions.view.map{ p: Partition =>
-        { if (prev.preferredLocations(p).length > 0)
-          Some((prev.preferredLocations(p)(0),p)) else None } }
+        { if (currentPreferredLocations(prev, p).length > 0)
+          Some((currentPreferredLocations(prev, p)(0),p)) else None } }
       val i2 = prev.partitions.view.map{ p: Partition =>
-        { if (prev.preferredLocations(p).length > 1)
-          Some((prev.preferredLocations(p)(1),p)) else None } }
+        { if (currentPreferredLocations(prev, p).length > 1)
+          Some((currentPreferredLocations(prev, p)(1),p)) else None } }
       val i3 = prev.partitions.view.map{ p: Partition =>
-        { if (prev.preferredLocations(p).length > 2)
-          Some((prev.preferredLocations(p)(2),p)) else None } }
+        { if (currentPreferredLocations(prev, p).length > 2)
+          Some((currentPreferredLocations(prev, p)(2),p)) else None } }
       val res = List(i1,i2,i3)
       res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
     }
@@ -265,7 +272,8 @@ private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balan
    * @return partition group (bin to be put in)
    */
   protected def pickBin(p: Partition): PartitionGroup = {
-    val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least load
+    val pref = prev.context.getPreferredLocs(prev, p.index).
+      map(getLeastGroupHash(_)).sortWith(compare) // least loaded of the pref locations
     val prefPart = if (pref == Nil) None else pref.head
 
     val r1 = rnd.nextInt(groupArr.size)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9402f18a0f..7275bd346a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -435,23 +435,24 @@ class DAGScheduler(
       if (event != null) {
         logDebug("Got event of type " + event.getClass.getName)
       }
-
-      if (event != null) {
-        if (processEvent(event)) {
-          return
+      this.synchronized { // needed in case other threads makes calls into methods of this class
+        if (event != null) {
+          if (processEvent(event)) {
+            return
+          }
         }
-      }
 
-      val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
-      // Periodically resubmit failed stages if some map output fetches have failed and we have
-      // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
-      // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
-      // the same time, so we want to make sure we've identified all the reduce tasks that depend
-      // on the failed node.
-      if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
-        resubmitFailedStages()
-      } else {
-        submitWaitingStages()
+        val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
+        // Periodically resubmit failed stages if some map output fetches have failed and we have
+        // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
+        // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
+        // the same time, so we want to make sure we've identified all the reduce tasks that depend
+        // on the failed node.
+        if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
+          resubmitFailedStages()
+        } else {
+          submitWaitingStages()
+        }
       }
     }
   }
@@ -789,7 +790,14 @@ class DAGScheduler(
     visitedRdds.contains(target.rdd)
   }
 
-  private def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
+  /**
+   * Synchronized method that might be called from other threads.
+   * @param rdd whose partitions are to be looked at
+   * @param partition to lookup locality information for
+   * @return list of machines that are preferred by the partition
+   */
+  private[spark]
+  def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
     // If the partition is cached, return the cache locations
     val cached = getCacheLocs(rdd)(partition)
     if (!cached.isEmpty) {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index ad91263322..0532435288 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -221,13 +221,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
 
     // TDD: Test for later when we have implemented functionality to get locality from DAGScheduler
-//    val data3 = sc.makeRDD(blocks).map( i => i*2 )
-//    val coalesced3 = data3.coalesce(numMachines*2)
-//    val minLocality2 = coalesced3.partitions
-//      .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
-//      .foldLeft(1.)( (perc, loc) => math.min(perc,loc) )
-//    assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
-//      (minLocality2*100.).toInt + "%")
+    val data3 = sc.makeRDD(blocks).map( i => i*2 )
+    val coalesced3 = data3.coalesce(numMachines*2)
+    val minLocality2 = coalesced3.partitions
+      .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
+      .foldLeft(1.)( (perc, loc) => math.min(perc,loc) )
+    assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
+      (minLocality2*100.).toInt + "%")
   }
 
   test("zipped RDDs") {
```
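
The thread-safety part of the change boils down to a small pattern: the scheduler's event-processing work and the externally callable locality query both take the same instance lock, so user threads calling `getPreferredLocs` never race with the event loop. Below is a simplified, self-contained sketch of that pattern; the class and its members are invented for illustration and are not Spark's DAGScheduler API.

```scala
// Simplified illustration of the locking pattern introduced above; ToyScheduler and
// its members are hypothetical, standing in for DAGScheduler's event loop and query.
class ToyScheduler {
  // mutable scheduler state normally touched only by the event loop
  private val cacheLocs = scala.collection.mutable.Map[Int, Seq[String]]()

  // event-loop work: mutates state while holding the instance lock
  def processEvent(partition: Int, locs: Seq[String]): Unit = this.synchronized {
    cacheLocs(partition) = locs
  }

  // query that other threads may call concurrently (mirrors the role of
  // DAGScheduler.getPreferredLocs); the same lock keeps reads consistent
  def getPreferredLocs(partition: Int): Seq[String] = this.synchronized {
    cacheLocs.getOrElse(partition, Seq.empty)
  }
}

object ToySchedulerDemo extends App {
  val s = new ToyScheduler
  s.processEvent(0, Seq("host1", "host2"))
  println(s.getPreferredLocs(0)) // List(host1, host2)
}
```

This mirrors why the commit wraps both the body of run()'s event handling and getPreferredLocs in `this.synchronized` rather than introducing a separate lock object.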