aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/spark/rdd/MapPartitionsRDD.scala')
-rw-r--r--core/src/main/scala/spark/rdd/MapPartitionsRDD.scala14
1 files changed, 8 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index c764505345..073f7d7d2a 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -1,6 +1,6 @@
package spark.rdd
-import spark.{OneToOneDependency, RDD, Split, TaskContext}
+import spark.{RDD, Split, TaskContext}
private[spark]
@@ -8,11 +8,13 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T],
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false)
- extends RDD[U](prev.context) {
+ extends RDD[U](prev) {
- override val partitioner = if (preservesPartitioning) prev.partitioner else None
+ override val partitioner =
+ if (preservesPartitioning) firstParent[T].partitioner else None
- override def splits = prev.splits
- override val dependencies = List(new OneToOneDependency(prev))
- override def compute(split: Split, context: TaskContext) = f(prev.iterator(split, context))
+ override def getSplits = firstParent[T].splits
+
+ override def compute(split: Split, context: TaskContext) =
+ f(firstParent[T].iterator(split, context))
} \ No newline at end of file