From edb57c8331738403d66c15ed99996e8bfb0488f7 Mon Sep 17 00:00:00 2001 From: Mridul Muralidharan Date: Sat, 4 May 2013 19:47:45 +0530 Subject: Add support for instance local in getPreferredLocations of ZippedPartitionsBaseRDD. Add comments to both ZippedPartitionsBaseRDD and ZippedRDD to better describe the potential problem with the approach --- .../main/scala/spark/rdd/ZippedPartitionsRDD.scala | 28 +++++++++++++++++++--- core/src/main/scala/spark/rdd/ZippedRDD.scala | 20 ++++++++++------ 2 files changed, 38 insertions(+), 10 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala index fc3f29ffcd..dd9f3c2680 100644 --- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala @@ -1,6 +1,6 @@ package spark.rdd -import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} +import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} private[spark] class ZippedPartitionsPartition( @@ -38,9 +38,31 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( } override def getPreferredLocations(s: Partition): Seq[String] = { + // Note that as number of rdd's increase and/or number of slaves in cluster increase, the computed preferredLocations below + // become diminishingly small : so we might need to look at alternate strategies to alleviate this. + // If there are no (or very small number of preferred locations), we will end up transferred the blocks to 'any' node in the + // cluster - paying with n/w and cache cost. + // Maybe pick a node which figures max amount of time ? + // Choose node which is hosting 'larger' of some subset of blocks ? + // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible) val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions - val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2)) - preferredLocations.reduce((x, y) => x.intersect(y)) + val rddSplitZip = rdds.zip(splits) + + // exact match. + val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2)) + val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y)) + + // Remove exact match and then do host local match. + val otherNodePreferredLocations = rddSplitZip.map(x => { + x._1.preferredLocations(x._2).map(hostPort => { + val host = Utils.parseHostPort(hostPort)._1 + + if (exactMatchLocations.contains(host)) null else host + }).filter(_ != null) + }) + val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y)) + + otherNodeLocalLocations ++ exactMatchLocations } override def clearDependencies() { diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index 51573fe68a..f728e93d24 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -48,21 +48,27 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( } override def getPreferredLocations(s: Partition): Seq[String] = { + // Note that as number of slaves in cluster increase, the computed preferredLocations can become small : so we might need + // to look at alternate strategies to alleviate this. (If there are no (or very small number of preferred locations), we + // will end up transferred the blocks to 'any' node in the cluster - paying with n/w and cache cost. + // Maybe pick one or the other ? (so that atleast one block is local ?). + // Choose node which is hosting 'larger' of the blocks ? + // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible) val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions val pref1 = rdd1.preferredLocations(partition1) val pref2 = rdd2.preferredLocations(partition2) - // both partitions are instance local. - val instanceLocalLocations = pref1.intersect(pref2) + // exact match - instance local and host local. + val exactMatchLocations = pref1.intersect(pref2) - // remove locations which are already handled via instanceLocalLocations, and intersect where both partitions are node local. - val nodeLocalPref1 = pref1.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) - val nodeLocalPref2 = pref2.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) - val nodeLocalLocations = nodeLocalPref1.intersect(nodeLocalPref2) + // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local. + val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) + val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) + val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2) // Can have mix of instance local (hostPort) and node local (host) locations as preference ! - instanceLocalLocations ++ nodeLocalLocations + exactMatchLocations ++ otherNodeLocalLocations } override def clearDependencies() { -- cgit v1.2.3