author     Ali Ghodsi <alig@cs.berkeley.edu>   2013-08-20 16:21:43 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>   2013-08-20 16:21:43 -0700
commit     f20ed14e87aa1b9ff148f44b590ffd3c9d024f3c (patch)
tree       50bcc906d7003e6f90ed7c1e4ed5da580d562ce1 /core
parent     5cd21c41950e3004d3326bbc56286285531063f9 (diff)
Merged in from upstream to use TaskLocation instead of strings
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala     |  9 ++++++---
-rw-r--r--  core/src/main/scala/spark/rdd/CoalescedRDD.scala  | 10 +++++-----
2 files changed, 11 insertions(+), 8 deletions(-)
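In brief, this commit changes SparkContext.getPreferredLocs to return the scheduler's TaskLocation values instead of raw host strings, and updates CoalescedRDD to project the host back out where it still works with strings. Only the host field is exercised by the patch; as a hedged sketch of the shape the diff relies on (the real class lives in spark.scheduler and may carry more state, e.g. an executor id, which would be an assumption here):

// Hedged sketch of the TaskLocation shape this patch depends on. Only `host`
// is used by the diff below; any additional fields are assumptions, not the
// real spark.scheduler API.
case class TaskLocation(host: String)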
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index a8cc4f3eb8..23dfbcd604 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -56,8 +56,7 @@ import spark.deploy.LocalSparkCluster
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
OrderedRDDFunctions}
-import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
- SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler._
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
@@ -65,6 +64,10 @@ import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.ui.SparkUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
+import scala.Some
+import spark.scheduler.StageInfo
+import spark.storage.RDDInfo
+import spark.storage.StorageStatus
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -620,7 +623,7 @@ class SparkContext(
* @param partition to be looked up for locality
* @return list of preferred locations for the partition
*/
- private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+ private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
dagScheduler.getPreferredLocs(rdd, partition)
}
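With the new return type, call sites that used to consume raw host strings must project the host out of each TaskLocation themselves. A minimal before/after sketch at a hypothetical (Spark-internal) call site, where sc is a SparkContext, rdd is any RDD, and 0 is an arbitrary partition index:

// Before this patch: preferred locations came back as host strings.
// val hosts: List[String] = sc.getPreferredLocs(rdd, 0)

// After this patch: map each TaskLocation to its host to recover strings.
val hosts: Seq[String] = sc.getPreferredLocs(rdd, 0).map(_.host)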
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index f880cea2ad..e612d026b2 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -52,7 +52,7 @@ case class CoalescedRDDPartition(
*/
def localFraction: Double = {
val loc = parents.count(p =>
- rdd.context.getPreferredLocs(rdd, p.index).contains(preferredLocation))
+ rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
@@ -167,8 +167,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
var noLocality = true // if true if no preferredLocations exists for parent RDD
// gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- def currPrefLocs(part: Partition): Seq[String] =
- prev.context.getPreferredLocs(prev, part.index)
+ def currPrefLocs(part: Partition): Seq[String] = {
+ prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+ }
// this class just keeps iterating and rotating infinitely over the partitions of the RDD
// next() returns the next preferred machine that a partition is replicated on
@@ -282,8 +283,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
* @return partition group (bin to be put in)
*/
def pickBin(p: Partition): PartitionGroup = {
- val pref = prev.context.getPreferredLocs(prev, p.index).
- map(getLeastGroupHash(_)).sortWith(compare) // least loaded of the pref locations
+ val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
val prefPart = if (pref == Nil) None else pref.head
val r1 = rnd.nextInt(groupArr.size)
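Taken together, both CoalescedRDD call sites now funnel through one host projection: localFraction maps each TaskLocation to its host before the membership check, and pickBin reuses currPrefLocs rather than calling getPreferredLocs directly, so the coalescing heuristics stay string-based. A self-contained sketch of that project-then-count pattern, with hypothetical stand-ins (TaskLocation, prefLocs) for the real scheduler API:

// Sketch of the locality-fraction pattern after this patch; TaskLocation and
// prefLocs are hypothetical stand-ins for the scheduler types.
case class TaskLocation(host: String)

def localFraction(parents: Seq[Int], preferredHost: String,
                  prefLocs: Int => Seq[TaskLocation]): Double = {
  // Count parent partitions whose preferred hosts include this group's host.
  val local = parents.count(p => prefLocs(p).map(_.host).contains(preferredHost))
  if (parents.isEmpty) 0.0 else local.toDouble / parents.size
}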