aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-02-01 00:32:41 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-02-01 00:32:41 -0800
commit571af31304bd72d310c3b47a8471a4de206aa6fe (patch)
treeb3a7fa148b96951e8871213721b9a4c4cb331c51
parent5ce5efec104364ea6e97965ab5757db7d66f355e (diff)
parentf9af9cee6fed9c6af896fb92556ad4f48c7f8e64 (diff)
downloadspark-571af31304bd72d310c3b47a8471a4de206aa6fe.tar.gz
spark-571af31304bd72d310c3b47a8471a4de206aa6fe.tar.bz2
spark-571af31304bd72d310c3b47a8471a4de206aa6fe.zip
Merge pull request #433 from rxin/master
Changed PartitionPruningRDD's split to make sure it returns the correct split index.
-rw-r--r--core/src/main/scala/spark/Dependency.scala14
-rw-r--r--core/src/main/scala/spark/rdd/PartitionPruningRDD.scala30
2 files changed, 24 insertions, 20 deletions
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 647aee6eb5..5eea907322 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -61,17 +61,3 @@ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
}
}
}
-
-
-/**
- * Represents a dependency between the PartitionPruningRDD and its parent. In this
- * case, the child RDD contains a subset of partitions of the parents'.
- */
-class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
- extends NarrowDependency[T](rdd) {
-
- @transient
- val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
-
- override def getParents(partitionId: Int) = List(partitions(partitionId).index)
-}
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index b8482338c6..a50ce75171 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -1,24 +1,42 @@
package spark.rdd
-import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext}
+import spark.{NarrowDependency, RDD, SparkEnv, Split, TaskContext}
+
+
+class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split {
+ override val index = idx
+}
+
+
+/**
+ * Represents a dependency between the PartitionPruningRDD and its parent. In this
+ * case, the child RDD contains a subset of partitions of the parents'.
+ */
+class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+ extends NarrowDependency[T](rdd) {
+
+ @transient
+ val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
+ .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDSplit(idx, split) : Split }
+
+ override def getParents(partitionId: Int) = List(partitions(partitionId).index)
+}
+
/**
* A RDD used to prune RDD partitions/splits so we can avoid launching tasks on
* all partitions. An example use case: If we know the RDD is partitioned by range,
* and the execution DAG has a filter on the key, we can avoid launching tasks
* on partitions that don't have the range covering the key.
- *
- * TODO: This currently doesn't give partition IDs properly!
*/
class PartitionPruningRDD[T: ClassManifest](
@transient prev: RDD[T],
@transient partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
- override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(split, context)
+ override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(
+ split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context)
override protected def getSplits =
getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
-
- override val partitioner = firstParent[T].partitioner
}