author     Reynold Xin <rxin@cs.berkeley.edu>    2013-05-01 16:07:44 -0700
committer  Reynold Xin <rxin@cs.berkeley.edu>    2013-05-01 16:07:44 -0700
commit     3227ec8edde05cff27c1f9de8861d18b3cda1aae
tree       614bd907ddb24bcfbf457b6b7b4f9df99d578e33 /core
parent     848156273178bed5763bcbc91baa788bd4a57f6e
Cleaned up Ram's code. Moved SparkContext.removeRDD to RDD.unpersist.
Also updated the unit tests to make sure they properly test for
concurrency.
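For callers, the change moves removal from a driver-side call keyed by RDD id to a method on the RDD itself. A minimal sketch of the new call pattern, assuming a local `SparkContext` (the object name, app name, and data here are illustrative, not from the commit):

```scala
import spark.SparkContext

object UnpersistExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "unpersist-example")
    val rdd = sc.makeRDD(1 to 1000, 2).cache()
    rdd.count() // materializes the cached partitions as "rdd_<id>_<split>" blocks

    // New API from this commit: mark the RDD non-persistent and remove its
    // blocks from memory and disk. Returns the RDD itself for chaining.
    rdd.unpersist()

    // The old entry point, sc.removeRDD(rdd.id), is deleted by this commit.
    sc.stop()
  }
}
```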
Diffstat (limited to 'core')

 -rw-r--r--  core/src/main/scala/spark/RDD.scala                    | 17
 -rw-r--r--  core/src/main/scala/spark/SparkContext.scala           | 25
 -rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala |  4
 -rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala       | 30
 -rw-r--r--  core/src/test/scala/spark/RDDSuite.scala               | 27

5 files changed, 68 insertions(+), 35 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 09e52ebf3e..c77f9915c0 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -140,6 +140,23 @@ abstract class RDD[T: ClassManifest](
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   def cache(): RDD[T] = persist()
 
+  /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */
+  def unpersist(): RDD[T] = {
+    logInfo("Removing RDD " + id + " from persistence list")
+    val rddBlockPrefix = "rdd_" + id + "_"
+    // Get the list of blocks in block manager, and remove ones that are part of this RDD.
+    // The runtime complexity is linear to the number of blocks persisted in the cluster.
+    // It could be expensive if the cluster is large and has a lot of blocks persisted.
+    sc.getExecutorStorageStatus().flatMap(_.blocks).foreach { case (blockId, status) =>
+      if (blockId.startsWith(rddBlockPrefix)) {
+        sc.env.blockManager.master.removeBlock(blockId)
+      }
+    }
+    sc.persistentRdds.remove(id)
+    storageLevel = StorageLevel.NONE
+    this
+  }
+
   /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
   def getStorageLevel = storageLevel
 
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 8bee1d65a2..b686c595b8 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -100,7 +100,7 @@ class SparkContext(
   private[spark] val addedJars = HashMap[String, Long]()
 
   // Keeps track of all persisted RDDs
-  private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]()
+  private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]
 
   private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
 
@@ -508,36 +508,21 @@ class SparkContext(
    * Return information about what RDDs are cached, if they are in mem or on disk, how much space
    * they take, etc.
    */
-  def getRDDStorageInfo : Array[RDDInfo] = {
-    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+  def getRDDStorageInfo(): Array[RDDInfo] = {
+    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus(), this)
   }
 
-  def getStageInfo: Map[Stage,StageInfo] = {
+  def getStageInfo(): Map[Stage,StageInfo] = {
     dagScheduler.stageToInfos
   }
 
   /**
    * Return information about blocks stored in all of the slaves
    */
-  def getExecutorStorageStatus : Array[StorageStatus] = {
+  def getExecutorStorageStatus(): Array[StorageStatus] = {
     env.blockManager.master.getStorageStatus
   }
 
-  def removeRDD(id: Int): Unit = {
-    val storageStatusList = getExecutorStorageStatus
-    val groupedRddBlocks = storageStatusList.flatMap(_.blocks).toMap
-    logInfo("RDD to remove: " + id)
-    groupedRddBlocks.foreach(x => {
-      val k = x._1.substring(0,x._1.lastIndexOf('_'))
-      val rdd_id = "rdd_" + id
-      logInfo("RDD to check: " + rdd_id)
-      if(k.equals(rdd_id)) {
-        env.blockManager.master.removeBlock(x._1)
-      }
-    })
-    persistentRdds.remove(id)
-  }
-
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
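The rewritten removal logic leans on the block-naming convention visible in the diff: each cached partition is stored under an id of the form `rdd_<rddId>_<split>`. Matching on the prefix `"rdd_" + id + "_"`, trailing underscore included, replaces the old `removeRDD` string parsing and avoids ambiguity between RDD ids that share a decimal prefix. A small self-contained sketch, using hypothetical block ids (plain Scala, not Spark code):

```scala
// Hypothetical block ids as the block manager might report them.
val blocks = Seq("rdd_1_0", "rdd_1_1", "rdd_12_0", "broadcast_0")

// Old removeRDD approach: strip the trailing "_<split>" and compare for equality.
def matchesOld(blockId: String, id: Int): Boolean =
  blockId.substring(0, blockId.lastIndexOf('_')) == ("rdd_" + id)

// New unpersist approach: a plain prefix test. The trailing underscore matters:
// without it, RDD 1 would also match the blocks of RDD 12.
def matchesNew(blockId: String, id: Int): Boolean =
  blockId.startsWith("rdd_" + id + "_")

assert(blocks.filter(matchesNew(_, 1)) == Seq("rdd_1_0", "rdd_1_1"))
assert(!"rdd_12_0".startsWith("rdd_1" + "_")) // bare "rdd_1" would wrongly match here
```

One design note carried over from the diff's own comments: the scan is linear in the total number of blocks persisted across the cluster, so `unpersist` can be expensive on a large cluster with many cached blocks.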
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 07da572044..c9e4519efe 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -45,7 +45,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
       path("") { completeWith {
         // Request the current storage status from the Master
-        val storageStatusList = sc.getExecutorStorageStatus
+        val storageStatusList = sc.getExecutorStorageStatus()
         // Calculate macro-level statistics
         val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
         val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
@@ -60,7 +60,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
       parameter("id") { id => completeWith {
         val prefix = "rdd_" + id.toString
-        val storageStatusList = sc.getExecutorStorageStatus
+        val storageStatusList = sc.getExecutorStorageStatus()
         val filteredStorageStatusList = StorageUtils.
           filterStorageStatusByPrefix(storageStatusList, prefix)
         val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index c7f6ab3133..ab3e197035 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -3,8 +3,10 @@ package spark
 import network.ConnectionManagerId
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Timeouts._
 import org.scalatest.matchers.ShouldMatchers
 import org.scalatest.prop.Checkers
+import org.scalatest.time.{Span, Millis}
 import org.scalacheck.Arbitrary._
 import org.scalacheck.Gen
 import org.scalacheck.Prop._
@@ -252,24 +254,36 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       assert(data2.count === 2)
     }
   }
-
-  test("remove RDDs cleanly") {
+
+  test("unpersist RDDs") {
     DistributedSuite.amMaster = true
     sc = new SparkContext("local-cluster[3,1,512]", "test")
     val data = sc.parallelize(Seq(true, false, false, false), 4)
    data.persist(StorageLevel.MEMORY_ONLY_2)
     data.count
-    sc.removeRDD(data.id)
+    assert(sc.persistentRdds.isEmpty == false)
+    data.unpersist()
     assert(sc.persistentRdds.isEmpty == true)
+
+    failAfter(Span(3000, Millis)) {
+      try {
+        while (! sc.getRDDStorageInfo.isEmpty) {
+          Thread.sleep(200)
+        }
+      } catch {
+        case e: Exception =>
+          // Do nothing. We might see exceptions because block manager
+          // is racing this thread to remove entries from the driver.
+      }
+    }
     assert(sc.getRDDStorageInfo.isEmpty == true)
-  }
 }
 
 object DistributedSuite {
   // Indicates whether this JVM is marked for failure.
   var mark = false
-
+
   // Set by test to remember if we are in the driver program so we can assert
   // that we are not.
   var amMaster = false
@@ -286,9 +300,9 @@ object DistributedSuite {
   // Act like an identity function, but if mark was set to true previously, fail,
   // crashing the entire JVM.
   def failOnMarkedIdentity(item: Boolean): Boolean = {
-    if (mark) {
+    if (mark) {
       System.exit(42)
-    }
+    }
     item
-  }
+  }
 }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 88b7ab9f52..cee6312572 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -2,6 +2,8 @@ package spark
 
 import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.time.{Span, Millis}
 import spark.SparkContext._
 import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
 
@@ -100,11 +102,26 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(rdd.collect().toList === List(1, 2, 3, 4))
   }
 
-  test("remove RDD") {
-    sc = new SparkContext("local", "test")
-    val rdd = sc.makeRDD(Array(1,2,3,4), 2).cache()
-    sc.removeRDD(rdd.id)
-    assert(sc.persistentRdds.empty == true)
+  test("unpersist RDD") {
+    sc = new SparkContext("local", "test")
+    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+    rdd.count
+    assert(sc.persistentRdds.isEmpty == false)
+    rdd.unpersist()
+    assert(sc.persistentRdds.isEmpty == true)
+
+    failAfter(Span(3000, Millis)) {
+      try {
+        while (! sc.getRDDStorageInfo.isEmpty) {
+          Thread.sleep(200)
+        }
+      } catch {
+        case e: Exception =>
+          // Do nothing. We might see exceptions because block manager
+          // is racing this thread to remove entries from the driver.
+      }
+    }
+    assert(sc.getRDDStorageInfo.isEmpty == true)
   }
 
   test("caching with failures") {
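Both updated tests handle the concurrency aspect the same way: `unpersist` only initiates block removal, so the driver's storage view empties asynchronously, and the tests poll it under a ScalaTest timeout instead of asserting immediately. A standalone sketch of that wait-until pattern, assuming only the ScalaTest `Timeouts` trait already imported in the diff (the helper name is illustrative):

```scala
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Millis, Span}

// Poll until `done` holds, failing the enclosing test if it does not hold
// within `timeoutMillis`. Mirrors the structure of the tests above: the probe
// may throw while the block manager races this thread to remove entries from
// the driver, in which case we stop polling and let the caller's assert decide.
def awaitCondition(done: () => Boolean, timeoutMillis: Long = 3000L) {
  failAfter(Span(timeoutMillis, Millis)) {
    try {
      while (!done()) {
        Thread.sleep(200)
      }
    } catch {
      case e: Exception => // give up polling; the caller asserts the final state
    }
  }
}

// Usage inside a test body:
//   awaitCondition(() => sc.getRDDStorageInfo().isEmpty)
//   assert(sc.getRDDStorageInfo().isEmpty == true)
```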