Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala  45
1 file changed, 40 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index e6c4a6d379..c64da8804d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -19,24 +19,30 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark._
import org.apache.spark.storage.{BlockId, BlockManager}
+import scala.Some
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
  val index = idx
}

private[spark]
-class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil) {

  @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+  @volatile private var _isValid = true
-  override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
-    new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
-  }).toArray
+  override def getPartitions: Array[Partition] = {
+    assertValid()
+    (0 until blockIds.size).map(i => {
+      new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
+    }).toArray
+  }
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    assertValid()
    val blockManager = SparkEnv.get.blockManager
    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
    blockManager.get(blockId) match {
@@ -47,7 +53,36 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
+    assertValid()
    locations_(split.asInstanceOf[BlockRDDPartition].blockId)
  }
+
+  /**
+   * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
+   * irreversible operation, as the data in the blocks cannot be recovered
+   * once removed. Use it with caution.
+   */
+  private[spark] def removeBlocks() {
+    blockIds.foreach { blockId =>
+      sc.env.blockManager.master.removeBlock(blockId)
+    }
+    _isValid = false
+  }
+
+  /**
+   * Whether this BlockRDD is still usable. This will be false if the data
+   * blocks have been removed using `this.removeBlocks`.
+   */
+  private[spark] def isValid: Boolean = {
+    _isValid
+  }
+
+  /** Check if this BlockRDD is valid. If not, an exception is thrown. */
+  private[spark] def assertValid() {
+    if (!_isValid) {
+      throw new SparkException(
+        "Attempted to use %s after its blocks have been removed!".format(toString))
+    }
+  }
}
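
The pattern the patch adds is a lifecycle guard: removeBlocks() drops the underlying blocks and flips a @volatile flag, and every path that touches the blocks (getPartitions, compute, getPreferredLocations) calls assertValid() first, so a removed RDD fails fast instead of returning partial data. Below is a minimal standalone sketch of that pattern. The names mirror the patch, but the block store and exception type are stand-ins, since the real removeBlocks is private[spark] and needs a live BlockManagerMaster.

import scala.collection.mutable

// Stand-ins: a mutable.Set plays the block store and a local
// exception class plays SparkException.
class DemoException(msg: String) extends Exception(msg)

class DemoBlockRDD(val blockIds: Array[String], store: mutable.Set[String]) {
  // Same pattern as the patch: a volatile flag, flipped exactly once.
  @volatile private var _isValid = true

  // Irreversible, as the scaladoc warns: the blocks are gone from the store.
  def removeBlocks(): Unit = {
    blockIds.foreach(store.remove)
    _isValid = false
  }

  def isValid: Boolean = _isValid

  // Every entry point calls this first, mirroring getPartitions,
  // compute, and getPreferredLocations in the patch.
  def assertValid(): Unit = {
    if (!_isValid) {
      throw new DemoException(
        s"Attempted to use $this after its blocks have been removed!")
    }
  }

  def compute(): Iterator[String] = {
    assertValid()
    blockIds.iterator.filter(store.contains)
  }
}

object InvalidationDemo {
  def main(args: Array[String]): Unit = {
    val store = mutable.Set("block-0", "block-1")
    val rdd = new DemoBlockRDD(Array("block-0", "block-1"), store)
    println(rdd.compute().toList) // List(block-0, block-1)
    rdd.removeBlocks()
    println(rdd.isValid)          // false
    rdd.compute()                 // throws DemoException
  }
}

Running it prints the two block ids, then false, then fails with DemoException: the same fail-fast behavior assertValid() gives the patched BlockRDD.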
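
One note on the signature change: removeBlocks() references sc, so sc is now retained as a field rather than remaining a plain constructor parameter, and marking it @transient keeps the (non-serializable) SparkContext out of the picture when the RDD itself is serialized for task dispatch. The diff does not spell this motivation out; the snippet below, with hypothetical names, merely demonstrates the field-retention and @transient behavior in plain Scala.

import java.io._

// `ctx` stands in for the SparkContext reference: it is used inside a
// method, so the compiler keeps it as a field, and @transient keeps that
// field out of the serialized form.
class Holder(@transient val ctx: AnyRef) extends Serializable {
  def touch(): Int = if (ctx == null) 0 else ctx.hashCode()
}

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val buf = new ByteArrayOutputStream()
    // Without @transient this write would fail: new Object is not Serializable.
    new ObjectOutputStream(buf).writeObject(new Holder(new Object))
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val back = in.readObject().asInstanceOf[Holder]
    println(back.touch()) // 0: the transient field deserializes as null
  }
}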