aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorErik Erlandson <eerlands@redhat.com>2014-08-07 23:45:16 -0700
committerReynold Xin <rxin@apache.org>2014-08-07 23:45:16 -0700
commit9a54de16ed9de536e0436d532c587384e1ea0af6 (patch)
tree3f4414cf9c211d45c639435e2a805ccd483e122d /core
parent9de6a42bb34ea8963225ce90f1a45adcfee38b58 (diff)
downloadspark-9a54de16ed9de536e0436d532c587384e1ea0af6.tar.gz
spark-9a54de16ed9de536e0436d532c587384e1ea0af6.tar.bz2
spark-9a54de16ed9de536e0436d532c587384e1ea0af6.zip
[SPARK-2911]: provide rdd.parent[T](j) to obtain jth parent RDD
Author: Erik Erlandson <eerlands@redhat.com> Closes #1841 from erikerlandson/spark-2911-pr and squashes the following commits: 4699e2f [Erik Erlandson] [SPARK-2911]: provide rdd.parent[T](j) to obtain jth parent RDD
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala5
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala10
2 files changed, 15 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0159003c88..19e10bd046 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1233,6 +1233,11 @@ abstract class RDD[T: ClassTag](
dependencies.head.rdd.asInstanceOf[RDD[U]]
}
+ /** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T] */
+ protected[spark] def parent[U: ClassTag](j: Int) = {
+ dependencies(j).rdd.asInstanceOf[RDD[U]]
+ }
+
/** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
def context = sc
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 4a7dc8dca2..926d4fecb5 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -726,6 +726,16 @@ class RDDSuite extends FunSuite with SharedSparkContext {
jrdd.rdd.retag.collect()
}
+ test("parent method") {
+ val rdd1 = sc.parallelize(1 to 10, 2)
+ val rdd2 = rdd1.filter(_ % 2 == 0)
+ val rdd3 = rdd2.map(_ + 1)
+ val rdd4 = new UnionRDD(sc, List(rdd1, rdd2, rdd3))
+ assert(rdd4.parent(0).isInstanceOf[ParallelCollectionRDD[_]])
+ assert(rdd4.parent(1).isInstanceOf[FilteredRDD[_]])
+ assert(rdd4.parent(2).isInstanceOf[MappedRDD[_, _]])
+ }
+
test("getNarrowAncestors") {
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)