author	Tathagata Das <tathagata.das1565@gmail.com>	2014-04-24 18:18:22 -0700
committer	Tathagata Das <tathagata.das1565@gmail.com>	2014-04-24 18:18:22 -0700
commit	526a518bf32ad55b926a26f16086f445fd0ae29f (patch)
tree	dc4bcf8aa155aae8fa5e5bdeb40b47c423745b9d /core
parent	35e3d199f04fba3230625002a458d43b9578b2e8 (diff)
download	spark-526a518bf32ad55b926a26f16086f445fd0ae29f.tar.gz
	spark-526a518bf32ad55b926a26f16086f445fd0ae29f.tar.bz2
	spark-526a518bf32ad55b926a26f16086f445fd0ae29f.zip
[SPARK-1592][streaming] Automatically remove streaming input blocks
The raw input data is stored as blocks in BlockManagers. Earlier these blocks were cleared by the cleaner TTL. Since streaming no longer requires the cleaner TTL to be set, the blocks never get cleared. This increases Spark's memory usage, which is not accounted for or shown in the Spark storage UI. It may also cause the data blocks to spill over to disk, which eventually slows down the receiving of data (persisting to memory becomes bottlenecked by writing to disk). The solution in this PR is to remove those blocks automatically. The mechanism to keep track of which BlockRDDs (which present the raw data blocks as an RDD) can be safely cleared already exists; this change simply uses it to explicitly remove the blocks from the BlockRDDs.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #512 from tdas/block-rdd-unpersist and squashes the following commits:

d25e610 [Tathagata Das] Merge remote-tracking branch 'apache/master' into block-rdd-unpersist
5f46d69 [Tathagata Das] Merge remote-tracking branch 'apache/master' into block-rdd-unpersist
2c320cd [Tathagata Das] Updated configuration with spark.streaming.unpersist setting.
2d4b2fd [Tathagata Das] Automatically removed input blocks
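For context, the new BlockRDD API introduced by this patch works roughly as sketched below. Note that BlockRDD and its removeBlocks/isValid methods are private[spark], so this is not user-facing code and would have to live inside Spark itself; the block ids, element type, and variable names are illustrative assumptions, not part of this commit.

    import org.apache.spark.{SparkContext, SparkException}
    import org.apache.spark.rdd.BlockRDD
    import org.apache.spark.storage.{BlockId, StreamBlockId}

    // Illustrative sketch only (BlockRDD is private[spark]).
    def demoRemoveBlocks(sc: SparkContext): Unit = {
      // Blocks previously stored in the BlockManagers by a streaming receiver
      // (the ids here are hypothetical).
      val blockIds: Array[BlockId] = Array(StreamBlockId(streamId = 0, uniqueId = 1L))
      val rdd = new BlockRDD[String](sc, blockIds)

      // ... the RDD is used in jobs while the blocks are still present ...

      // Once the RDD is no longer needed, drop the raw input blocks for good.
      // This is irreversible: the data cannot be recovered afterwards.
      rdd.removeBlocks()
      assert(!rdd.isValid)

      // Any further use now fails fast instead of computing on missing data.
      try {
        rdd.count()
      } catch {
        case e: SparkException =>
          // "Attempted to use ... after its blocks have been removed!"
          println(e.getMessage)
      }
    }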
Diffstat (limited to 'core')
-rw-r--r--	core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala	| 45
1 file changed, 40 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index e6c4a6d379..c64da8804d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -19,24 +19,30 @@ package org.apache.spark.rdd
import scala.reflect.ClassTag
-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark._
import org.apache.spark.storage.{BlockId, BlockManager}
+import scala.Some
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
val index = idx
}
private[spark]
-class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId])
+class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+ @volatile private var _isValid = true
- override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
- new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
- }).toArray
+ override def getPartitions: Array[Partition] = {
+ assertValid()
+ (0 until blockIds.size).map(i => {
+ new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
+ }).toArray
+ }
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ assertValid()
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDPartition].blockId
blockManager.get(blockId) match {
@@ -47,7 +53,36 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[BlockId
}
override def getPreferredLocations(split: Partition): Seq[String] = {
+ assertValid()
locations_(split.asInstanceOf[BlockRDDPartition].blockId)
}
+
+ /**
+ * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
+ * irreversible operation, as the data in the blocks cannot be recovered back
+ * once removed. Use it with caution.
+ */
+ private[spark] def removeBlocks() {
+ blockIds.foreach { blockId =>
+ sc.env.blockManager.master.removeBlock(blockId)
+ }
+ _isValid = false
+ }
+
+ /**
+ * Whether this BlockRDD is actually usable. This will be false if the data blocks have been
+ * removed using `this.removeBlocks`.
+ */
+ private[spark] def isValid: Boolean = {
+ _isValid
+ }
+
+ /** Check if this BlockRDD is valid. If not valid, exception is thrown. */
+ private[spark] def assertValid() {
+ if (!_isValid) {
+ throw new SparkException(
+ "Attempted to use %s after its blocks have been removed!".format(toString))
+ }
+ }
}
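The streaming-side counterpart that actually invokes removeBlocks() is not part of this core-only diff; per the commit log it is gated by the spark.streaming.unpersist setting. The sketch below is an assumption about how such cleanup might call the new method for each old batch RDD; the helper name and its parameters are hypothetical, not code from this commit.

    import org.apache.spark.rdd.{BlockRDD, RDD}

    // Hypothetical cleanup hook for an RDD that is older than the remember duration,
    // run only when spark.streaming.unpersist is enabled (simplified sketch).
    def clearOldRDD(rdd: RDD[_], unpersistData: Boolean): Unit = {
      if (unpersistData) {
        // Drop any cached copies of the RDD itself.
        rdd.unpersist(blocking = false)
        rdd match {
          case blockRDD: BlockRDD[_] =>
            // Also remove the raw receiver blocks the RDD was built from,
            // so they stop occupying BlockManager memory.
            blockRDD.removeBlocks()
          case _ =>
            // Other RDD types have no backing input blocks to remove.
        }
      }
    }

This mirrors the motivation in the commit message: unpersisting the RDD alone would leave the receiver-stored input blocks behind, so the BlockRDD case additionally removes them from the BlockManagers.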