aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2013-06-13 10:46:22 +0800
committerjerryshao <saisai.shao@intel.com>2013-06-18 09:49:06 +0800
commit1e9269c3eeeaa3a481b95521c703032ed84abd68 (patch)
treee8c2a94ef2485c590a73b6cf68476782c75d4552 /core/src
parent2ab311f4cee3f918dc28daaebd287b11c9f63429 (diff)
downloadspark-1e9269c3eeeaa3a481b95521c703032ed84abd68.tar.gz
spark-1e9269c3eeeaa3a481b95521c703032ed84abd68.tar.bz2
spark-1e9269c3eeeaa3a481b95521c703032ed84abd68.zip
reduce ZippedPartitionsRDD's getPreferredLocations complexity
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala12
1 files changed, 4 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index dd9f3c2680..b234428ab2 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -53,14 +53,10 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y))
// Remove exact match and then do host local match.
- val otherNodePreferredLocations = rddSplitZip.map(x => {
- x._1.preferredLocations(x._2).map(hostPort => {
- val host = Utils.parseHostPort(hostPort)._1
-
- if (exactMatchLocations.contains(host)) null else host
- }).filter(_ != null)
- })
- val otherNodeLocalLocations = otherNodePreferredLocations.reduce((x, y) => x.intersect(y))
+ val exactMatchHosts = exactMatchLocations.map(Utils.parseHostPort(_)._1)
+ val matchPreferredHosts = exactMatchPreferredLocations.map(locs => locs.map(Utils.parseHostPort(_)._1))
+ .reduce((x, y) => x.intersect(y))
+ val otherNodeLocalLocations = matchPreferredHosts.filter { s => !exactMatchHosts.contains(s) }
otherNodeLocalLocations ++ exactMatchLocations
}