author    Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-16 14:03:45 -0700
committer Ali Ghodsi <alig@cs.berkeley.edu>  2013-08-20 16:16:04 -0700
commit    b69e7166ba76d35d75b98015b0d39a8a004a7436 (patch)
tree      a38fe0b85522ce620f7858eb3fc339e3d280e559 /core
parent    3b5bb8a4ae1ebc0bbfa34c908a99274c343fe883 (diff)
Coalescer now uses current preferred locations for derived RDDs. Made run() in DAGScheduler thread-safe and added a method for querying it for preferred locations. Added a similar method in SparkContext that wraps the former.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala             |  9
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala          | 30
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala    | 40
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                  | 14
4 files changed, 59 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index fdd2dfa810..544c971efc 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -615,6 +615,15 @@ class SparkContext(
}
/**
+ * Gets the locality information associated with the partition in a particular rdd
+ * @param rdd of interest
+ * @param partition to be looked up for locality
+ * @return list of preferred locations for the partition
+ */
+ def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] =
+ dagScheduler.getPreferredLocs(rdd, partition)
+
+ /**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI.
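
For reference, a minimal sketch of how the new SparkContext.getPreferredLocs wrapper could be called from user code; the master URL, application name and input path below are illustrative assumptions, not part of this commit:

import spark.SparkContext

// Placeholder master URL and app name; any existing SparkContext works.
val sc = new SparkContext("local", "locality-demo")

// A file-backed RDD is most likely to report non-empty block locations.
val rdd = sc.textFile("hdfs:///some/hypothetical/path")

// Ask the DAGScheduler (through the new wrapper) where each partition prefers to run.
for (i <- 0 until rdd.partitions.size) {
  val locs: List[String] = sc.getPreferredLocs(rdd, i)
  println("partition " + i + " prefers: " + locs.mkString(", "))
}
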
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index e6fe8ec8ee..01d4bcadc2 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -17,9 +17,11 @@
package spark.rdd
-import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Partition, TaskContext}
+import spark._
import java.io.{ObjectOutputStream, IOException}
import scala.collection.mutable
+import scala.Some
+import spark.rdd.CoalescedRDDPartition
private[spark] case class CoalescedRDDPartition(
index: Int,
@@ -42,7 +44,8 @@ private[spark] case class CoalescedRDDPartition(
* @return locality of this coalesced partition between 0 and 1
*/
def localFraction: Double = {
- val loc = parents.count(p => rdd.preferredLocations(p).contains(preferredLocation))
+ val loc = parents.count(p => rdd.context.getPreferredLocs(rdd, p.index)
+ .contains(preferredLocation))
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
}
@@ -126,8 +129,8 @@ class CoalescedRDD[T: ClassManifest](
}
-private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
-
+private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_],
+ balanceSlack: Double) {
protected def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
protected def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
@@ -149,6 +152,10 @@ private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balan
protected var noLocality = true // if true if no preferredLocations exists for parent RDD
+ // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+ protected def currentPreferredLocations(rdd: RDD[_], part: Partition) : Seq[String] =
+ rdd.context.getPreferredLocs(rdd, part.index)
+
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
// next() returns the next preferred machine that a partition is replicated on
// the rotator first goes through the first replica copy of each partition, then second, third
@@ -161,14 +168,14 @@ private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balan
// initializes/resets to start iterating from the beginning
protected def resetIterator() = {
val i1 = prev.partitions.view.map{ p: Partition =>
- { if (prev.preferredLocations(p).length > 0)
- Some((prev.preferredLocations(p)(0),p)) else None } }
+ { if (currentPreferredLocations(prev, p).length > 0)
+ Some((currentPreferredLocations(prev, p)(0),p)) else None } }
val i2 = prev.partitions.view.map{ p: Partition =>
- { if (prev.preferredLocations(p).length > 1)
- Some((prev.preferredLocations(p)(1),p)) else None } }
+ { if (currentPreferredLocations(prev, p).length > 1)
+ Some((currentPreferredLocations(prev, p)(1),p)) else None } }
val i3 = prev.partitions.view.map{ p: Partition =>
- { if (prev.preferredLocations(p).length > 2)
- Some((prev.preferredLocations(p)(2),p)) else None } }
+ { if (currentPreferredLocations(prev, p).length > 2)
+ Some((currentPreferredLocations(prev, p)(2),p)) else None } }
val res = List(i1,i2,i3)
res.view.flatMap(x => x).flatten.iterator // fuses the 3 iterators (1st replica, 2nd, 3rd)
}
@@ -265,7 +272,8 @@ private[spark] class PartitionCoalescer (maxPartitions: Int, prev: RDD[_], balan
* @return partition group (bin to be put in)
*/
protected def pickBin(p: Partition): PartitionGroup = {
- val pref = prev.preferredLocations(p).map(getLeastGroupHash(_)).sortWith(compare) // least load
+ val pref = prev.context.getPreferredLocs(prev, p.index).
+ map(getLeastGroupHash(_)).sortWith(compare) // least loaded of the pref locations
val prefPart = if (pref == Nil) None else pref.head
val r1 = rnd.nextInt(groupArr.size)
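
The localFraction change above only swaps where the preferred locations come from (the DAGScheduler's current view instead of the static ones); the ratio itself is unchanged. A standalone restatement of that calculation with plain sequences in place of the real RDD and Partition types, purely for illustration:

// Fraction of parent partitions whose current preferred locations include
// the machine chosen for this coalesced partition.
def localFraction(parentPrefs: Seq[Seq[String]], preferredLocation: String): Double =
  if (parentPrefs.isEmpty) 0.0
  else parentPrefs.count(_.contains(preferredLocation)).toDouble / parentPrefs.size

// Three of the four parents list "host1" among their preferred locations -> 0.75.
val f = localFraction(
  Seq(Seq("host1"), Seq("host1", "host2"), Seq("host3"), Seq("host1")), "host1")
println(f)  // prints 0.75
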
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9402f18a0f..7275bd346a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -435,23 +435,24 @@ class DAGScheduler(
if (event != null) {
logDebug("Got event of type " + event.getClass.getName)
}
-
- if (event != null) {
- if (processEvent(event)) {
- return
+ this.synchronized { // needed in case other threads makes calls into methods of this class
+ if (event != null) {
+ if (processEvent(event)) {
+ return
+ }
}
- }
- val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
- // Periodically resubmit failed stages if some map output fetches have failed and we have
- // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
- // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
- // the same time, so we want to make sure we've identified all the reduce tasks that depend
- // on the failed node.
- if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
- resubmitFailedStages()
- } else {
- submitWaitingStages()
+ val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
+ // Periodically resubmit failed stages if some map output fetches have failed and we have
+ // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
+ // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
+ // the same time, so we want to make sure we've identified all the reduce tasks that depend
+ // on the failed node.
+ if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
+ resubmitFailedStages()
+ } else {
+ submitWaitingStages()
+ }
}
}
}
@@ -789,7 +790,14 @@ class DAGScheduler(
visitedRdds.contains(target.rdd)
}
- private def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
+ /**
+ * Synchronized method that might be called from other threads.
+ * @param rdd whose partitions are to be looked at
+ * @param partition to lookup locality information for
+ * @return list of machines that are preferred by the partition
+ */
+ private[spark]
+ def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = synchronized {
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (!cached.isEmpty) {
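
The DAGScheduler change above makes the event-loop body and getPreferredLocs synchronize on the same scheduler instance, so the loop's mutation of scheduler state and lookups from other threads cannot interleave. A minimal sketch of that locking pattern using a hypothetical stand-in class (not DAGScheduler itself):

import scala.collection.mutable

class ToyScheduler {
  // Internal state mutated by the scheduler's own event-processing loop.
  private val locs = mutable.Map[Int, Seq[String]]()

  // Runs on the event-processing thread; mirrors wrapping the loop body in this.synchronized.
  def handleEvent(partition: Int, hosts: Seq[String]) = this.synchronized {
    locs(partition) = hosts
  }

  // May be called from any thread, e.g. via the new SparkContext.getPreferredLocs wrapper.
  def getPreferredLocs(partition: Int): Seq[String] = this.synchronized {
    locs.getOrElse(partition, Nil)
  }
}
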
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index ad91263322..0532435288 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -221,13 +221,13 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
// TDD: Test for later when we have implemented functionality to get locality from DAGScheduler
-// val data3 = sc.makeRDD(blocks).map( i => i*2 )
-// val coalesced3 = data3.coalesce(numMachines*2)
-// val minLocality2 = coalesced3.partitions
-// .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
-// .foldLeft(1.)( (perc, loc) => math.min(perc,loc) )
-// assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
-// (minLocality2*100.).toInt + "%")
+ val data3 = sc.makeRDD(blocks).map( i => i*2 )
+ val coalesced3 = data3.coalesce(numMachines*2)
+ val minLocality2 = coalesced3.partitions
+ .map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
+ .foldLeft(1.)( (perc, loc) => math.min(perc,loc) )
+ assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
+ (minLocality2*100.).toInt + "%")
}
test("zipped RDDs") {
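
The re-enabled assertion above folds the per-partition locality fractions down to their minimum; the same check in isolation, with made-up fraction values (only the 0.90 threshold and the message come from the test):

// Even the worst coalesced partition must keep at least 90% of its parents local.
val fractions = Seq(1.0, 0.95, 0.92)  // illustrative values
val minLocality2 = fractions.foldLeft(1.0)((acc, x) => math.min(acc, x))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
  (minLocality2 * 100).toInt + "%")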