author    | harshars <harshars@bigcuttall-lm.corp.yahoo.com> | 2013-03-25 20:09:07 -0700
committer | Reynold Xin <rxin@cs.berkeley.edu>               | 2013-05-01 14:42:17 -0700
commit    | 848156273178bed5763bcbc91baa788bd4a57f6e (patch)
tree      | a66c090aa924961b7ab8a3ab49f8c76ee58b5712
parent    | 1deee67615a1cfcf7d1b53241d1508b97e20f1d5 (diff)
Merged Ram's commit on removing RDDs.
Conflicts:
core/src/main/scala/spark/SparkContext.scala
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala     | 62
-rw-r--r-- | core/src/test/scala/spark/DistributedSuite.scala | 12
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala         |  7

3 files changed, 59 insertions, 22 deletions
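
The change below adds a removeRDD(id: Int) method to SparkContext: it walks the executor storage status, removes every block belonging to the given RDD from the block manager master, and unregisters the RDD from persistentRdds. A minimal usage sketch, assuming a local master and the removeRDD signature introduced in this patch (illustrative example, not code from the commit):

    import spark.SparkContext
    import spark.storage.StorageLevel

    object RemoveRddExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "remove-rdd-example")
        // Cache an RDD so its blocks get registered with the block manager.
        val data = sc.makeRDD(1 to 100, 2).persist(StorageLevel.MEMORY_ONLY)
        data.count()
        // Drop the cached blocks and unregister the RDD by its id.
        sc.removeRDD(data.id)
        sc.stop()
      }
    }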
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 5f5ec0b0f4..8bee1d65a2 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,47 +1,50 @@
 package spark
 
 import java.io._
-import java.util.concurrent.atomic.AtomicInteger
 import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
+import scala.collection.mutable.{ConcurrentMap, HashMap}
+
+import akka.actor.Actor._
 
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.ArrayWritable
 import org.apache.hadoop.io.BooleanWritable
 import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.ArrayWritable
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.FloatWritable
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.SequenceFileInputFormat
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+
 import org.apache.mesos.MesosNativeLibrary
 
-import spark.deploy.{SparkHadoopUtil, LocalSparkCluster}
-import spark.partial.ApproximateEvaluator
-import spark.partial.PartialResult
+import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler._
+import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
 import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.BlockManagerUI
+import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -97,7 +100,7 @@ class SparkContext(
   private[spark] val addedJars = HashMap[String, Long]()
 
   // Keeps track of all persisted RDDs
-  private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
+  private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]()
   private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
 
@@ -520,6 +523,21 @@ class SparkContext(
     env.blockManager.master.getStorageStatus
   }
 
+  def removeRDD(id: Int): Unit = {
+    val storageStatusList = getExecutorStorageStatus
+    val groupedRddBlocks = storageStatusList.flatMap(_.blocks).toMap
+    logInfo("RDD to remove: " + id)
+    groupedRddBlocks.foreach(x => {
+      val k = x._1.substring(0, x._1.lastIndexOf('_'))
+      val rdd_id = "rdd_" + id
+      logInfo("RDD to check: " + rdd_id)
+      if (k.equals(rdd_id)) {
+        env.blockManager.master.removeBlock(x._1)
+      }
+    })
+    persistentRdds.remove(id)
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
@@ -743,7 +761,7 @@ class SparkContext(
 
   /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
   private[spark] def cleanup(cleanupTime: Long) {
-    persistentRdds.clearOldValues(cleanupTime)
+    // do nothing. this needs to be removed.
   }
 }
 
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index c9b4707def..c7f6ab3133 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -252,6 +252,18 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
       assert(data2.count === 2)
     }
   }
+
+  test("remove RDDs cleanly") {
+    DistributedSuite.amMaster = true
+    sc = new SparkContext("local-cluster[3,1,512]", "test")
+    val data = sc.parallelize(Seq(true, false, false, false), 4)
+    data.persist(StorageLevel.MEMORY_ONLY_2)
+    data.count
+    sc.removeRDD(data.id)
+    assert(sc.persistentRdds.isEmpty == true)
+    assert(sc.getRDDStorageInfo.isEmpty == true)
+
+  }
 }
 
 object DistributedSuite {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 7fbdd44340..88b7ab9f52 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -100,6 +100,13 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(rdd.collect().toList === List(1, 2, 3, 4))
   }
 
+  test("remove RDD") {
+    sc = new SparkContext("local", "test")
+    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
+    sc.removeRDD(rdd.id)
+    assert(sc.persistentRdds.empty == true)
+  }
+
   test("caching with failures") {
     sc = new SparkContext("local", "test")
     val onlySplit = new Partition { override def index: Int = 0 }
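
The block matching in removeRDD relies on cached RDD partitions being keyed in the block manager as "rdd_<rddId>_<splitIndex>": stripping everything from the last underscore onward recovers the "rdd_<rddId>" prefix, which is then compared against the id being removed. A standalone sketch of that parsing step with hypothetical block names (illustration only, not code from the patch):

    object BlockNameParsing {
      def main(args: Array[String]) {
        // Hypothetical block ids as they might appear in the executor storage status.
        val blockIds = Seq("rdd_3_0", "rdd_3_1", "rdd_12_0")
        val targetRddId = 3
        val prefix = "rdd_" + targetRddId
        // Same parse as removeRDD: drop the trailing "_<splitIndex>" and compare the prefix.
        val toRemove = blockIds.filter(id => id.substring(0, id.lastIndexOf('_')) == prefix)
        println(toRemove) // List(rdd_3_0, rdd_3_1)
      }
    }

Comparing the full prefix for equality rather than using startsWith matters here: a startsWith("rdd_3") check would also match blocks of, say, RDD 34, while the exact comparison only removes blocks of the intended RDD.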