aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Mridul Muralidharan <mridul@gmail.com> 2013-05-03 12:23:30 +0530
committer: Mridul Muralidharan <mridul@gmail.com> 2013-05-03 12:23:30 +0530
commit: 11589c39d9f75e9757ba1717c5202f77d30031b2 (patch)
tree: 1c20eb806dc5c96267f535ecf4deeec6930830ad /core
parent: dfde9ce9dde0a151d42f7aecb826b40a4c98b459 (diff)
download: spark-11589c39d9f75e9757ba1717c5202f77d30031b2.tar.gz
spark-11589c39d9f75e9757ba1717c5202f77d30031b2.tar.bz2
spark-11589c39d9f75e9757ba1717c5202f77d30031b2.zip
Fix ZippedRDD as per Matei's suggestion
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/rdd/ZippedRDD.scala | 19
1 files changed, 15 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index e80250a99b..51573fe68a 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -1,6 +1,6 @@
package spark.rdd
-import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
@@ -49,9 +49,20 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
override def getPreferredLocations(s: Partition): Seq[String] = {
val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
- // TODO: becomes complicated - intersect on hostPort if available, else fallback to host (removing intersected hostPort's).
- // Since I am not very sure about this RDD, leaving it to others to comment better !
- rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
+ val pref1 = rdd1.preferredLocations(partition1)
+ val pref2 = rdd2.preferredLocations(partition2)
+
+ // Locations preferred verbatim by both partitions: both partitions are instance local there.
+ val instanceLocalLocations = pref1.intersect(pref2)
+
+ // Drop locations already covered by instanceLocalLocations, reduce the rest to the first component of Utils.parseHostPort (presumably the host - confirm), and intersect to find where both partitions are node local.
+ val nodeLocalPref1 = pref1.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+ val nodeLocalPref2 = pref2.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+ val nodeLocalLocations = nodeLocalPref1.intersect(nodeLocalPref2)
+
+
+ // The result can mix instance local (hostPort) and node local (host) locations as preferences; instance local ones come first.
+ instanceLocalLocations ++ nodeLocalLocations
}
override def clearDependencies() {