about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
author	Evan Chan <ev@ooyala.com>	2013-07-21 18:07:19 -0700
committer	Evan Chan <ev@ooyala.com>	2013-07-21 18:26:14 -0700
commit	0337d88321f3681009de548ce10ba7e0ca8f1a58 (patch)
tree	51fb93d0731096c1ea14405a66a9e0ebbfdc3733 /core
parent	c40f0f21f10a644e39c8c4b53cda6b5a8eed1741 (diff)
download	spark-0337d88321f3681009de548ce10ba7e0ca8f1a58.tar.gz
	spark-0337d88321f3681009de548ce10ba7e0ca8f1a58.tar.bz2
	spark-0337d88321f3681009de548ce10ba7e0ca8f1a58.zip
Add a public method getCachedRdds to SparkContext
Diffstat (limited to 'core')
-rw-r--r--	core/src/main/scala/spark/SparkContext.scala	| 8
-rw-r--r--	core/src/test/scala/spark/RDDSuite.scala	| 6
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index c01e315e35..1b46665d2c 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -546,6 +546,12 @@ class SparkContext(
StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
}
+ /**
+ * Returns an immutable map of RDDs that have marked themselves as cached via cache() call.
+ * Note that this does not necessarily mean the caching or computation was successful.
+ */
+ def getCachedRDDs: Map[Int, RDD[_]] = persistentRdds.asInstanceOf[Map[Int, RDD[_]]]
+
def getStageInfo: Map[Stage,StageInfo] = {
dagScheduler.stageToInfos
}
@@ -580,7 +586,7 @@ class SparkContext(
case null | "file" =>
if (SparkHadoopUtil.isYarnMode()) {
logWarning("local jar specified as parameter to addJar under Yarn mode")
- return
+ return
}
env.httpFileServer.addJar(new File(uri.getPath))
case _ => path
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index cbddf4e523..ff2dcd72d8 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -90,15 +90,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
test("basic caching") {
+ val origCachedRdds = sc.getCachedRDDs.size
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
assert(rdd.collect().toList === List(1, 2, 3, 4))
assert(rdd.collect().toList === List(1, 2, 3, 4))
assert(rdd.collect().toList === List(1, 2, 3, 4))
+ // Should only result in one cached RDD
+ assert(sc.getCachedRDDs.size === origCachedRdds + 1)
}
test("caching with failures") {
val onlySplit = new Partition { override def index: Int = 0 }
var shouldFail = true
+ val origCachedRdds = sc.getCachedRDDs.size
val rdd = new RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array(onlySplit)
override val getDependencies = List[Dependency[_]]()
@@ -110,12 +114,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
}.cache()
+ assert(sc.getCachedRDDs.size === origCachedRdds + 1)
val thrown = intercept[Exception]{
rdd.collect()
}
assert(thrown.getMessage.contains("injected failure"))
shouldFail = false
assert(rdd.collect().toList === List(1, 2, 3, 4))
+ assert(sc.getCachedRDDs.size === origCachedRdds + 1)
}
test("empty RDD") {