about | summary | refs | log | tree | commit | diff
path: root/core
diff options
context:
space:
mode:
author Matei Zaharia <matei.zaharia@gmail.com> 2013-07-23 13:22:27 -0700
committer Matei Zaharia <matei.zaharia@gmail.com> 2013-07-23 13:22:27 -0700
commit f369e0e51b968acc4ea9209a54d8b3b4aaf98cd9 (patch)
tree 66abad2e4b9e2aa09d66da5212ca364692f684f8 /core
parent 401aac8b189aa6b72ad020ba894ca57b948c53a1 (diff)
parent efd6418c1b99c1ecc2b0a4c72e6430eea4d86260 (diff)
download spark-f369e0e51b968acc4ea9209a54d8b3b4aaf98cd9.tar.gz
spark-f369e0e51b968acc4ea9209a54d8b3b4aaf98cd9.tar.bz2
spark-f369e0e51b968acc4ea9209a54d8b3b4aaf98cd9.zip
Merge pull request #720 from ooyala/2013-07/persistent-rdds-api
Add a public method getCachedRdds to SparkContext
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/SparkContext.scala 8
-rw-r--r-- core/src/main/scala/spark/util/TimeStampedHashMap.scala 3
-rw-r--r-- core/src/test/scala/spark/SparkContextInfoSuite.scala 60
3 files changed, 70 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index c01e315e35..24ba605646 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -546,6 +546,12 @@ class SparkContext(
StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
}
+ /**
+ * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
+ * Note that this does not necessarily mean the caching or computation was successful.
+ */
+ def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
+
def getStageInfo: Map[Stage,StageInfo] = {
dagScheduler.stageToInfos
}
@@ -580,7 +586,7 @@ class SparkContext(
case null | "file" =>
if (SparkHadoopUtil.isYarnMode()) {
logWarning("local jar specified as parameter to addJar under Yarn mode")
- return
+ return
}
env.httpFileServer.addJar(new File(uri.getPath))
case _ => path
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index cc7909194a..07772a0afb 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -20,6 +20,7 @@ package spark.util
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions
import scala.collection.mutable.Map
+import scala.collection.immutable
import spark.scheduler.MapStatus
/**
@@ -99,6 +100,8 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
}
}
+ def toMap: immutable.Map[A, B] = iterator.toMap
+
/**
* Removes old key-value pairs that have timestamp earlier than `threshTime`
*/
diff --git a/core/src/test/scala/spark/SparkContextInfoSuite.scala b/core/src/test/scala/spark/SparkContextInfoSuite.scala
new file mode 100644
index 0000000000..6d50bf5e1b
--- /dev/null
+++ b/core/src/test/scala/spark/SparkContextInfoSuite.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark
+
+import org.scalatest.FunSuite
+import spark.SparkContext._
+
+class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
+ test("getPersistentRDDs only returns RDDs that are marked as cached") {
+ sc = new SparkContext("local", "test")
+ assert(sc.getPersistentRDDs.isEmpty === true)
+
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ assert(sc.getPersistentRDDs.isEmpty === true)
+
+ rdd.cache()
+ assert(sc.getPersistentRDDs.size === 1)
+ assert(sc.getPersistentRDDs.values.head === rdd)
+ }
+
+ test("getPersistentRDDs returns an immutable map") {
+ sc = new SparkContext("local", "test")
+ val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+
+ val myRdds = sc.getPersistentRDDs
+ assert(myRdds.size === 1)
+ assert(myRdds.values.head === rdd1)
+
+ val rdd2 = sc.makeRDD(Array(5, 6, 7, 8), 1).cache()
+
+ // getPersistentRDDs should have 2 RDDs, but myRdds should not change
+ assert(sc.getPersistentRDDs.size === 2)
+ assert(myRdds.size === 1)
+ }
+
+ test("getRDDStorageInfo only reports on RDDs that actually persist data") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+
+ assert(sc.getRDDStorageInfo.size === 0)
+
+ rdd.collect()
+ assert(sc.getRDDStorageInfo.size === 1)
+ }
+}
\ No newline at end of file