author     Reynold Xin <rxin@cs.berkeley.edu>   2013-05-01 16:07:44 -0700
committer  Reynold Xin <rxin@cs.berkeley.edu>   2013-05-01 16:07:44 -0700
commit     3227ec8edde05cff27c1f9de8861d18b3cda1aae (patch)
tree       614bd907ddb24bcfbf457b6b7b4f9df99d578e33 /core
parent     848156273178bed5763bcbc91baa788bd4a57f6e (diff)
Cleaned up Ram's code. Moved SparkContext.remove to RDD.unpersist.
Also updated unit tests to make sure they are properly testing for concurrency.
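
A minimal usage sketch (illustrative only, not part of this diff) of how caller code changes with this commit: the removal entry point moves from the SparkContext onto the RDD itself.

    val rdd = sc.parallelize(1 to 100).cache()
    rdd.count()
    // Before this commit, removal went through the context by RDD id:
    //   sc.removeRDD(rdd.id)
    // After this commit, the RDD unpersists itself and returns this for chaining:
    rdd.unpersist()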
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/RDD.scala                     | 17
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala            | 25
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerUI.scala  |  4
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala        | 30
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                | 27
5 files changed, 68 insertions, 35 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 09e52ebf3e..c77f9915c0 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -140,6 +140,23 @@ abstract class RDD[T: ClassManifest](
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
+ /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */
+ def unpersist(): RDD[T] = {
+ logInfo("Removing RDD " + id + " from persistence list")
+ val rddBlockPrefix = "rdd_" + id + "_"
+ // Get the list of blocks in block manager, and remove ones that are part of this RDD.
+ // The runtime complexity is linear to the number of blocks persisted in the cluster.
+ // It could be expensive if the cluster is large and has a lot of blocks persisted.
+ sc.getExecutorStorageStatus().flatMap(_.blocks).foreach { case(blockId, status) =>
+ if (blockId.startsWith(rddBlockPrefix)) {
+ sc.env.blockManager.master.removeBlock(blockId)
+ }
+ }
+ sc.persistentRdds.remove(id)
+ storageLevel = StorageLevel.NONE
+ this
+ }
+
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
def getStorageLevel = storageLevel
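
The prefix match in unpersist() above relies on the "rdd_<rddId>_<splitIndex>" block naming convention; the trailing underscore keeps, say, RDD 1 from matching blocks that belong to RDD 10. An illustrative sketch (block ids made up for the example):

    val blockIds = Seq("rdd_1_0", "rdd_1_1", "rdd_10_0", "broadcast_0")
    val prefix = "rdd_" + 1 + "_"                     // "rdd_1_"
    val toRemove = blockIds.filter(_.startsWith(prefix))
    // toRemove == List("rdd_1_0", "rdd_1_1"); rdd_10_0 is left untouched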
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 8bee1d65a2..b686c595b8 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -100,7 +100,7 @@ class SparkContext(
private[spark] val addedJars = HashMap[String, Long]()
// Keeps track of all persisted RDDs
- private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]()
+ private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
@@ -508,36 +508,21 @@ class SparkContext(
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
- def getRDDStorageInfo : Array[RDDInfo] = {
- StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+ def getRDDStorageInfo(): Array[RDDInfo] = {
+ StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus(), this)
}
- def getStageInfo: Map[Stage,StageInfo] = {
+ def getStageInfo(): Map[Stage,StageInfo] = {
dagScheduler.stageToInfos
}
/**
* Return information about blocks stored in all of the slaves
*/
- def getExecutorStorageStatus : Array[StorageStatus] = {
+ def getExecutorStorageStatus(): Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}
- def removeRDD(id: Int): Unit = {
- val storageStatusList = getExecutorStorageStatus
- val groupedRddBlocks = storageStatusList.flatMap(_.blocks).toMap
- logInfo("RDD to remove: " + id)
- groupedRddBlocks.foreach(x => {
- val k = x._1.substring(0,x._1.lastIndexOf('_'))
- val rdd_id = "rdd_" + id
- logInfo("RDD to check: " + rdd_id)
- if(k.equals(rdd_id)) {
- env.blockManager.master.removeBlock(x._1)
- }
- })
- persistentRdds.remove(id)
- }
-
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala
index 07da572044..c9e4519efe 100644
--- a/core/src/main/scala/spark/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerUI.scala
@@ -45,7 +45,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
path("") {
completeWith {
// Request the current storage status from the Master
- val storageStatusList = sc.getExecutorStorageStatus
+ val storageStatusList = sc.getExecutorStorageStatus()
// Calculate macro-level statistics
val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
@@ -60,7 +60,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
parameter("id") { id =>
completeWith {
val prefix = "rdd_" + id.toString
- val storageStatusList = sc.getExecutorStorageStatus
+ val storageStatusList = sc.getExecutorStorageStatus()
val filteredStorageStatusList = StorageUtils.
filterStorageStatusByPrefix(storageStatusList, prefix)
val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index c7f6ab3133..ab3e197035 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -3,8 +3,10 @@ package spark
import network.ConnectionManagerId
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Timeouts._
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.prop.Checkers
+import org.scalatest.time.{Span, Millis}
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
@@ -252,24 +254,36 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(data2.count === 2)
}
}
-
- test("remove RDDs cleanly") {
+
+ test("unpersist RDDs") {
DistributedSuite.amMaster = true
sc = new SparkContext("local-cluster[3,1,512]", "test")
val data = sc.parallelize(Seq(true, false, false, false), 4)
data.persist(StorageLevel.MEMORY_ONLY_2)
data.count
- sc.removeRDD(data.id)
+ assert(sc.persistentRdds.isEmpty == false)
+ data.unpersist()
assert(sc.persistentRdds.isEmpty == true)
+
+ failAfter(Span(3000, Millis)) {
+ try {
+ while (! sc.getRDDStorageInfo.isEmpty) {
+ Thread.sleep(200)
+ }
+ } catch {
+ case e: Exception =>
+ // Do nothing. We might see exceptions because block manager
+ // is racing this thread to remove entries from the driver.
+ }
+ }
assert(sc.getRDDStorageInfo.isEmpty == true)
-
}
}
object DistributedSuite {
// Indicates whether this JVM is marked for failure.
var mark = false
-
+
// Set by test to remember if we are in the driver program so we can assert
// that we are not.
var amMaster = false
@@ -286,9 +300,9 @@ object DistributedSuite {
// Act like an identity function, but if mark was set to true previously, fail,
// crashing the entire JVM.
def failOnMarkedIdentity(item: Boolean): Boolean = {
- if (mark) {
+ if (mark) {
System.exit(42)
- }
+ }
item
- }
+ }
}
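
Both updated tests use the same wait-for-removal pattern because unpersist() only asks the block manager master to drop the blocks; the driver's storage info empties asynchronously. A minimal sketch of that pattern as a hypothetical helper (not part of the commit), assuming the same ScalaTest imports used in the tests:

    import org.scalatest.concurrent.Timeouts._
    import org.scalatest.time.{Millis, Span}

    def waitForRddBlocksToClear(sc: SparkContext): Unit = {
      failAfter(Span(3000, Millis)) {
        try {
          while (!sc.getRDDStorageInfo().isEmpty) {
            Thread.sleep(200)
          }
        } catch {
          // The block manager may race this thread while removing entries on
          // the driver, so transient exceptions are ignored, as in the tests.
          case e: Exception =>
        }
      }
    }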
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 88b7ab9f52..cee6312572 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -2,6 +2,8 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
@@ -100,11 +102,26 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
- test("remove RDD") {
- sc = new SparkContext("local", "test")
- val rdd = sc.makeRDD(Array(1,2,3,4), 2).cache()
- sc.removeRDD(rdd.id)
- assert(sc.persistentRdds.empty == true)
+ test("unpersist RDD") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+ rdd.count
+ assert(sc.persistentRdds.isEmpty == false)
+ rdd.unpersist()
+ assert(sc.persistentRdds.isEmpty == true)
+
+ failAfter(Span(3000, Millis)) {
+ try {
+ while (! sc.getRDDStorageInfo.isEmpty) {
+ Thread.sleep(200)
+ }
+ } catch {
+ case e: Exception =>
+ // Do nothing. We might see exceptions because block manager
+ // is racing this thread to remove entries from the driver.
+ }
+ }
+ assert(sc.getRDDStorageInfo.isEmpty == true)
}
test("caching with failures") {