| author | Mridul Muralidharan <mridul@gmail.com> | 2013-05-04 19:47:45 +0530 |
|---|---|---|
| committer | Mridul Muralidharan <mridul@gmail.com> | 2013-05-04 19:47:45 +0530 |
| commit | edb57c8331738403d66c15ed99996e8bfb0488f7 (patch) | |
| tree | 71091e553fe62b7f32453da755f8c605e093df44 /core | |
| parent | ea2a6f91d383c958b9bab56858f4ef0c8b6ec847 (diff) | |
Add support for instance-local locations in getPreferredLocations of ZippedPartitionsBaseRDD. Add comments to both ZippedPartitionsBaseRDD and ZippedRDD to better describe the potential problem with the approach.
Diffstat (limited to 'core')
| -rw-r--r-- | core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala | 28 |
| -rw-r--r-- | core/src/main/scala/spark/rdd/ZippedRDD.scala | 20 |
2 files changed, 38 insertions, 10 deletions
```diff
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index fc3f29ffcd..dd9f3c2680 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -1,6 +1,6 @@
 package spark.rdd
 
-import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
 
 private[spark] class ZippedPartitionsPartition(
@@ -38,9 +38,31 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
+    // Note that as the number of RDDs and/or the number of slaves in the cluster increases, the computed preferredLocations
+    // below become diminishingly small: so we might need to look at alternate strategies to alleviate this.
+    // If there are no (or very few) preferred locations, we will end up transferring the blocks to 'any' node in the
+    // cluster - paying the network and cache cost.
+    // Maybe pick the node which appears the maximum number of times?
+    // Choose the node hosting the 'larger' of some subset of the blocks?
+    // Look at rack locality to ensure the chosen host is at least rack local to all hosting nodes? etc. (would be good to defer this if possible)
     val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions
-    val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2))
-    preferredLocations.reduce((x, y) => x.intersect(y))
+    val rddSplitZip = rdds.zip(splits)
+
+    // exact match.
+    val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2))
+    val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y))
+
+    // Remove exact match and then do host local match.
+    val otherNodePreferredLocations = rddSplitZip.map(x => {
+      x._1.preferredLocations(x._2).map(hostPort => {
+        val host = Utils.parseHostPort(hostPort)._1
+
+        if (exactMatchLocations.contains(host)) null else host
+      }).filter(_ != null)
+    })
+    val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y))
+
+    otherNodeLocalLocations ++ exactMatchLocations
   }
 
   override def clearDependencies() {
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index 51573fe68a..f728e93d24 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -48,21 +48,27 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
   }
 
   override def getPreferredLocations(s: Partition): Seq[String] = {
+    // Note that as the number of slaves in the cluster increases, the computed preferredLocations can become small: so we
+    // might need to look at alternate strategies to alleviate this. If there are no (or very few) preferred locations, we
+    // will end up transferring the blocks to 'any' node in the cluster - paying the network and cache cost.
+    // Maybe pick one or the other? (so that at least one block is local?)
+    // Choose the node hosting the 'larger' of the two blocks?
+    // Look at rack locality to ensure the chosen host is at least rack local to both hosting nodes? etc. (would be good to defer this if possible)
     val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
     val pref1 = rdd1.preferredLocations(partition1)
     val pref2 = rdd2.preferredLocations(partition2)
 
-    // both partitions are instance local.
-    val instanceLocalLocations = pref1.intersect(pref2)
+    // exact match - instance local and host local.
+    val exactMatchLocations = pref1.intersect(pref2)
 
-    // remove locations which are already handled via instanceLocalLocations, and intersect where both partitions are node local.
-    val nodeLocalPref1 = pref1.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
-    val nodeLocalPref2 = pref2.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
-    val nodeLocalLocations = nodeLocalPref1.intersect(nodeLocalPref2)
+    // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local.
+    val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+    val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
+    val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2)
 
     // Can have mix of instance local (hostPort) and node local (host) locations as preference !
-    instanceLocalLocations ++ nodeLocalLocations
+    exactMatchLocations ++ otherNodeLocalLocations
   }
 
   override def clearDependencies() {
```
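Both files implement the same two-tier preference computation: intersect the exact hostPort-level (instance-local) preferences first, then fall back to a host-level intersection that excludes locations already covered by an exact match. Below is a minimal, self-contained Scala sketch of that computation, separate from the commit itself; `parseHostPort` here is a simplified stand-in for Spark's `Utils.parseHostPort`, and the sample hostPort strings are invented for illustration.

```scala
object PreferredLocationsSketch {
  // Simplified stand-in for Spark's Utils.parseHostPort: split a
  // "host:port" string into (host, port); a bare host gets port 0.
  def parseHostPort(hostPort: String): (String, Int) =
    hostPort.split(":") match {
      case Array(host, port) => (host, port.toInt)
      case _                 => (hostPort, 0)
    }

  // Two-tier preference, generalized over N partitions' preferred locations:
  // tier 1 keeps hostPort strings shared verbatim by every partition
  // (instance local); tier 2 falls back to plain hosts shared by every
  // partition (node local), excluding tier-1 entries.
  def preferredLocations(prefsPerRdd: Seq[Seq[String]]): Seq[String] = {
    require(prefsPerRdd.nonEmpty, "need preferences from at least one RDD")

    // Tier 1: exact hostPort intersection across all partitions.
    val exactMatchLocations = prefsPerRdd.reduce((x, y) => x.intersect(y))

    // Tier 2: drop exact matches, strip ports, intersect at host granularity.
    val otherNodeLocalLocations = prefsPerRdd
      .map(prefs => prefs.filter(loc => !exactMatchLocations.contains(loc))
                         .map(loc => parseHostPort(loc)._1))
      .reduce((x, y) => x.intersect(y))

    // Result mixes node-local (host) and instance-local (hostPort) entries.
    otherNodeLocalLocations ++ exactMatchLocations
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical block locations for two zipped partitions: the executor
    // at hostA:5051 holds both blocks; hostB holds both blocks, but in
    // different executors (different ports).
    val prefs1 = Seq("hostA:5051", "hostB:5051")
    val prefs2 = Seq("hostA:5051", "hostB:5052")

    println(preferredLocations(Seq(prefs1, prefs2)))
    // => List(hostB, hostA:5051)
  }
}
```

The returned sequence deliberately mixes host and hostPort strings, which is the "mix of instance local (hostPort) and node local (host) locations" the ZippedRDD comment calls out; and as the new comments in both files warn, these intersections shrink as the number of RDDs or slaves grows, which is why the commit leaves alternate strategies (frequency-based choice, block size, rack locality) as open questions.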