| author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-01 00:32:41 -0800 |
| --- | --- | --- |
| committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-01 00:32:41 -0800 |
| commit | 571af31304bd72d310c3b47a8471a4de206aa6fe (patch) | |
| tree | b3a7fa148b96951e8871213721b9a4c4cb331c51 | |
| parent | 5ce5efec104364ea6e97965ab5757db7d66f355e (diff) | |
| parent | f9af9cee6fed9c6af896fb92556ad4f48c7f8e64 (diff) | |
Merge pull request #433 from rxin/master
Changed PartitionPruningRDD's split to make sure it returns the correct split index.
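The bug being fixed: PartitionPruningRDD previously reused its parent's Split objects directly, so a pruned child could report non-contiguous split indices (e.g. 3 and 7) even though the scheduler addresses an RDD's splits by position 0..n-1. The fix introduces PartitionPruningRDDSplit, which renumbers the surviving splits while keeping a reference to each parent split so compute() still reads the right data. A minimal, self-contained Scala sketch of the renumbering pattern (the `Split` trait and the `ParentSplit`/`PrunedSplit` names below are illustrative stand-ins, not the real Spark classes):

```scala
// Self-contained sketch of the renumbering pattern this commit adopts.
// `Split`, `ParentSplit`, and `PrunedSplit` are illustrative stand-ins,
// not the actual Spark classes touched in the diff.
trait Split { def index: Int }

case class ParentSplit(index: Int) extends Split

// Wraps a surviving parent split under a fresh, contiguous child index,
// while remembering the parent so compute() can still locate its data.
class PrunedSplit(idx: Int, val parentSplit: Split) extends Split {
  override val index = idx
}

object PruneDemo extends App {
  val parents: Seq[Split] = (0 until 10).map(ParentSplit(_))
  val keep: Int => Boolean = i => i == 3 || i == 7

  // Before the fix: filtering alone keeps parent indices 3 and 7, which
  // breaks callers that expect child split indices to run 0..n-1.
  // After the fix: zipWithIndex assigns contiguous child indices.
  val pruned = parents.filter(s => keep(s.index))
    .zipWithIndex
    .map { case (s, i) => new PrunedSplit(i, s) }

  // Prints "child 0 -> parent 3" and "child 1 -> parent 7".
  pruned.foreach(p => println(s"child ${p.index} -> parent ${p.parentSplit.index}"))
}
```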
| -rw-r--r-- | core/src/main/scala/spark/Dependency.scala | 14 |
| -rw-r--r-- | core/src/main/scala/spark/rdd/PartitionPruningRDD.scala | 30 |

2 files changed, 24 insertions(+), 20 deletions(-)
```diff
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 647aee6eb5..5eea907322 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -61,17 +61,3 @@ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
     }
   }
 }
-
-
-/**
- * Represents a dependency between the PartitionPruningRDD and its parent. In this
- * case, the child RDD contains a subset of partitions of the parents'.
- */
-class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
-  extends NarrowDependency[T](rdd) {
-
-  @transient
-  val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
-
-  override def getParents(partitionId: Int) = List(partitions(partitionId).index)
-}
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index b8482338c6..a50ce75171 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -1,24 +1,42 @@
 package spark.rdd
 
-import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext}
+import spark.{NarrowDependency, RDD, SparkEnv, Split, TaskContext}
+
+
+class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split {
+  override val index = idx
+}
+
+
+/**
+ * Represents a dependency between the PartitionPruningRDD and its parent. In this
+ * case, the child RDD contains a subset of partitions of the parents'.
+ */
+class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+  extends NarrowDependency[T](rdd) {
+
+  @transient
+  val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
+    .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDSplit(idx, split) : Split }
+
+  override def getParents(partitionId: Int) = List(partitions(partitionId).index)
+}
+
 
 
 /**
  * A RDD used to prune RDD partitions/splits so we can avoid launching tasks on
  * all partitions. An example use case: If we know the RDD is partitioned by range,
  * and the execution DAG has a filter on the key, we can avoid launching tasks
  * on partitions that don't have the range covering the key.
- *
- * TODO: This currently doesn't give partition IDs properly!
  */
 class PartitionPruningRDD[T: ClassManifest](
     @transient prev: RDD[T],
     @transient partitionFilterFunc: Int => Boolean)
   extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
 
-  override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(split, context)
+  override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(
+    split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context)
 
   override protected def getSplits = getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
-
-  override val partitioner = firstParent[T].partitioner
 }
```
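For context, a hypothetical usage sketch against the 0.7-era API shown above; the SparkContext setup, the app name, and the choice of partition 2 are assumptions for illustration, not part of this commit:

```scala
import spark.SparkContext
import spark.rdd.PartitionPruningRDD

object PruneUsage {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "prune-usage")

    // A 4-partition RDD; suppose external knowledge (e.g. range partitioning)
    // tells us only partition 2 can hold the keys we care about.
    val data = sc.parallelize(1 to 100, 4)

    // Only partitions passing the filter get tasks. After this commit the
    // child's splits are renumbered 0..n-1 by PruneDependency, and compute()
    // unwraps each PartitionPruningRDDSplit to read the real parent split.
    val pruned = new PartitionPruningRDD(data, partitionIndex => partitionIndex == 2)

    println(pruned.splits.length)    // 1: only the surviving split
    println(pruned.collect().toSeq)  // elements of parent partition 2 only
  }
}
```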