aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/rdd/ZippedRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/rdd/ZippedRDD.scala')
-rw-r--r--core/src/main/scala/spark/rdd/ZippedRDD.scala23
1 files changed, 21 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index 35b0e06785..f728e93d24 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -1,6 +1,6 @@
package spark.rdd
-import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
@@ -48,8 +48,27 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def getPreferredLocations(s: Partition): Seq[String] = {
+ // Note that as number of slaves in cluster increase, the computed preferredLocations can become small : so we might need
+ // to look at alternate strategies to alleviate this. (If there are no (or very small number of preferred locations), we
+ // will end up transferred the blocks to 'any' node in the cluster - paying with n/w and cache cost.
+ // Maybe pick one or the other ? (so that atleast one block is local ?).
+ // Choose node which is hosting 'larger' of the blocks ?
+ // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible)
val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
- rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
+ val pref1 = rdd1.preferredLocations(partition1)
+ val pref2 = rdd2.preferredLocations(partition2)
+
+ // exact match - instance local and host local.
+ val exactMatchLocations = pref1.intersect(pref2)
+
+ // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local.
+ val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+ val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+ val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2)
+
+
+ // Can have mix of instance local (hostPort) and node local (host) locations as preference !
+ exactMatchLocations ++ otherNodeLocalLocations
}
override def clearDependencies() {