diff options
author | jerryshao <saisai.shao@intel.com> | 2013-06-13 10:46:22 +0800 |
---|---|---|
committer | jerryshao <saisai.shao@intel.com> | 2013-06-18 09:49:06 +0800 |
commit | 1e9269c3eeeaa3a481b95521c703032ed84abd68 (patch) | |
tree | e8c2a94ef2485c590a73b6cf68476782c75d4552 | |
parent | 2ab311f4cee3f918dc28daaebd287b11c9f63429 (diff) | |
download | spark-1e9269c3eeeaa3a481b95521c703032ed84abd68.tar.gz spark-1e9269c3eeeaa3a481b95521c703032ed84abd68.tar.bz2 spark-1e9269c3eeeaa3a481b95521c703032ed84abd68.zip |
reduce ZippedPartitionsRDD's getPreferredLocations complexity
-rw-r--r-- | core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala | 12 |
1 file changed, 4 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala index dd9f3c2680..b234428ab2 100644 --- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala @@ -53,14 +53,10 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest]( val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y)) // Remove exact match and then do host local match. - val otherNodePreferredLocations = rddSplitZip.map(x => { - x._1.preferredLocations(x._2).map(hostPort => { - val host = Utils.parseHostPort(hostPort)._1 - - if (exactMatchLocations.contains(host)) null else host - }).filter(_ != null) - }) - val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y)) + val exactMatchHosts = exactMatchLocations.map(Utils.parseHostPort(_)._1) + val matchPreferredHosts = exactMatchPreferredLocations.map(locs => locs.map(Utils.parseHostPort(_)._1)) + .reduce((x, y) => x.intersect(y)) + val otherNodeLocalLocations = matchPreferredHosts.filter { s => !exactMatchHosts.contains(s) } otherNodeLocalLocations ++ exactMatchLocations } |