diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-07-23 13:22:27 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-07-23 13:22:27 -0700 |
commit | f369e0e51b968acc4ea9209a54d8b3b4aaf98cd9 (patch) | |
tree | 66abad2e4b9e2aa09d66da5212ca364692f684f8 /core | |
parent | 401aac8b189aa6b72ad020ba894ca57b948c53a1 (diff) | |
parent | efd6418c1b99c1ecc2b0a4c72e6430eea4d86260 (diff) | |
download | spark-f369e0e51b968acc4ea9209a54d8b3b4aaf98cd9.tar.gz spark-f369e0e51b968acc4ea9209a54d8b3b4aaf98cd9.tar.bz2 spark-f369e0e51b968acc4ea9209a54d8b3b4aaf98cd9.zip |
Merge pull request #720 from ooyala/2013-07/persistent-rdds-api
Add a public method getCachedRdds to SparkContext
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/util/TimeStampedHashMap.scala | 3 | ||||
-rw-r--r-- | core/src/test/scala/spark/SparkContextInfoSuite.scala | 60 |
3 files changed, 70 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index c01e315e35..24ba605646 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -546,6 +546,12 @@ class SparkContext( StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this) } + /** + * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call. + * Note that this does not necessarily mean the caching or computation was successful. + */ + def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap + def getStageInfo: Map[Stage,StageInfo] = { dagScheduler.stageToInfos } @@ -580,7 +586,7 @@ class SparkContext( case null | "file" => if (SparkHadoopUtil.isYarnMode()) { logWarning("local jar specified as parameter to addJar under Yarn mode") - return + return } env.httpFileServer.addJar(new File(uri.getPath)) case _ => path diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala index cc7909194a..07772a0afb 100644 --- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala @@ -20,6 +20,7 @@ package spark.util import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConversions import scala.collection.mutable.Map +import scala.collection.immutable import spark.scheduler.MapStatus /** @@ -99,6 +100,8 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging { } } + def toMap: immutable.Map[A, B] = iterator.toMap + /** * Removes old key-value pairs that have timestamp earlier than `threshTime` */ diff --git a/core/src/test/scala/spark/SparkContextInfoSuite.scala b/core/src/test/scala/spark/SparkContextInfoSuite.scala new file mode 100644 index 0000000000..6d50bf5e1b --- /dev/null +++ b/core/src/test/scala/spark/SparkContextInfoSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package spark + +import org.scalatest.FunSuite +import spark.SparkContext._ + +class SparkContextInfoSuite extends FunSuite with LocalSparkContext { + test("getPersistentRDDs only returns RDDs that are marked as cached") { + sc = new SparkContext("local", "test") + assert(sc.getPersistentRDDs.isEmpty === true) + + val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2) + assert(sc.getPersistentRDDs.isEmpty === true) + + rdd.cache() + assert(sc.getPersistentRDDs.size === 1) + assert(sc.getPersistentRDDs.values.head === rdd) + } + + test("getPersistentRDDs returns an immutable map") { + sc = new SparkContext("local", "test") + val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() + + val myRdds = sc.getPersistentRDDs + assert(myRdds.size === 1) + assert(myRdds.values.head === rdd1) + + val rdd2 = sc.makeRDD(Array(5, 6, 7, 8), 1).cache() + + // getPersistentRDDs should have 2 RDDs, but myRdds should not change + assert(sc.getPersistentRDDs.size === 2) + assert(myRdds.size === 1) + } + + test("getRDDStorageInfo only reports on RDDs that actually persist data") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() + + assert(sc.getRDDStorageInfo.size === 0) + + rdd.collect() + assert(sc.getRDDStorageInfo.size === 1) + } +}
\ No newline at end of file |