aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@cs.berkeley.edu>2013-02-01 00:02:46 -0800
committerReynold Xin <rxin@cs.berkeley.edu>2013-02-01 00:02:46 -0800
commitf9af9cee6fed9c6af896fb92556ad4f48c7f8e64 (patch)
tree7573490d1202761fa1d6861baf7650db081844a9
parent6289d9654e32fc92418d41cc6e32fee30f85c833 (diff)
downloadspark-f9af9cee6fed9c6af896fb92556ad4f48c7f8e64.tar.gz
spark-f9af9cee6fed9c6af896fb92556ad4f48c7f8e64.tar.bz2
spark-f9af9cee6fed9c6af896fb92556ad4f48c7f8e64.zip
Moved PruneDependency into PartitionPruningRDD.scala.
-rw-r--r--core/src/main/scala/spark/Dependency.scala22
-rw-r--r--core/src/main/scala/spark/rdd/PartitionPruningRDD.scala26
2 files changed, 22 insertions, 26 deletions
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index 827eac850a..5eea907322 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -61,25 +61,3 @@ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
}
}
}
-
-
-/**
- * Represents a dependency between the PartitionPruningRDD and its parent. In this
- * case, the child RDD contains a subset of partitions of the parents'.
- */
-class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
- extends NarrowDependency[T](rdd) {
-
- @transient
- val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
- .zipWithIndex
- .map { case(split, idx) => new PruneDependency.PartitionPruningRDDSplit(idx, split) : Split }
-
- override def getParents(partitionId: Int) = List(partitions(partitionId).index)
-}
-
-object PruneDependency {
- class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split {
- override val index = idx
- }
-}
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index 3756870fac..a50ce75171 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -1,6 +1,26 @@
package spark.rdd
-import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext}
+import spark.{NarrowDependency, RDD, SparkEnv, Split, TaskContext}
+
+
+class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split {
+ override val index = idx
+}
+
+
+/**
+ * Represents a dependency between the PartitionPruningRDD and its parent. In this
+ * case, the child RDD contains a subset of partitions of the parents'.
+ */
+class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
+ extends NarrowDependency[T](rdd) {
+
+ @transient
+ val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
+ .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDSplit(idx, split) : Split }
+
+ override def getParents(partitionId: Int) = List(partitions(partitionId).index)
+}
/**
@@ -15,10 +35,8 @@ class PartitionPruningRDD[T: ClassManifest](
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(
- split.asInstanceOf[PruneDependency.PartitionPruningRDDSplit].parentSplit, context)
+ split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context)
override protected def getSplits =
getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
-
- override val partitioner = firstParent[T].partitioner
}