From c3ad48603632a039a51be3d33e917105149fdd7a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 4 Dec 2014 00:45:57 -0800 Subject: [SPARK-4719][API] Consolidate various narrow dep RDD classes with MapPartitionsRDD MappedRDD, MappedValuesRDD, FlatMappedValuesRDD, FilteredRDD, GlommedRDD, FlatMappedRDD are not necessary. They can be implemented trivially using MapPartitionsRDD. Author: Reynold Xin Closes #3578 from rxin/SPARK-4719 and squashes the following commits: eed9853 [Reynold Xin] Preserve partitioning for filter. eb1a89b [Reynold Xin] [SPARK-4719][API] Consolidate various narrow dep RDD classes with MapPartitionsRDD. --- .../scala/org/apache/spark/rdd/BinaryFileRDD.scala | 12 +++--- .../scala/org/apache/spark/rdd/FilteredRDD.scala | 35 --------------- .../scala/org/apache/spark/rdd/FlatMappedRDD.scala | 34 --------------- .../org/apache/spark/rdd/FlatMappedValuesRDD.scala | 35 --------------- .../scala/org/apache/spark/rdd/GlommedRDD.scala | 31 -------------- .../scala/org/apache/spark/rdd/MappedRDD.scala | 32 -------------- .../org/apache/spark/rdd/MappedValuesRDD.scala | 33 -------------- .../org/apache/spark/rdd/PairRDDFunctions.scala | 10 ++++- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 28 ++++++++---- .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 50 +++++++++------------- 10 files changed, 55 insertions(+), 245 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala delete mode 100644 core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala delete mode 100644 core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala delete mode 100644 core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala delete mode 100644 core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala delete mode 100644 core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 6e66ddbdef..1f755db485 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -24,12 +24,12 @@ import org.apache.spark.input.StreamFileInputFormat import org.apache.spark.{ Partition, SparkContext } private[spark] class BinaryFileRDD[T]( - sc: SparkContext, - inputFormatClass: Class[_ <: StreamFileInputFormat[T]], - keyClass: Class[String], - valueClass: Class[T], - @transient conf: Configuration, - minPartitions: Int) + sc: SparkContext, + inputFormatClass: Class[_ <: StreamFileInputFormat[T]], + keyClass: Class[String], + valueClass: Class[T], + @transient conf: Configuration, + minPartitions: Int) extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) { override def getPartitions: Array[Partition] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala deleted file mode 100644 index 9e41b3d1e2..0000000000 --- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import scala.reflect.ClassTag - -import org.apache.spark.{Partition, TaskContext} - -private[spark] class FilteredRDD[T: ClassTag]( - prev: RDD[T], - f: T => Boolean) - extends RDD[T](prev) { - - override def getPartitions: Array[Partition] = firstParent[T].partitions - - override val partitioner = prev.partitioner // Since filter cannot change a partition's keys - - override def compute(split: Partition, context: TaskContext) = - firstParent[T].iterator(split, context).filter(f) -} diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala deleted file mode 100644 index d8f87d4e36..0000000000 --- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import scala.reflect.ClassTag - -import org.apache.spark.{Partition, TaskContext} - -private[spark] -class FlatMappedRDD[U: ClassTag, T: ClassTag]( - prev: RDD[T], - f: T => TraversableOnce[U]) - extends RDD[U](prev) { - - override def getPartitions: Array[Partition] = firstParent[T].partitions - - override def compute(split: Partition, context: TaskContext) = - firstParent[T].iterator(split, context).flatMap(f) -} diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala deleted file mode 100644 index 7c9023f62d..0000000000 --- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import org.apache.spark.{Partition, TaskContext} - -private[spark] -class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U]) - extends RDD[(K, U)](prev) { - - override def getPartitions = firstParent[Product2[K, V]].partitions - - override val partitioner = firstParent[Product2[K, V]].partitioner - - override def compute(split: Partition, context: TaskContext) = { - firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) => - f(v).map(x => (k, x)) - } - } -} diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala deleted file mode 100644 index f6463fa715..0000000000 --- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import scala.reflect.ClassTag - -import org.apache.spark.{Partition, TaskContext} - -private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T]) - extends RDD[Array[T]](prev) { - - override def getPartitions: Array[Partition] = firstParent[T].partitions - - override def compute(split: Partition, context: TaskContext) = - Array(firstParent[T].iterator(split, context).toArray).iterator -} diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala deleted file mode 100644 index 8d7c288593..0000000000 --- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import scala.reflect.ClassTag - -import org.apache.spark.{Partition, TaskContext} - -private[spark] -class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U) - extends RDD[U](prev) { - - override def getPartitions: Array[Partition] = firstParent[T].partitions - - override def compute(split: Partition, context: TaskContext) = - firstParent[T].iterator(split, context).map(f) -} diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala deleted file mode 100644 index a60952eee5..0000000000 --- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import org.apache.spark.{Partition, TaskContext} - -private[spark] -class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U) - extends RDD[(K, U)](prev) { - - override def getPartitions = firstParent[Product2[K, U]].partitions - - override val partitioner = firstParent[Product2[K, U]].partitioner - - override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = { - firstParent[Product2[K, V]].iterator(split, context).map { pair => (pair._1, f(pair._2)) } - } -} diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index e78e576788..c43e1f2fe1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -660,7 +660,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def mapValues[U](f: V => U): RDD[(K, U)] = { val cleanF = self.context.clean(f) - new MappedValuesRDD(self, cleanF) + new MapPartitionsRDD[(K, U), (K, V)](self, + (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) }, + preservesPartitioning = true) } /** @@ -669,7 +671,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) */ def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { val cleanF = self.context.clean(f) - new FlatMappedValuesRDD(self, cleanF) + new MapPartitionsRDD[(K, U), (K, V)](self, + (context, pid, iter) => iter.flatMap { case (k, v) => + cleanF(v).map(x => (k, x)) + }, + preservesPartitioning = true) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8dfd952298..0bd616ec24 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.util.{Properties, Random} +import java.util.Random import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer @@ -36,13 +36,12 @@ import org.apache.spark._ import org.apache.spark.Partitioner._ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD -import org.apache.spark.broadcast.Broadcast import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite} +import org.apache.spark.util.{BoundedPriorityQueue, Utils} import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler, SamplingUtils} @@ -270,19 +269,30 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD by applying a function to all elements of this RDD. */ - def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) + def map[U: ClassTag](f: T => U): RDD[U] = { + val cleanF = sc.clean(f) + new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) + } /** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = - new FlatMappedRDD(this, sc.clean(f)) + def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = { + val cleanF = sc.clean(f) + new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) + } /** * Return a new RDD containing only the elements that satisfy a predicate. */ - def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f)) + def filter(f: T => Boolean): RDD[T] = { + val cleanF = sc.clean(f) + new MapPartitionsRDD[T, T]( + this, + (context, pid, iter) => iter.filter(cleanF), + preservesPartitioning = true) + } /** * Return a new RDD containing the distinct elements in this RDD. @@ -503,7 +513,9 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD created by coalescing all elements within each partition into an array. */ - def glom(): RDD[Array[T]] = new GlommedRDD(this) + def glom(): RDD[Array[T]] = { + new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray)) + } /** * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 6d9be79614..46fcb80fa1 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -24,10 +24,9 @@ import scala.reflect.ClassTag import org.scalatest.FunSuite import org.apache.spark._ -import org.apache.spark.util.Utils - import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDDSuiteUtils._ +import org.apache.spark.util.Utils class RDDSuite extends FunSuite with SharedSparkContext { @@ -37,8 +36,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(nums.toLocalIterator.toList === List(1, 2, 3, 4)) val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2) assert(dups.distinct().count() === 4) - assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses? - assert(dups.distinct.collect === dups.distinct().collect) + assert(dups.distinct().count === 4) // Can distinct and count be called without parentheses? + assert(dups.distinct().collect === dups.distinct().collect) assert(dups.distinct(2).collect === dups.distinct().collect) assert(nums.reduce(_ + _) === 10) assert(nums.fold(0)(_ + _) === 10) @@ -617,9 +616,9 @@ class RDDSuite extends FunSuite with SharedSparkContext { for(seed <- 1 to 5) { val splits = data.randomSplit(Array(1.0, 2.0, 3.0), seed) assert(splits.size == 3, "wrong number of splits") - assert(splits.flatMap(_.collect).sorted.toList == data.collect.toList, + assert(splits.flatMap(_.collect()).sorted.toList == data.collect().toList, "incomplete or wrong split") - val s = splits.map(_.count) + val s = splits.map(_.count()) assert(math.abs(s(0) - 100) < 50) // std = 9.13 assert(math.abs(s(1) - 200) < 50) // std = 11.55 assert(math.abs(s(2) - 300) < 50) // std = 12.25 @@ -762,8 +761,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { val rdd3 = rdd2.map(_ + 1) val rdd4 = new UnionRDD(sc, List(rdd1, rdd2, rdd3)) assert(rdd4.parent(0).isInstanceOf[ParallelCollectionRDD[_]]) - assert(rdd4.parent(1).isInstanceOf[FilteredRDD[_]]) - assert(rdd4.parent(2).isInstanceOf[MappedRDD[_, _]]) + assert(rdd4.parent[Int](1) === rdd2) + assert(rdd4.parent[Int](2) === rdd3) } test("getNarrowAncestors") { @@ -781,20 +780,18 @@ class RDDSuite extends FunSuite with SharedSparkContext { // Simple dependency tree with a single branch assert(ancestors1.size === 0) assert(ancestors2.size === 2) - assert(ancestors2.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 1) - assert(ancestors2.count(_.isInstanceOf[FilteredRDD[_]]) === 1) + assert(ancestors2.count(_ === rdd1) === 1) + assert(ancestors2.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1) assert(ancestors3.size === 5) - assert(ancestors3.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 1) - assert(ancestors3.count(_.isInstanceOf[FilteredRDD[_]]) === 2) - assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2) + assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4) // Any ancestors before the shuffle are not considered assert(ancestors4.size === 0) assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 0) assert(ancestors5.size === 3) assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1) - assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 0) - assert(ancestors5.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 2) + assert(ancestors5.count(_ === rdd3) === 0) + assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 2) } test("getNarrowAncestors with multiple parents") { @@ -815,16 +812,16 @@ class RDDSuite extends FunSuite with SharedSparkContext { // Simple dependency tree with multiple branches assert(ancestors6.size === 3) assert(ancestors6.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 2) - assert(ancestors6.count(_.isInstanceOf[MappedRDD[_, _]]) === 1) + assert(ancestors6.count(_ === rdd2) === 1) assert(ancestors7.size === 5) assert(ancestors7.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 3) - assert(ancestors7.count(_.isInstanceOf[MappedRDD[_, _]]) === 1) - assert(ancestors7.count(_.isInstanceOf[FilteredRDD[_]]) === 1) + assert(ancestors7.count(_ === rdd2) === 1) + assert(ancestors7.count(_ === rdd3) === 1) // Dependency tree with duplicate nodes (e.g. rdd1 should not be reported twice) assert(ancestors8.size === 7) - assert(ancestors8.count(_.isInstanceOf[MappedRDD[_, _]]) === 1) - assert(ancestors8.count(_.isInstanceOf[FilteredRDD[_]]) === 1) + assert(ancestors8.count(_ === rdd2) === 1) + assert(ancestors8.count(_ === rdd3) === 1) assert(ancestors8.count(_.isInstanceOf[UnionRDD[_]]) === 2) assert(ancestors8.count(_.isInstanceOf[ParallelCollectionRDD[_]]) === 3) assert(ancestors8.count(_ == rdd1) === 1) @@ -834,7 +831,6 @@ class RDDSuite extends FunSuite with SharedSparkContext { // Any ancestors before the shuffle are not considered assert(ancestors9.size === 2) assert(ancestors9.count(_.isInstanceOf[CoGroupedRDD[_]]) === 1) - assert(ancestors9.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 1) } /** @@ -868,12 +864,10 @@ class RDDSuite extends FunSuite with SharedSparkContext { val ancestors3 = rdd3.getNarrowAncestors val ancestors4 = rdd4.getNarrowAncestors assert(ancestors3.size === 4) - assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2) - assert(ancestors3.count(_.isInstanceOf[FilteredRDD[_]]) === 2) + assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4) assert(ancestors3.count(_ == rdd3) === 0) assert(ancestors4.size === 4) - assert(ancestors4.count(_.isInstanceOf[MappedRDD[_, _]]) === 2) - assert(ancestors4.count(_.isInstanceOf[FilteredRDD[_]]) === 1) + assert(ancestors4.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 3) assert(ancestors4.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 1) assert(ancestors4.count(_ == rdd3) === 1) assert(ancestors4.count(_ == rdd4) === 0) @@ -881,8 +875,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { // Cycles that do not involve the root val ancestors5 = rdd5.getNarrowAncestors assert(ancestors5.size === 6) - assert(ancestors5.count(_.isInstanceOf[MappedRDD[_, _]]) === 3) - assert(ancestors5.count(_.isInstanceOf[FilteredRDD[_]]) === 2) + assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 5) assert(ancestors5.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 1) assert(ancestors4.count(_ == rdd3) === 1) @@ -890,8 +883,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { val ancestors6 = rdd6.getNarrowAncestors assert(ancestors6.size === 12) assert(ancestors6.count(_.isInstanceOf[UnionRDD[_]]) === 2) - assert(ancestors6.count(_.isInstanceOf[MappedRDD[_, _]]) === 4) - assert(ancestors6.count(_.isInstanceOf[FilteredRDD[_]]) === 3) + assert(ancestors6.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 7) assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3) } -- cgit v1.2.3