diff options
Diffstat (limited to 'core/src/main/scala/spark/rdd/MapPartitionsRDD.scala')
-rw-r--r-- | core/src/main/scala/spark/rdd/MapPartitionsRDD.scala | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala index c764505345..073f7d7d2a 100644 --- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala +++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala @@ -1,6 +1,6 @@ package spark.rdd -import spark.{OneToOneDependency, RDD, Split, TaskContext} +import spark.{RDD, Split, TaskContext} private[spark] @@ -8,11 +8,13 @@ class MapPartitionsRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false) - extends RDD[U](prev.context) { + extends RDD[U](prev) { - override val partitioner = if (preservesPartitioning) prev.partitioner else None + override val partitioner = + if (preservesPartitioning) firstParent[T].partitioner else None - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override def compute(split: Split, context: TaskContext) = f(prev.iterator(split, context)) + override def getSplits = firstParent[T].splits + + override def compute(split: Split, context: TaskContext) = + f(firstParent[T].iterator(split, context)) }
\ No newline at end of file |