aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/StorageUtils.scala')
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/StorageUtils.scala 99
1 files changed, 68 insertions, 31 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 241aacd74b..8f0d181fc8 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -35,7 +35,11 @@ import org.apache.spark.internal.Logging
* class cannot mutate the source of the information. Accesses are not thread-safe.
*/
@DeveloperApi
-class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
+class StorageStatus(
+ val blockManagerId: BlockManagerId,
+ val maxMemory: Long,
+ val maxOnHeapMem: Option[Long],
+ val maxOffHeapMem: Option[Long]) {
/**
* Internal representation of the blocks stored in this block manager.
@@ -46,25 +50,21 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
- /**
- * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
- *
- * As with the block maps, we store the storage information separately for RDD blocks and
- * non-RDD blocks for the same reason. In particular, RDD storage information is stored
- * in a map indexed by the RDD ID to the following 4-tuple:
- *
- * (memory size, disk size, storage level)
- *
- * We assume that all the blocks that belong to the same RDD have the same storage level.
- * This field is not relevant to non-RDD blocks, however, so the storage information for
- * non-RDD blocks contains only the first 3 fields (in the same order).
- */
- private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, StorageLevel)]
- private var _nonRddStorageInfo: (Long, Long) = (0L, 0L)
+ private case class RddStorageInfo(memoryUsage: Long, diskUsage: Long, level: StorageLevel)
+ private val _rddStorageInfo = new mutable.HashMap[Int, RddStorageInfo]
+
+ private case class NonRddStorageInfo(var onHeapUsage: Long, var offHeapUsage: Long,
+ var diskUsage: Long)
+ private val _nonRddStorageInfo = NonRddStorageInfo(0L, 0L, 0L)
/** Create a storage status with an initial set of blocks, leaving the source unmodified. */
- def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
- this(bmid, maxMem)
+ def this(
+ bmid: BlockManagerId,
+ maxMemory: Long,
+ maxOnHeapMem: Option[Long],
+ maxOffHeapMem: Option[Long],
+ initialBlocks: Map[BlockId, BlockStatus]) {
+ this(bmid, maxMemory, maxOnHeapMem, maxOffHeapMem)
initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
}
@@ -176,26 +176,57 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
*/
def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
+ /** Return the maximum memory that can be used by this block manager. */
+ def maxMem: Long = maxMemory
+
/** Return the memory remaining in this block manager. */
def memRemaining: Long = maxMem - memUsed
+ /** Return the memory used by caching RDDs */
+ def cacheSize: Long = onHeapCacheSize.getOrElse(0L) + offHeapCacheSize.getOrElse(0L)
+
/** Return the memory used by this block manager. */
- def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+ def memUsed: Long = onHeapMemUsed.getOrElse(0L) + offHeapMemUsed.getOrElse(0L)
- /** Return the memory used by caching RDDs */
- def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+ /** Return the on-heap memory remaining in this block manager. */
+ def onHeapMemRemaining: Option[Long] =
+ for (m <- maxOnHeapMem; o <- onHeapMemUsed) yield m - o
+
+ /** Return the off-heap memory remaining in this block manager. */
+ def offHeapMemRemaining: Option[Long] =
+ for (m <- maxOffHeapMem; o <- offHeapMemUsed) yield m - o
+
+ /** Return the on-heap memory used by this block manager. */
+ def onHeapMemUsed: Option[Long] = onHeapCacheSize.map(_ + _nonRddStorageInfo.onHeapUsage)
+
+ /** Return the off-heap memory used by this block manager. */
+ def offHeapMemUsed: Option[Long] = offHeapCacheSize.map(_ + _nonRddStorageInfo.offHeapUsage)
+
+ /** Return the on-heap memory used by caching RDDs */
+ def onHeapCacheSize: Option[Long] = maxOnHeapMem.map { _ =>
+ _rddStorageInfo.collect {
+ case (_, storageInfo) if !storageInfo.level.useOffHeap => storageInfo.memoryUsage
+ }.sum
+ }
+
+ /** Return the off-heap memory used by caching RDDs */
+ def offHeapCacheSize: Option[Long] = maxOffHeapMem.map { _ =>
+ _rddStorageInfo.collect {
+ case (_, storageInfo) if storageInfo.level.useOffHeap => storageInfo.memoryUsage
+ }.sum
+ }
/** Return the disk space used by this block manager. */
- def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+ def diskUsed: Long = _nonRddStorageInfo.diskUsage + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
/** Return the memory used by the given RDD in this block manager in O(1) time. */
- def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
+ def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.memoryUsage).getOrElse(0L)
/** Return the disk space used by the given RDD in this block manager in O(1) time. */
- def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+ def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_.diskUsage).getOrElse(0L)
/** Return the storage level, if any, used by the given RDD in this block manager. */
- def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._3)
+ def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_.level)
/**
* Update the relevant storage info, taking into account any existing status for this block.
@@ -210,10 +241,12 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
val (oldMem, oldDisk) = blockId match {
case RDDBlockId(rddId, _) =>
_rddStorageInfo.get(rddId)
- .map { case (mem, disk, _) => (mem, disk) }
+ .map { case RddStorageInfo(mem, disk, _) => (mem, disk) }
.getOrElse((0L, 0L))
- case _ =>
- _nonRddStorageInfo
+ case _ if !level.useOffHeap =>
+ (_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
+ case _ if level.useOffHeap =>
+ (_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
}
val newMem = math.max(oldMem + changeInMem, 0L)
val newDisk = math.max(oldDisk + changeInDisk, 0L)
@@ -225,13 +258,17 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
if (newMem + newDisk == 0) {
_rddStorageInfo.remove(rddId)
} else {
- _rddStorageInfo(rddId) = (newMem, newDisk, level)
+ _rddStorageInfo(rddId) = RddStorageInfo(newMem, newDisk, level)
}
case _ =>
- _nonRddStorageInfo = (newMem, newDisk)
+ if (!level.useOffHeap) {
+ _nonRddStorageInfo.onHeapUsage = newMem
+ } else {
+ _nonRddStorageInfo.offHeapUsage = newMem
+ }
+ _nonRddStorageInfo.diskUsage = newDisk
}
}
-
}
/** Helper methods for storage-related objects. */