aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-12-06 11:01:42 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-12-06 11:01:42 -0800
commit87676a6af2c8fc33c5b5d4e7eb45e3e8558f3c33 (patch)
treeeaf8ced3e658632b6fb1fcef381cd5a6ed23e2f5 /core
parent078049877e123fe7e4c4553e36055de572cab7c4 (diff)
parent9cf7f31e4d4e542b88b6a474bdf08d07fdd3652c (diff)
downloadspark-87676a6af2c8fc33c5b5d4e7eb45e3e8558f3c33.tar.gz
spark-87676a6af2c8fc33c5b5d4e7eb45e3e8558f3c33.tar.bz2
spark-87676a6af2c8fc33c5b5d4e7eb45e3e8558f3c33.zip
Merge pull request #220 from rxin/zippart
Memoize preferred locations in ZippedPartitionsBaseRDD so preferred location computation doesn't lead to exponential explosion. This was a problem in GraphX where we have a whole chain of RDDs that are ZippedPartitionsRDD's, and the preferred locations were taking eternity to compute. (cherry picked from commit e36fe55a031d2c01c9d7c5d85965951c681a0c74) Signed-off-by: Reynold Xin <rxin@apache.org>
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala27
1 files changed, 11 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index faeb316664..a97d2a01c8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -22,7 +22,8 @@ import java.io.{ObjectOutputStream, IOException}
private[spark] class ZippedPartitionsPartition(
idx: Int,
- @transient rdds: Seq[RDD[_]])
+ @transient rdds: Seq[RDD[_]],
+ @transient val preferredLocations: Seq[String])
extends Partition {
override val index: Int = idx
@@ -47,27 +48,21 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
if (preservesPartitioning) firstParent[Any].partitioner else None
override def getPartitions: Array[Partition] = {
- val sizes = rdds.map(x => x.partitions.size)
- if (!sizes.forall(x => x == sizes(0))) {
+ val numParts = rdds.head.partitions.size
+ if (!rdds.forall(rdd => rdd.partitions.size == numParts)) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
- val array = new Array[Partition](sizes(0))
- for (i <- 0 until sizes(0)) {
- array(i) = new ZippedPartitionsPartition(i, rdds)
+ Array.tabulate[Partition](numParts) { i =>
+ val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i)))
+ // Check whether there are any hosts that match all RDDs; otherwise return the union
+ val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
+ val locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct
+ new ZippedPartitionsPartition(i, rdds, locs)
}
- array
}
override def getPreferredLocations(s: Partition): Seq[String] = {
- val parts = s.asInstanceOf[ZippedPartitionsPartition].partitions
- val prefs = rdds.zip(parts).map { case (rdd, p) => rdd.preferredLocations(p) }
- // Check whether there are any hosts that match all RDDs; otherwise return the union
- val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
- if (!exactMatchLocations.isEmpty) {
- exactMatchLocations
- } else {
- prefs.flatten.distinct
- }
+ s.asInstanceOf[ZippedPartitionsPartition].preferredLocations
}
override def clearDependencies() {