diff options
author | Reynold Xin <rxin@cs.berkeley.edu> | 2013-02-01 00:02:46 -0800 |
---|---|---|
committer | Reynold Xin <rxin@cs.berkeley.edu> | 2013-02-01 00:02:46 -0800 |
commit | f9af9cee6fed9c6af896fb92556ad4f48c7f8e64 (patch) | |
tree | 7573490d1202761fa1d6861baf7650db081844a9 | |
parent | 6289d9654e32fc92418d41cc6e32fee30f85c833 (diff) | |
download | spark-f9af9cee6fed9c6af896fb92556ad4f48c7f8e64.tar.gz spark-f9af9cee6fed9c6af896fb92556ad4f48c7f8e64.tar.bz2 spark-f9af9cee6fed9c6af896fb92556ad4f48c7f8e64.zip |
Moved PruneDependency into PartitionPruningRDD.scala.
-rw-r--r-- | core/src/main/scala/spark/Dependency.scala | 22 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/PartitionPruningRDD.scala | 26 |
2 files changed, 22 insertions, 26 deletions
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index 827eac850a..5eea907322 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -61,25 +61,3 @@ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) } } } - - -/** - * Represents a dependency between the PartitionPruningRDD and its parent. In this - * case, the child RDD contains a subset of partitions of the parents'. - */ -class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean) - extends NarrowDependency[T](rdd) { - - @transient - val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index)) - .zipWithIndex - .map { case(split, idx) => new PruneDependency.PartitionPruningRDDSplit(idx, split) : Split } - - override def getParents(partitionId: Int) = List(partitions(partitionId).index) -} - -object PruneDependency { - class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split { - override val index = idx - } -} diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala index 3756870fac..a50ce75171 100644 --- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala +++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala @@ -1,6 +1,26 @@ package spark.rdd -import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext} +import spark.{NarrowDependency, RDD, SparkEnv, Split, TaskContext} + + +class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split { + override val index = idx +} + + +/** + * Represents a dependency between the PartitionPruningRDD and its parent. In this + * case, the child RDD contains a subset of partitions of the parents'. + */ +class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean) + extends NarrowDependency[T](rdd) { + + @transient + val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index)) + .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDSplit(idx, split) : Split } + + override def getParents(partitionId: Int) = List(partitions(partitionId).index) +} /** @@ -15,10 +35,8 @@ class PartitionPruningRDD[T: ClassManifest]( extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) { override def compute(split: Split, context: TaskContext) = firstParent[T].iterator( - split.asInstanceOf[PruneDependency.PartitionPruningRDDSplit].parentSplit, context) + split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context) override protected def getSplits = getDependencies.head.asInstanceOf[PruneDependency[T]].partitions - - override val partitioner = firstParent[T].partitioner } |