diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-05-01 22:45:10 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-05-01 22:45:10 -0700 |
commit | 9abcbcc76de0df1b0cd857303560eb75aa94ab64 (patch) | |
tree | ba52849d23cb97eafb52f97c131821364c2e2494 | |
parent | 207afe4088219a0c7350b3f80eb60e86c97e140f (diff) | |
parent | 4a318774088f829fe54c3ef0b5f565a845631b4e (diff) | |
download | spark-9abcbcc76de0df1b0cd857303560eb75aa94ab64.tar.gz spark-9abcbcc76de0df1b0cd857303560eb75aa94ab64.tar.bz2 spark-9abcbcc76de0df1b0cd857303560eb75aa94ab64.zip |
Merge pull request #591 from rxin/removerdd
RDD.unpersist: probably the most desired feature of Spark
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 49 | ||||
-rw-r--r-- | core/src/main/scala/spark/api/java/JavaRDD.scala | 16 | ||||
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManagerMaster.scala | 16 | ||||
-rw-r--r-- | core/src/test/scala/spark/DistributedSuite.scala | 34 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 24 | ||||
-rw-r--r-- | core/src/test/scala/spark/storage/BlockManagerSuite.scala | 25 |
7 files changed, 144 insertions, 34 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 09e52ebf3e..fd14ef17f1 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -107,7 +107,7 @@ abstract class RDD[T: ClassManifest]( // ======================================================================= /** A unique ID for this RDD (within its SparkContext). */ - val id = sc.newRddId() + val id: Int = sc.newRddId() /** A friendly name for this RDD */ var name: String = null @@ -120,7 +120,8 @@ abstract class RDD[T: ClassManifest]( /** * Set this RDD's storage level to persist its values across operations after the first time - * it is computed. Can only be called once on each RDD. + * it is computed. This can only be used to assign a new storage level if the RDD does not + * have a storage level set yet.. */ def persist(newLevel: StorageLevel): RDD[T] = { // TODO: Handle changes of StorageLevel @@ -140,6 +141,15 @@ abstract class RDD[T: ClassManifest]( /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): RDD[T] = persist() + /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */ + def unpersist(): RDD[T] = { + logInfo("Removing RDD " + id + " from persistence list") + sc.env.blockManager.master.removeRdd(id) + sc.persistentRdds.remove(id) + storageLevel = StorageLevel.NONE + this + } + /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ def getStorageLevel = storageLevel diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 5f5ec0b0f4..2ae4ad8659 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -1,47 +1,50 @@ package spark import java.io._ -import java.util.concurrent.atomic.AtomicInteger import java.net.URI +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.JavaConversions._ import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ +import scala.collection.mutable.{ConcurrentMap, HashMap} + +import akka.actor.Actor._ -import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.InputFormat -import org.apache.hadoop.mapred.SequenceFileInputFormat -import org.apache.hadoop.io.Writable -import org.apache.hadoop.io.IntWritable -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.FloatWritable -import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.ArrayWritable import org.apache.hadoop.io.BooleanWritable import org.apache.hadoop.io.BytesWritable -import org.apache.hadoop.io.ArrayWritable +import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.io.FloatWritable +import org.apache.hadoop.io.IntWritable +import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Text +import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileInputFormat +import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.SequenceFileInputFormat import org.apache.hadoop.mapred.TextInputFormat import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} + import org.apache.mesos.MesosNativeLibrary -import spark.deploy.{SparkHadoopUtil, LocalSparkCluster} -import spark.partial.ApproximateEvaluator -import spark.partial.PartialResult +import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import spark.partial.{ApproximateEvaluator, PartialResult} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} -import spark.scheduler._ +import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler} +import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler} import spark.scheduler.local.LocalScheduler -import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import spark.storage.BlockManagerUI +import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo} import spark.util.{MetadataCleaner, TimeStampedHashMap} -import spark.storage.{StorageStatus, StorageUtils, RDDInfo} + /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -97,7 +100,7 @@ class SparkContext( private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]() + private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]] private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) @@ -505,7 +508,7 @@ class SparkContext( * Return information about what RDDs are cached, if they are in mem or on disk, how much space * they take, etc. */ - def getRDDStorageInfo : Array[RDDInfo] = { + def getRDDStorageInfo: Array[RDDInfo] = { StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this) } @@ -516,7 +519,7 @@ class SparkContext( /** * Return information about blocks stored in all of the slaves */ - def getExecutorStorageStatus : Array[StorageStatus] = { + def getExecutorStorageStatus: Array[StorageStatus] = { env.blockManager.master.getStorageStatus } diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index e29f1e5899..eb81ed64cd 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -14,12 +14,18 @@ JavaRDDLike[T, JavaRDD[T]] { /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): JavaRDD[T] = wrapRDD(rdd.cache()) - /** + /** * Set this RDD's storage level to persist its values across operations after the first time - * it is computed. Can only be called once on each RDD. + * it is computed. This can only be used to assign a new storage level if the RDD does not + * have a storage level set yet.. */ def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel)) + /** + * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. + */ + def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist()) + // Transformations (return a new RDD) /** @@ -31,7 +37,7 @@ JavaRDDLike[T, JavaRDD[T]] { * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions)) - + /** * Return a new RDD containing only the elements that satisfy a predicate. */ @@ -54,7 +60,7 @@ JavaRDDLike[T, JavaRDD[T]] { */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] = wrapRDD(rdd.sample(withReplacement, fraction, seed)) - + /** * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). @@ -63,7 +69,7 @@ JavaRDDLike[T, JavaRDD[T]] { /** * Return an RDD with the elements from `this` that are not in `other`. - * + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 6fae62d373..ac26c16867 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -15,6 +15,7 @@ import akka.util.duration._ import spark.{Logging, SparkException, Utils} + private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging { val AKKA_RETRY_ATTEMPTS: Int = System.getProperty("spark.akka.num.retries", "3").toInt @@ -88,6 +89,21 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi } /** + * Remove all blocks belonging to the given RDD. + */ + def removeRdd(rddId: Int) { + val rddBlockPrefix = "rdd_" + rddId + "_" + // Get the list of blocks in block manager, and remove ones that are part of this RDD. + // The runtime complexity is linear to the number of blocks persisted in the cluster. + // It could be expensive if the cluster is large and has a lot of blocks persisted. + getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) => + if (blockId.startsWith(rddBlockPrefix)) { + removeBlock(blockId) + } + } + } + + /** * Return the memory status for each block manager, in the form of a map from * the block manager's id to two long values. The first value is the maximum * amount of memory allocated for the block manager, while the second is the diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index c9b4707def..ab3e197035 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -3,8 +3,10 @@ package spark import network.ConnectionManagerId import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.Timeouts._ import org.scalatest.matchers.ShouldMatchers import org.scalatest.prop.Checkers +import org.scalatest.time.{Span, Millis} import org.scalacheck.Arbitrary._ import org.scalacheck.Gen import org.scalacheck.Prop._ @@ -252,12 +254,36 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(data2.count === 2) } } + + test("unpersist RDDs") { + DistributedSuite.amMaster = true + sc = new SparkContext("local-cluster[3,1,512]", "test") + val data = sc.parallelize(Seq(true, false, false, false), 4) + data.persist(StorageLevel.MEMORY_ONLY_2) + data.count + assert(sc.persistentRdds.isEmpty == false) + data.unpersist() + assert(sc.persistentRdds.isEmpty == true) + + failAfter(Span(3000, Millis)) { + try { + while (! sc.getRDDStorageInfo.isEmpty) { + Thread.sleep(200) + } + } catch { + case e: Exception => + // Do nothing. We might see exceptions because block manager + // is racing this thread to remove entries from the driver. + } + } + assert(sc.getRDDStorageInfo.isEmpty == true) + } } object DistributedSuite { // Indicates whether this JVM is marked for failure. var mark = false - + // Set by test to remember if we are in the driver program so we can assert // that we are not. var amMaster = false @@ -274,9 +300,9 @@ object DistributedSuite { // Act like an identity function, but if mark was set to true previously, fail, // crashing the entire JVM. def failOnMarkedIdentity(item: Boolean): Boolean = { - if (mark) { + if (mark) { System.exit(42) - } + } item - } + } } diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 7fbdd44340..cee6312572 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -2,6 +2,8 @@ package spark import scala.collection.mutable.HashMap import org.scalatest.FunSuite +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.time.{Span, Millis} import spark.SparkContext._ import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD} @@ -100,6 +102,28 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(rdd.collect().toList === List(1, 2, 3, 4)) } + test("unpersist RDD") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() + rdd.count + assert(sc.persistentRdds.isEmpty == false) + rdd.unpersist() + assert(sc.persistentRdds.isEmpty == true) + + failAfter(Span(3000, Millis)) { + try { + while (! sc.getRDDStorageInfo.isEmpty) { + Thread.sleep(200) + } + } catch { + case e: Exception => + // Do nothing. We might see exceptions because block manager + // is racing this thread to remove entries from the driver. + } + } + assert(sc.getRDDStorageInfo.isEmpty == true) + } + test("caching with failures") { sc = new SparkContext("local", "test") val onlySplit = new Partition { override def index: Int = 0 } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 5a11a4483b..9fe0de665c 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -207,6 +207,31 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT } } + test("removing rdd") { + store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) + val a1 = new Array[Byte](400) + val a2 = new Array[Byte](400) + val a3 = new Array[Byte](400) + // Putting a1, a2 and a3 in memory. + store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY) + store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY) + store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY) + master.removeRdd(0) + + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("rdd_0_0") should be (None) + master.getLocations("rdd_0_0") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("rdd_0_1") should be (None) + master.getLocations("rdd_0_1") should have size 0 + } + eventually(timeout(1000 milliseconds), interval(10 milliseconds)) { + store.getSingle("nonrddblock") should not be (None) + master.getLocations("nonrddblock") should have size (1) + } + } + test("reregistration on heart beat") { val heartBeat = PrivateMethod[Unit]('heartBeat) store = new BlockManager("<driver>", actorSystem, master, serializer, 2000) |