diff options
author | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-20 16:21:43 -0700 |
---|---|---|
committer | Ali Ghodsi <alig@cs.berkeley.edu> | 2013-08-20 16:21:43 -0700 |
commit | f20ed14e87aa1b9ff148f44b590ffd3c9d024f3c (patch) | |
tree | 50bcc906d7003e6f90ed7c1e4ed5da580d562ce1 | |
parent | 5cd21c41950e3004d3326bbc56286285531063f9 (diff) | |
download | spark-f20ed14e87aa1b9ff148f44b590ffd3c9d024f3c.tar.gz spark-f20ed14e87aa1b9ff148f44b590ffd3c9d024f3c.tar.bz2 spark-f20ed14e87aa1b9ff148f44b590ffd3c9d024f3c.zip |
Merged in from upstream to use TaskLocation instead of strings
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 9 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/CoalescedRDD.scala | 10 |
2 files changed, 11 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index a8cc4f3eb8..23dfbcd604 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -56,8 +56,7 @@ import spark.deploy.LocalSparkCluster import spark.partial.{ApproximateEvaluator, PartialResult} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD, OrderedRDDFunctions} -import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, - SplitInfo, Stage, StageInfo, TaskScheduler} +import spark.scheduler._ import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable, SchedulingMode} import spark.scheduler.local.LocalScheduler @@ -65,6 +64,10 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource} import spark.ui.SparkUI import spark.util.{MetadataCleaner, TimeStampedHashMap} +import scala.Some +import spark.scheduler.StageInfo +import spark.storage.RDDInfo +import spark.storage.StorageStatus /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -620,7 +623,7 @@ class SparkContext( * @param partition to be looked up for locality * @return list of preferred locations for the partition */ - private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = { + private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = { dagScheduler.getPreferredLocs(rdd, partition) } diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index f880cea2ad..e612d026b2 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -52,7 +52,7 @@ case class CoalescedRDDPartition( */ def localFraction: Double = { val loc = parents.count(p => - rdd.context.getPreferredLocs(rdd, p.index).contains(preferredLocation)) + rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation)) if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble) } @@ -167,8 +167,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc var noLocality = true // if true if no preferredLocations exists for parent RDD // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) - def currPrefLocs(part: Partition): Seq[String] = - prev.context.getPreferredLocs(prev, part.index) + def currPrefLocs(part: Partition): Seq[String] = { + prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) + } // this class just keeps iterating and rotating infinitely over the partitions of the RDD // next() returns the next preferred machine that a partition is replicated on @@ -282,8 +283,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc * @return partition group (bin to be put in) */ def pickBin(p: Partition): PartitionGroup = { - val pref = prev.context.getPreferredLocs(prev, p.index). - map(getLeastGroupHash(_)).sortWith(compare) // least loaded of the pref locations + val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs val prefPart = if (pref == Nil) None else pref.head val r1 = rnd.nextInt(groupArr.size) |