author    Reynold Xin <rxin@databricks.com>  2014-12-04 00:45:57 -0800
committer Reynold Xin <rxin@databricks.com>  2014-12-04 00:45:57 -0800
commit    c3ad48603632a039a51be3d33e917105149fdd7a (patch)
tree      6561c006639096ab15c65df80b2f931eeede9d58
parent    ed88db4cb21d029ca14ebc428fae122adf5128f0 (diff)
[SPARK-4719][API] Consolidate various narrow dep RDD classes with MapPartitionsRDD
MappedRDD, MappedValuesRDD, FlatMappedValuesRDD, FilteredRDD, GlommedRDD, FlatMappedRDD are not necessary. They can be implemented trivially using MapPartitionsRDD.

Author: Reynold Xin <rxin@databricks.com>

Closes #3578 from rxin/SPARK-4719 and squashes the following commits:

eed9853 [Reynold Xin] Preserve partitioning for filter.
eb1a89b [Reynold Xin] [SPARK-4719][API] Consolidate various narrow dep RDD classes with MapPartitionsRDD.
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala       |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala         |  35
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala       |  34
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala |  35
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala          |  31
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala           |  32
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala     |  33
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala    |  10
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                 |  28
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala            |  50
10 files changed, 55 insertions(+), 245 deletions(-)
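
The commit message above states that the specialized narrow-dependency RDDs can be implemented trivially on top of MapPartitionsRDD. The following is a minimal, self-contained Scala sketch of that idea, not Spark source: SimpleRDD and mapPartitionsLike are hypothetical names invented for illustration, showing how a single generic per-partition transform can express map, filter, flatMap, and glom.

import scala.reflect.ClassTag

object ConsolidationSketch {
  // A toy "RDD": just a sequence of partitions held locally.
  final case class SimpleRDD[T](partitions: Seq[Seq[T]]) {
    // The one generic transform: apply f to each partition's iterator.
    def mapPartitionsLike[U](f: Iterator[T] => Iterator[U]): SimpleRDD[U] =
      SimpleRDD(partitions.map(part => f(part.iterator).toSeq))

    // The formerly specialized classes become one-liners on top of it.
    def map[U](f: T => U): SimpleRDD[U] = mapPartitionsLike(_.map(f))
    def filter(pred: T => Boolean): SimpleRDD[T] = mapPartitionsLike(_.filter(pred))
    def flatMap[U](f: T => TraversableOnce[U]): SimpleRDD[U] = mapPartitionsLike(_.flatMap(f))
    def glom()(implicit ct: ClassTag[T]): SimpleRDD[Array[T]] =
      mapPartitionsLike(it => Iterator(it.toArray))
  }

  def main(args: Array[String]): Unit = {
    val rdd = SimpleRDD(Seq(Seq(1, 2, 3), Seq(4, 5)))
    println(rdd.map(_ * 2).partitions)                  // List(List(2, 4, 6), List(8, 10))
    println(rdd.filter(_ % 2 == 1).partitions)          // List(List(1, 3), List(5))
    println(rdd.flatMap(x => Seq(x, x)).partitions)     // each element duplicated, per partition
    println(rdd.glom().partitions.map(_.map(_.toList))) // one array (shown as a list) per partition
  }
}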
diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
index 6e66ddbdef..1f755db485 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
@@ -24,12 +24,12 @@ import org.apache.spark.input.StreamFileInputFormat
import org.apache.spark.{ Partition, SparkContext }
private[spark] class BinaryFileRDD[T](
- sc: SparkContext,
- inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
- keyClass: Class[String],
- valueClass: Class[T],
- @transient conf: Configuration,
- minPartitions: Int)
+ sc: SparkContext,
+ inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+ keyClass: Class[String],
+ valueClass: Class[T],
+ @transient conf: Configuration,
+ minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
override def getPartitions: Array[Partition] = {
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
deleted file mode 100644
index 9e41b3d1e2..0000000000
--- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark] class FilteredRDD[T: ClassTag](
- prev: RDD[T],
- f: T => Boolean)
- extends RDD[T](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override val partitioner = prev.partitioner // Since filter cannot change a partition's keys
-
- override def compute(split: Partition, context: TaskContext) =
- firstParent[T].iterator(split, context).filter(f)
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
deleted file mode 100644
index d8f87d4e36..0000000000
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark]
-class FlatMappedRDD[U: ClassTag, T: ClassTag](
- prev: RDD[T],
- f: T => TraversableOnce[U])
- extends RDD[U](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override def compute(split: Partition, context: TaskContext) =
- firstParent[T].iterator(split, context).flatMap(f)
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
deleted file mode 100644
index 7c9023f62d..0000000000
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark]
-class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
- extends RDD[(K, U)](prev) {
-
- override def getPartitions = firstParent[Product2[K, V]].partitions
-
- override val partitioner = firstParent[Product2[K, V]].partitioner
-
- override def compute(split: Partition, context: TaskContext) = {
- firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) =>
- f(v).map(x => (k, x))
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
deleted file mode 100644
index f6463fa715..0000000000
--- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T])
- extends RDD[Array[T]](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override def compute(split: Partition, context: TaskContext) =
- Array(firstParent[T].iterator(split, context).toArray).iterator
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
deleted file mode 100644
index 8d7c288593..0000000000
--- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark]
-class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
- extends RDD[U](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override def compute(split: Partition, context: TaskContext) =
- firstParent[T].iterator(split, context).map(f)
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
deleted file mode 100644
index a60952eee5..0000000000
--- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import org.apache.spark.{Partition, TaskContext}
-
-private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
- extends RDD[(K, U)](prev) {
-
- override def getPartitions = firstParent[Product2[K, U]].partitions
-
- override val partitioner = firstParent[Product2[K, U]].partitioner
-
- override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
- firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index e78e576788..c43e1f2fe1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -660,7 +660,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
- new MappedValuesRDD(self, cleanF)
+ new MapPartitionsRDD[(K, U), (K, V)](self,
+ (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
+ preservesPartitioning = true)
}
/**
@@ -669,7 +671,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
*/
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
- new FlatMappedValuesRDD(self, cleanF)
+ new MapPartitionsRDD[(K, U), (K, V)](self,
+ (context, pid, iter) => iter.flatMap { case (k, v) =>
+ cleanF(v).map(x => (k, x))
+ },
+ preservesPartitioning = true)
}
/**
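
A hedged usage sketch of what the PairRDDFunctions change above preserves: because the new MapPartitionsRDD is built with preservesPartitioning = true, mapValues and flatMapValues keep the parent's partitioner, so a later reduceByKey or join on the same keys can avoid a shuffle. This assumes Spark 1.3 or later on the classpath (so the pair-RDD implicits resolve without extra imports) and a local master; it is an illustration, not part of the commit.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object MapValuesPartitioningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    try {
      val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
        .partitionBy(new HashPartitioner(4))

      val mapped     = pairs.mapValues(_ * 10)               // keys untouched
      val flatMapped = pairs.flatMapValues(v => Seq(v, -v))  // keys untouched

      // Both should report the same HashPartitioner as the parent RDD.
      println(mapped.partitioner)     // Some(org.apache.spark.HashPartitioner@...)
      println(flatMapped.partitioner) // Some(org.apache.spark.HashPartitioner@...)
    } finally {
      sc.stop()
    }
  }
}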
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 8dfd952298..0bd616ec24 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.util.{Properties, Random}
+import java.util.Random
import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
@@ -36,13 +36,12 @@ import org.apache.spark._
import org.apache.spark.Partitioner._
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.broadcast.Broadcast
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler,
SamplingUtils}
@@ -270,19 +269,30 @@ abstract class RDD[T: ClassTag](
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
- def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+ def map[U: ClassTag](f: T => U): RDD[U] = {
+ val cleanF = sc.clean(f)
+ new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
+ }
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
- def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
- new FlatMappedRDD(this, sc.clean(f))
+ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
+ val cleanF = sc.clean(f)
+ new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
+ }
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
- def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
+ def filter(f: T => Boolean): RDD[T] = {
+ val cleanF = sc.clean(f)
+ new MapPartitionsRDD[T, T](
+ this,
+ (context, pid, iter) => iter.filter(cleanF),
+ preservesPartitioning = true)
+ }
/**
* Return a new RDD containing the distinct elements in this RDD.
@@ -503,7 +513,9 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
- def glom(): RDD[Array[T]] = new GlommedRDD(this)
+ def glom(): RDD[Array[T]] = {
+ new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
+ }
/**
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
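
Along the same lines, a small sketch (again assuming a local Spark build on the classpath; not part of the commit) of the user-visible behavior the RDD.scala rewrite keeps: filter carries preservesPartitioning = true because it cannot change a partition's keys, so the partitioner survives, while a plain map makes no such promise and drops it.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object FilterPreservesPartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    try {
      val pairs = sc.parallelize(Seq((1, "x"), (2, "y"), (3, "z")))
        .partitionBy(new HashPartitioner(3))

      println(pairs.filter { case (k, _) => k > 1 }.partitioner) // Some(HashPartitioner@...): kept
      println(pairs.map(identity).partitioner)                   // None: map may rewrite keys
      println(pairs.glom().map(_.length).collect().toList)       // element count per partition
    } finally {
      sc.stop()
    }
  }
}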
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d9be79614..46fcb80fa1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -24,10 +24,9 @@ import scala.reflect.ClassTag
import org.scalatest.FunSuite
import org.apache.spark._
-import org.apache.spark.util.Utils
-
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDDSuiteUtils._
+import org.apache.spark.util.Utils
class RDDSuite extends FunSuite with SharedSparkContext {
@@ -37,8 +36,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
assert(dups.distinct().count() === 4)
- assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
- assert(dups.distinct.collect === dups.distinct().collect)
+ assert(dups.distinct().count === 4) // Can distinct and count be called without parentheses?
+ assert(dups.distinct().collect === dups.distinct().collect)
assert(dups.distinct(2).collect === dups.distinct().collect)
assert(nums.reduce(_ + _) === 10)
assert(nums.fold(0)(_ + _) === 10)
@@ -617,9 +616,9 @@ class RDDSuite extends FunSuite with SharedSparkContext {
for(seed <- 1 to 5) {
val splits = data.randomSplit(Array(1.0, 2.0, 3.0), seed)
assert(splits.size == 3, "wrong number of splits")
- assert(splits.flatMap(_.collect).sorted.toList == data.collect.toList,
+ assert(splits.flatMap(_.collect()).sorted.toList == data.collect().toList,
"incomplete or wrong split")
- val s = splits.map(_.count)
+ val s = splits.map(_.count())
assert(math.abs(s(0) - 100) < 50) // std = 9.13
assert(math.abs(s(1) - 200) < 50) // std = 11.55
assert(math.abs(s(2) - 300) < 50) // std = 12.25
@@ -762,8 +761,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val rdd3 = rdd2.map(_ + 1)
val rdd4 = new UnionRDD(sc, List(rdd1, rdd2, rdd3))
assert(rdd4.parent(0).isInstanceOf[ParallelCollectionRDD[_]])
- assert(rdd4.parent(1).isInstanceOf[FilteredRDD[_]])
- assert(rdd4.parent(2).isInstanceOf[MappedRDD[_, _]])
+ assert(rdd4.parent[Int](1) === rdd2)
+ assert(rdd4.parent[Int](2) === rdd3)
}
test("getNarrowAncestors") {
@@ -781,20 +780,18 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// Simple dependency tree with a single branch
assert(ancestors1.size === 0)
assert(ancestors2.size === 2)
- assert(ancestors2.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 1)
- assert(ancestors2.count(_.isInstanceOf[FilteredRDD[_]]) === 1)
+ assert(ancestors2.count(_ === rdd1) === 1)
+ assert(ancestors2.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
assert(ancestors3.size === 5)
- assert(ancestors3.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 1)
- assert(ancestors3.count(_.isInstanceOf[FilteredRDD[_]]) === 2)
- assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
+ assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4)
// Any ancestors before the shuffle are not considered
assert(ancestors4.size === 0)
assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 0)
assert(ancestors5.size === 3)
assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
- assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 0)
- assert(ancestors5.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 2)
+ assert(ancestors5.count(_ === rdd3) === 0)
+ assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 2)
}
test("getNarrowAncestors with multiple parents") {
@@ -815,16 +812,16 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// Simple dependency tree with multiple branches
assert(ancestors6.size === 3)
assert(ancestors6.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 2)
- assert(ancestors6.count(_.isInstanceOf[MappedRDD[_, _]]) === 1)
+ assert(ancestors6.count(_ === rdd2) === 1)
assert(ancestors7.size === 5)
assert(ancestors7.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 3)
- assert(ancestors7.count(_.isInstanceOf[MappedRDD[_, _]]) === 1)
- assert(ancestors7.count(_.isInstanceOf[FilteredRDD[_]]) === 1)
+ assert(ancestors7.count(_ === rdd2) === 1)
+ assert(ancestors7.count(_ === rdd3) === 1)
// Dependency tree with duplicate nodes (e.g. rdd1 should not be reported twice)
assert(ancestors8.size === 7)
- assert(ancestors8.count(_.isInstanceOf[MappedRDD[_, _]]) === 1)
- assert(ancestors8.count(_.isInstanceOf[FilteredRDD[_]]) === 1)
+ assert(ancestors8.count(_ === rdd2) === 1)
+ assert(ancestors8.count(_ === rdd3) === 1)
assert(ancestors8.count(_.isInstanceOf[UnionRDD[_]]) === 2)
assert(ancestors8.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 3)
assert(ancestors8.count(_ == rdd1) === 1)
@@ -834,7 +831,6 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// Any ancestors before the shuffle are not considered
assert(ancestors9.size === 2)
assert(ancestors9.count(_.isInstanceOf[CoGroupedRDD[_]]) === 1)
- assert(ancestors9.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 1)
}
/**
@@ -868,12 +864,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val ancestors3 = rdd3.getNarrowAncestors
val ancestors4 = rdd4.getNarrowAncestors
assert(ancestors3.size === 4)
- assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
- assert(ancestors3.count(_.isInstanceOf[FilteredRDD[_]]) === 2)
+ assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4)
assert(ancestors3.count(_ == rdd3) === 0)
assert(ancestors4.size === 4)
- assert(ancestors4.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
- assert(ancestors4.count(_.isInstanceOf[FilteredRDD[_]]) === 1)
+ assert(ancestors4.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 3)
assert(ancestors4.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 1)
assert(ancestors4.count(_ == rdd3) === 1)
assert(ancestors4.count(_ == rdd4) === 0)
@@ -881,8 +875,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// Cycles that do not involve the root
val ancestors5 = rdd5.getNarrowAncestors
assert(ancestors5.size === 6)
- assert(ancestors5.count(_.isInstanceOf[MappedRDD[_, _]]) === 3)
- assert(ancestors5.count(_.isInstanceOf[FilteredRDD[_]]) === 2)
+ assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 5)
assert(ancestors5.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 1)
assert(ancestors4.count(_ == rdd3) === 1)
@@ -890,8 +883,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val ancestors6 = rdd6.getNarrowAncestors
assert(ancestors6.size === 12)
assert(ancestors6.count(_.isInstanceOf[UnionRDD[_]]) === 2)
- assert(ancestors6.count(_.isInstanceOf[MappedRDD[_, _]]) === 4)
- assert(ancestors6.count(_.isInstanceOf[FilteredRDD[_]]) === 3)
+ assert(ancestors6.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 7)
assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3)
}