aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala10
1 files changed, 7 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c19ed1529b..2ec9846e33 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -169,6 +169,11 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
var noLocality = true // if true if no preferredLocations exists for parent RDD
+ // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+ def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
+ prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+ }
+
class PartitionLocations(prev: RDD[_]) {
// contains all the partitions from the previous RDD that don't have preferred locations
@@ -184,7 +189,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]()
// first get the locations for each partition, only do this once since it can be expensive
prev.partitions.foreach(p => {
- val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host)
+ val locs = currPrefLocs(p, prev)
if (locs.size > 0) {
tmpPartsWithLocs.put(p, locs)
} else {
@@ -287,9 +292,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
balanceSlack: Double,
partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
- val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq
// least loaded pref locs
- val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
+ val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
val prefPart = if (pref == Nil) None else pref.head
val r1 = rnd.nextInt(groupArr.size)