diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala | 45 |
1 files changed, 40 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala index e6c4a6d379..c64da8804d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala @@ -19,24 +19,30 @@ package org.apache.spark.rdd import scala.reflect.ClassTag -import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} +import org.apache.spark._ import org.apache.spark.storage.{BlockId, BlockManager} +import scala.Some private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition { val index = idx } private[spark] -class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId]) +class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId]) extends RDD[T](sc, Nil) { @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get) + @volatile private var _isValid = true - override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => { - new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition] - }).toArray + override def getPartitions: Array[Partition] = { + assertValid() + (0 until blockIds.size).map(i => { + new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition] + }).toArray + } override def compute(split: Partition, context: TaskContext): Iterator[T] = { + assertValid() val blockManager = SparkEnv.get.blockManager val blockId = split.asInstanceOf[BlockRDDPartition].blockId blockManager.get(blockId) match { @@ -47,7 +53,36 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId } override def getPreferredLocations(split: Partition): Seq[String] = { + assertValid() locations_(split.asInstanceOf[BlockRDDPartition].blockId) } + + /** + * Remove the data blocks that this BlockRDD is made from. NOTE: This is an + * irreversible operation, as the data in the blocks cannot be recovered back + * once removed. Use it with caution. + */ + private[spark] def removeBlocks() { + blockIds.foreach { blockId => + sc.env.blockManager.master.removeBlock(blockId) + } + _isValid = false + } + + /** + * Whether this BlockRDD is actually usable. This will be false if the data blocks have been + * removed using `this.removeBlocks`. + */ + private[spark] def isValid: Boolean = { + _isValid + } + + /** Check if this BlockRDD is valid. If not valid, exception is thrown. */ + private[spark] def assertValid() { + if (!_isValid) { + throw new SparkException( + "Attempted to use %s after its blocks have been removed!".format(toString)) + } + } } |