diff options
author | Mridul Muralidharan <mridul@gmail.com> | 2013-05-04 20:45:56 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-05-04 20:45:56 +0530 |
commit | 25198d7e9ed00bc82886a9629988c1129514eec0 (patch) | |
tree | 06827aacf87b0407982b1ab80b1975a047c064e2 /core | |
parent | 5b011d18d7fa1b312012574e2fac0e513c33ebd1 (diff) | |
parent | edb57c8331738403d66c15ed99996e8bfb0488f7 (diff) | |
download | spark-25198d7e9ed00bc82886a9629988c1129514eec0.tar.gz spark-25198d7e9ed00bc82886a9629988c1129514eec0.tar.bz2 spark-25198d7e9ed00bc82886a9629988c1129514eec0.zip |
Merge branch 'master' of github.com:mridulm/spark
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala | 28 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/ZippedRDD.scala | 20 |
2 files changed, 38 insertions, 10 deletions
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala index fc3f29ffcd..dd9f3c2680 100644 --- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala @@ -1,6 +1,6 @@ package spark.rdd -import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext} +import spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext} import java.io.{ObjectOutputStream, IOException} private[spark] class ZippedPartitionsPartition( @@ -38,9 +38,31 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( } override def getPreferredLocations(s: Partition): Seq[String] = { + // Note that as number of rdd's increase and/or number of slaves in cluster increase, the computed preferredLocations below + // become diminishingly small : so we might need to look at alternate strategies to alleviate this. + // If there are no (or very small number of preferred locations), we will end up transferred the blocks to 'any' node in the + // cluster - paying with n/w and cache cost. + // Maybe pick a node which figures max amount of time ? + // Choose node which is hosting 'larger' of some subset of blocks ? + // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible) val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions - val preferredLocations = rdds.zip(splits).map(x => x._1.preferredLocations(x._2)) - preferredLocations.reduce((x, y) => x.intersect(y)) + val rddSplitZip = rdds.zip(splits) + + // exact match. + val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2)) + val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y)) + + // Remove exact match and then do host local match. + val otherNodePreferredLocations = rddSplitZip.map(x => { + x._1.preferredLocations(x._2).map(hostPort => { + val host = Utils.parseHostPort(hostPort)._1 + + if (exactMatchLocations.contains(host)) null else host + }).filter(_ != null) + }) + val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y)) + + otherNodeLocalLocations ++ exactMatchLocations } override def clearDependencies() { diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index 51573fe68a..f728e93d24 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -48,21 +48,27 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( } override def getPreferredLocations(s: Partition): Seq[String] = { + // Note that as number of slaves in cluster increase, the computed preferredLocations can become small : so we might need + // to look at alternate strategies to alleviate this. (If there are no (or very small number of preferred locations), we + // will end up transferred the blocks to 'any' node in the cluster - paying with n/w and cache cost. + // Maybe pick one or the other ? (so that atleast one block is local ?). + // Choose node which is hosting 'larger' of the blocks ? + // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible) val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions val pref1 = rdd1.preferredLocations(partition1) val pref2 = rdd2.preferredLocations(partition2) - // both partitions are instance local. - val instanceLocalLocations = pref1.intersect(pref2) + // exact match - instance local and host local. + val exactMatchLocations = pref1.intersect(pref2) - // remove locations which are already handled via instanceLocalLocations, and intersect where both partitions are node local. - val nodeLocalPref1 = pref1.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) - val nodeLocalPref2 = pref2.filter(loc => ! instanceLocalLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) - val nodeLocalLocations = nodeLocalPref1.intersect(nodeLocalPref2) + // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local. + val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) + val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1) + val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2) // Can have mix of instance local (hostPort) and node local (host) locations as preference ! - instanceLocalLocations ++ nodeLocalLocations + exactMatchLocations ++ otherNodeLocalLocations } override def clearDependencies() { |