author    harshars <harshars@bigcuttall-lm.corp.yahoo.com>    2013-03-25 20:09:07 -0700
committer Reynold Xin <rxin@cs.berkeley.edu>                  2013-05-01 14:42:17 -0700
commit    848156273178bed5763bcbc91baa788bd4a57f6e (patch)
tree      a66c090aa924961b7ab8a3ab49f8c76ee58b5712
parent    1deee67615a1cfcf7d1b53241d1508b97e20f1d5 (diff)
Merged Ram's commit on removing RDDs.
Conflicts: core/src/main/scala/spark/SparkContext.scala
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala     | 62
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala | 12
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala         |  7
3 files changed, 59 insertions, 22 deletions
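
In short, this patch adds a SparkContext.removeRDD(id) method that drops a persisted RDD's cached blocks from the block manager and unregisters it from persistentRdds (which becomes a ConcurrentHashMap). A minimal usage sketch, placed in package spark like the test suites so the private[spark] persistentRdds map is visible; the setup code here is illustrative and not part of this commit:

package spark

import spark.storage.StorageLevel

object RemoveRddExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "removeRDD-example")

    // Cache a small RDD and materialize its blocks.
    val data = sc.parallelize(1 to 1000, 4)
    data.persist(StorageLevel.MEMORY_ONLY)
    data.count()

    // New in this patch: remove the blocks named "rdd_<id>_*" and
    // unregister the RDD from the persistentRdds map.
    sc.removeRDD(data.id)

    assert(sc.persistentRdds.isEmpty)
    sc.stop()
  }
}

The "remove RDDs cleanly" test added to DistributedSuite below runs the same sequence against a local-cluster master with replicated storage (MEMORY_ONLY_2).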
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 5f5ec0b0f4..8bee1d65a2 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,47 +1,50 @@
package spark
import java.io._
-import java.util.concurrent.atomic.AtomicInteger
import java.net.URI
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
+import scala.collection.mutable.{ConcurrentMap, HashMap}
+
+import akka.actor.Actor._
-import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.ArrayWritable
import org.apache.hadoop.io.BooleanWritable
import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.ArrayWritable
+import org.apache.hadoop.io.DoubleWritable
+import org.apache.hadoop.io.FloatWritable
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.SequenceFileInputFormat
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+
import org.apache.mesos.MesosNativeLibrary
-import spark.deploy.{SparkHadoopUtil, LocalSparkCluster}
-import spark.partial.ApproximateEvaluator
-import spark.partial.PartialResult
+import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler._
+import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.BlockManagerUI
+import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -97,7 +100,7 @@ class SparkContext(
private[spark] val addedJars = HashMap[String, Long]()
// Keeps track of all persisted RDDs
- private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()
+ private[spark] val persistentRdds: ConcurrentMap[Int, RDD[_]] = new ConcurrentHashMap[Int, RDD[_]]()
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
@@ -520,6 +523,21 @@ class SparkContext(
env.blockManager.master.getStorageStatus
}
+ def removeRDD(id: Int): Unit = {
+ val storageStatusList = getExecutorStorageStatus
+ val groupedRddBlocks = storageStatusList.flatMap(_.blocks).toMap
+ logInfo("RDD to remove: " + id)
+ groupedRddBlocks.foreach(x => {
+ val k = x._1.substring(0,x._1.lastIndexOf('_'))
+ val rdd_id = "rdd_" + id
+ logInfo("RDD to check: " + rdd_id)
+ if(k.equals(rdd_id)) {
+ env.blockManager.master.removeBlock(x._1)
+ }
+ })
+ persistentRdds.remove(id)
+ }
+
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
@@ -743,7 +761,7 @@ class SparkContext(
/** Called by MetadataCleaner to clean up the persistentRdds map periodically */
private[spark] def cleanup(cleanupTime: Long) {
- persistentRdds.clearOldValues(cleanupTime)
+ // do nothing. this needs to be removed.
}
}
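
The block matching in removeRDD above relies on the convention that cached RDD blocks are keyed as "rdd_<rddId>_<splitIndex>": the code strips the trailing "_<splitIndex>" and compares the remainder against "rdd_" + id. A self-contained sketch of just that matching step, using a hypothetical list of block ids in place of the executors' storage status:

object RemoveRddBlockMatch {
  def main(args: Array[String]) {
    // Hypothetical block ids; in removeRDD they come from
    // getExecutorStorageStatus.flatMap(_.blocks).
    val blockIds = Seq("rdd_3_0", "rdd_3_1", "rdd_7_0", "broadcast_0")
    val idToRemove = 3
    val target = "rdd_" + idToRemove

    // Same prefix test as the patch: drop "_<splitIndex>", compare to "rdd_<id>".
    val matching = blockIds.filter { blockId =>
      blockId.substring(0, blockId.lastIndexOf('_')) == target
    }

    // Each matching id would then be passed to blockManager.master.removeBlock.
    println(matching)  // prints List(rdd_3_0, rdd_3_1)
  }
}

Flattening every executor's block map into a single Map (flatMap(_.blocks).toMap) also collapses replicated copies of a block onto one key, so each matching block id is sent to the master's removeBlock once.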
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index c9b4707def..c7f6ab3133 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -252,6 +252,18 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(data2.count === 2)
}
}
+
+ test("remove RDDs cleanly") {
+ DistributedSuite.amMaster = true
+ sc = new SparkContext("local-cluster[3,1,512]", "test")
+ val data = sc.parallelize(Seq(true, false, false, false), 4)
+ data.persist(StorageLevel.MEMORY_ONLY_2)
+ data.count
+ sc.removeRDD(data.id)
+ assert(sc.persistentRdds.isEmpty == true)
+ assert(sc.getRDDStorageInfo.isEmpty == true)
+
+ }
}
object DistributedSuite {
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 7fbdd44340..88b7ab9f52 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -100,6 +100,13 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(rdd.collect().toList === List(1, 2, 3, 4))
}
+ test("remove RDD") {
+ sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1,2,3,4), 2).cache()
+ sc.removeRDD(rdd.id)
+ assert(sc.persistentRdds.empty == true)
+ }
+
test("caching with failures") {
sc = new SparkContext("local", "test")
val onlySplit = new Partition { override def index: Int = 0 }