author    Mridul Muralidharan <mridul@gmail.com>  2013-05-04 20:45:56 +0530
committer Mridul Muralidharan <mridul@gmail.com>  2013-05-04 20:45:56 +0530
commit    25198d7e9ed00bc82886a9629988c1129514eec0 (patch)
tree      06827aacf87b0407982b1ab80b1975a047c064e2 /core/src
parent    5b011d18d7fa1b312012574e2fac0e513c33ebd1 (diff)
parent    edb57c8331738403d66c15ed99996e8bfb0488f7 (diff)
Merge branch 'master' of github.com:mridulm/spark
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala | 28
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedRDD.scala           | 20
2 files changed, 38 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index fc3f29ffcd..dd9f3c2680 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -1,6 +1,6 @@
package spark.rdd
-import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
private[spark] class ZippedPartitionsPartition(
@@ -38,9 +38,31 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
}
override def getPreferredLocations(s: Partition): Seq[String] = {
+ // Note that as the number of rdds and/or the number of slaves in the cluster increases, the computed preferredLocations below
+ // become vanishingly small, so we might need to look at alternate strategies to alleviate this.
+ // If there are no (or only a very small number of) preferred locations, we will end up transferring the blocks to 'any' node in the
+ // cluster, paying the network and cache cost.
+ // Maybe pick the node which figures in the maximum number of preference lists?
+ // Choose the node which is hosting the 'larger' of some subset of the blocks?
+ // Look at rack locality to ensure the chosen host is at least rack local to the hosting nodes? etc. (would be good to defer this if possible)
val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions
- val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2))
- preferredLocations.reduce((x, y) => x.intersect(y))
+ val rddSplitZip = rdds.zip(splits)
+
+ // exact match.
+ val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2))
+ val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y))
+
+ // Remove exact match and then do host local match.
+ val otherNodePreferredLocations = rddSplitZip.map(x => {
+ x._1.preferredLocations(x._2).map(hostPort => {
+ val host = Utils.parseHostPort(hostPort)._1
+
+ if (exactMatchLocations.contains(host)) null else host
+ }).filter(_ != null)
+ })
+ val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y))
+
+ otherNodeLocalLocations ++ exactMatchLocations
}
override def clearDependencies() {
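
For concreteness, the two-tier selection above can be read as the following standalone sketch over plain preference lists, outside Spark. parseHostPort here is a simplified stand-in for spark's Utils.parseHostPort, assumed to split "host:port" into host and port; LocalitySketch and its preferredLocations helper are hypothetical names for illustration only.

object LocalitySketch {
  // Simplified stand-in for spark's Utils.parseHostPort: split "host:port"
  // into (host, port); entries without a ':' are treated as bare hosts.
  def parseHostPort(hostPort: String): (String, Int) = {
    val idx = hostPort.lastIndexOf(':')
    if (idx < 0) (hostPort, 0)
    else (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
  }

  // One preference list per zipped rdd; mirrors the reduce/intersect logic above.
  def preferredLocations(prefs: Seq[Seq[String]]): Seq[String] = {
    // Tier 1: locations common to every rdd's preference list, matched exactly.
    val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
    // Tier 2: drop to host granularity, skipping entries already matched exactly,
    // and again keep only the hosts common to every rdd.
    val otherNodePrefs = prefs.map(
      _.map(hp => parseHostPort(hp)._1).filter(h => !exactMatchLocations.contains(h)))
    val otherNodeLocalLocations = otherNodePrefs.reduce((x, y) => x.intersect(y))
    otherNodeLocalLocations ++ exactMatchLocations
  }

  def main(args: Array[String]): Unit = {
    // "hostA" is common to all three lists; "hostB:7077" only to the first two.
    val prefs = Seq(
      Seq("hostA", "hostB:7077"),
      Seq("hostA", "hostB:7077"),
      Seq("hostA", "hostB:9999"))
    println(preferredLocations(prefs)) // List(hostB, hostA)
  }
}

As the ZippedRDD comment below notes, host-only and hostPort entries can coexist in the resulting preference list; the host-level matches are returned ahead of the exact matches, just as in the patch.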
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index 51573fe68a..f728e93d24 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -48,21 +48,27 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def getPreferredLocations(s: Partition): Seq[String] = {
+ // Note that as the number of slaves in the cluster increases, the computed preferredLocations can become small, so we might need
+ // to look at alternate strategies to alleviate this. (If there are no (or only a very small number of) preferred locations, we
+ // will end up transferring the blocks to 'any' node in the cluster, paying the network and cache cost.)
+ // Maybe pick one or the other? (so that at least one block is local?)
+ // Choose the node which is hosting the 'larger' of the blocks?
+ // Look at rack locality to ensure the chosen host is at least rack local to both hosting nodes? etc. (would be good to defer this if possible)
val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
val pref1 = rdd1.preferredLocations(partition1)
val pref2 = rdd2.preferredLocations(partition2)
- // both partitions are instance local.
- val instanceLocalLocations = pref1.intersect(pref2)
+ // exact match - instance local and host local.
+ val exactMatchLocations = pref1.intersect(pref2)
- // remove locations which are already handled via instanceLocalLocations, and intersect where both partitions are node local.
- val nodeLocalPref1 = pref1.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
- val nodeLocalPref2 = pref2.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
- val nodeLocalLocations = nodeLocalPref1.intersect(nodeLocalPref2)
+ // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local.
+ val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+ val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+ val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2)
// Can have mix of instance local (hostPort) and node local (host) locations as preference !
- instanceLocalLocations ++ nodeLocalLocations
+ exactMatchLocations ++ otherNodeLocalLocations
}
override def clearDependencies() {
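
The comment blocks in both files sketch possible fallbacks for when the strict intersection of preferred locations is empty. As a rough illustration of the first idea (prefer whichever host figures in the most preference lists), one might write something like the following; hostFrequencyFallback is a hypothetical name, and nothing of the sort is part of this commit.

// Illustrative only, not part of this patch: when the strict intersection of
// preference lists is empty, fall back to the host(s) listed by the most rdds,
// so that at least some of the zipped blocks are local to the chosen node.
def hostFrequencyFallback(prefs: Seq[Seq[String]]): Seq[String] = {
  val strict = prefs.reduce((x, y) => x.intersect(y))
  if (strict.nonEmpty || prefs.forall(_.isEmpty)) {
    strict
  } else {
    // Count how many of the zipped rdds list each host, keep the best-covered ones.
    val counts = prefs.flatMap(_.distinct).groupBy(identity).map { case (h, hits) => (h, hits.size) }
    val best = counts.values.max
    counts.collect { case (host, n) if n == best => host }.toSeq
  }
}

// hostFrequencyFallback(Seq(Seq("a", "b"), Seq("b", "c"), Seq("c", "d")))
// returns Seq("b", "c") in some order: "b" and "c" each appear in two of the three lists,
// while the strict intersection of all three lists is empty.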