author     Andrew Or <andrewor14@gmail.com>        2014-08-01 23:56:24 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-08-01 23:56:24 -0700
commit     d934801d53fc2f1d57d3534ae4e1e9384c7dda99 (patch)
tree       15af74aeadb86186a9c169c8dc31a37f7c8bc96a /core
parent     dab37966b0cfd290919ca5c005f59dde00615c0e (diff)
[SPARK-2316] Avoid O(blocks) operations in listeners
The existing code in `StorageUtils` is not the most efficient. Every time we want to update an `RDDInfo` we end up iterating through all blocks on all block managers just to discard most of them. The symptoms manifest themselves in the bountiful UI bugs observed in the wild. Many of these bugs are caused by the slow consumption of events in `LiveListenerBus`, which frequently leads to the event queue overflowing and `SparkListenerEvent`s being dropped on the floor. The changes made in this PR avoid this by first filtering out only the blocks relevant to us before computing storage information from them.

It's worth a mention that this corner of the Spark code is also not very well-tested at all. The bulk of the changes in this PR (more than 60%) is actually test cases for the various logic in `StorageUtils.scala` as well as `StorageTab.scala`. These will eventually be extended to cover the various listeners that constitute the `SparkUI`.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1679 from andrewor14/fix-drop-events and squashes the following commits:

f80c1fa [Andrew Or] Rewrite fold and reduceOption as sum
e132d69 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
14fa1c3 [Andrew Or] Simplify some code + update a few comments
a91be46 [Andrew Or] Make ExecutorsPage blazingly fast
bf6f09b [Andrew Or] Minor changes
8981de1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
af19bc0 [Andrew Or] *UsedByRDD -> *UsedByRdd (minor)
6970bc8 [Andrew Or] Add extensive tests for StorageListener and the new code in StorageUtils
e080b9e [Andrew Or] Reduce run time of StorageUtils.updateRddInfo to near constant
2c3ef6a [Andrew Or] Actually filter out only the relevant RDDs
6fef86a [Andrew Or] Add extensive tests for new code in StorageStatus
b66b6b0 [Andrew Or] Use more efficient underlying data structures for blocks
6a7b7c0 [Andrew Or] Avoid chained operations on TraversableLike
a9ec384 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
b12fcd7 [Andrew Or] Fix tests + simplify sc.getRDDStorageInfo
da8e322 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
8e91921 [Andrew Or] Iterate through a filtered set of blocks when updating RDDInfo
7b2c4aa [Andrew Or] Rewrite blockLocationsFromStorageStatus + clean up method signatures
41fa50d [Andrew Or] Add a legacy constructor for StorageStatus
53af15d [Andrew Or] Refactor StorageStatus + add a bunch of tests
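To make the idea concrete, the sketch below uses simplified, hypothetical names (not the actual Spark classes) to show the indexing scheme this patch introduces in StorageStatus: RDD blocks are kept in a nested map keyed by RDD ID, so a per-RDD query only touches that RDD's blocks instead of iterating over every block on every block manager.

import scala.collection.mutable

// Toy stand-ins for Spark's block types, for illustration only.
case class SimpleBlockId(rddId: Option[Int], name: String)
case class SimpleBlockStatus(memSize: Long, diskSize: Long, tachyonSize: Long)

// Blocks are indexed by RDD ID so per-RDD queries never scan unrelated blocks.
class SimpleStorageStatus {
  private val rddBlocks =
    mutable.HashMap.empty[Int, mutable.HashMap[SimpleBlockId, SimpleBlockStatus]]
  private val nonRddBlocks = mutable.HashMap.empty[SimpleBlockId, SimpleBlockStatus]

  def addBlock(id: SimpleBlockId, status: SimpleBlockStatus): Unit = id.rddId match {
    case Some(rddId) =>
      rddBlocks.getOrElseUpdate(rddId, mutable.HashMap.empty)(id) = status
    case None =>
      nonRddBlocks(id) = status
  }

  // Touches only the blocks of the requested RDD, not every block on the
  // block manager.
  def memUsedByRdd(rddId: Int): Long =
    rddBlocks.get(rddId).map(_.values.map(_.memSize).sum).getOrElse(0L)
}

The real StorageStatus in this patch goes one step further and also maintains incrementally updated per-RDD (memory, disk, off-heap) totals, which makes the per-RDD size queries O(1) rather than O(blocks in the RDD).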
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                        |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala     |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala          |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/RDDInfo.scala                     |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala       |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala                | 316
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala               |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala                  |  17
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala               |  13
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala               |  22
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala  |  72
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageSuite.scala                | 354
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala          | 165
13 files changed, 843 insertions(+), 176 deletions(-)
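The rewritten StorageUtils.updateRddInfo (shown in full in the diff below) follows the aggregation pattern sketched here with simplified, hypothetical types that are not Spark's API: each RDDInfo is refreshed by summing cheap per-RDD figures across executors, so the cost scales with the number of executors rather than the number of blocks.

object StorageAggregationSketch {
  // Minimal stand-in for RDDInfo's mutable storage fields.
  class RddInfoSketch(val id: Int) {
    var numCachedPartitions: Int = 0
    var memSize: Long = 0L
    var diskSize: Long = 0L
  }

  // Minimal stand-in for the per-RDD getters the new StorageStatus exposes.
  trait StorageStatusSketch {
    def numRddBlocksById(rddId: Int): Int // O(1) in the new StorageStatus
    def memUsedByRdd(rddId: Int): Long    // O(1)
    def diskUsedByRdd(rddId: Int): Long   // O(1)
  }

  // One pass over the executors per RDD; no iteration over individual blocks.
  def updateRddInfoSketch(rddInfos: Seq[RddInfoSketch], statuses: Seq[StorageStatusSketch]): Unit = {
    rddInfos.foreach { info =>
      info.numCachedPartitions = statuses.map(_.numRddBlocksById(info.id)).sum
      info.memSize = statuses.map(_.memUsedByRdd(info.id)).sum
      info.diskSize = statuses.map(_.diskUsedByRdd(info.id)).sum
    }
  }
}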
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 368835a867..9ba21cfcde 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
-import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
+import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
@@ -843,7 +843,9 @@ class SparkContext(config: SparkConf) extends Logging {
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
- StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+ val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+ StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+ rddInfos.filter(_.isCached)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 94f5a4bb2e..bd31e3c5a1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -267,9 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
private def storageStatus: Array[StorageStatus] = {
- blockManagerInfo.map { case(blockManagerId, info) =>
- val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*)
- new StorageStatus(blockManagerId, info.maxMem, blockMap)
+ blockManagerInfo.map { case (blockManagerId, info) =>
+ new StorageStatus(blockManagerId, info.maxMem, info.blocks)
}.toArray
}
@@ -424,7 +423,14 @@ case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long)
+ tachyonSize: Long) {
+ def isCached: Boolean = memSize + diskSize + tachyonSize > 0
+}
+
+@DeveloperApi
+object BlockStatus {
+ def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+}
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 687586490a..e939318a02 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -30,7 +30,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ val maxMem = storageStatusList.map(_.maxMem).sum
maxMem / 1024 / 1024
}
})
@@ -38,7 +38,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ val remainingMem = storageStatusList.map(_.memRemaining).sum
remainingMem / 1024 / 1024
}
})
@@ -46,8 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ val maxMem = storageStatusList.map(_.maxMem).sum
+ val remainingMem = storageStatusList.map(_.memRemaining).sum
(maxMem - remainingMem) / 1024 / 1024
}
})
@@ -55,11 +55,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList
- .flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_ + _)
- .getOrElse(0L)
-
+ val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
diskSpaceUsed / 1024 / 1024
}
})
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 5a72e21687..120c327a7e 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -34,6 +34,8 @@ class RDDInfo(
var diskSize = 0L
var tachyonSize = 0L
+ def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
+
override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 41c960c867..d9066f7664 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -35,13 +35,12 @@ class StorageStatusListener extends SparkListener {
/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
- val filteredStatus = executorIdToStorageStatus.get(execId)
- filteredStatus.foreach { storageStatus =>
+ executorIdToStorageStatus.get(execId).foreach { storageStatus =>
updatedBlocks.foreach { case (blockId, updatedStatus) =>
if (updatedStatus.storageLevel == StorageLevel.NONE) {
- storageStatus.blocks.remove(blockId)
+ storageStatus.removeBlock(blockId)
} else {
- storageStatus.blocks(blockId) = updatedStatus
+ storageStatus.updateBlock(blockId, updatedStatus)
}
}
}
@@ -50,9 +49,8 @@ class StorageStatusListener extends SparkListener {
/** Update storage status list to reflect the removal of an RDD from the cache */
private def updateStorageStatus(unpersistedRDDId: Int) {
storageStatusList.foreach { storageStatus =>
- val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
- unpersistedBlocksIds.foreach { blockId =>
- storageStatus.blocks.remove(blockId)
+ storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
+ storageStatus.removeBlock(blockId)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 177281f663..0a0a448baa 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -20,122 +20,258 @@ package org.apache.spark.storage
import scala.collection.Map
import scala.collection.mutable
-import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Storage information for each BlockManager.
+ *
+ * This class assumes BlockId and BlockStatus are immutable, such that the consumers of this
+ * class cannot mutate the source of the information. Accesses are not thread-safe.
*/
@DeveloperApi
-class StorageStatus(
- val blockManagerId: BlockManagerId,
- val maxMem: Long,
- val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
+class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
- def memUsed = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+ /**
+ * Internal representation of the blocks stored in this block manager.
+ *
+ * We store RDD blocks and non-RDD blocks separately to allow quick retrievals of RDD blocks.
+ * These collections should only be mutated through the add/update/removeBlock methods.
+ */
+ private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
+ private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
- def memUsedByRDD(rddId: Int) =
- rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+ /**
+ * Storage information of the blocks that entails memory, disk, and off-heap memory usage.
+ *
+ * As with the block maps, we store the storage information separately for RDD blocks and
+ * non-RDD blocks for the same reason. In particular, RDD storage information is stored
+ * in a map indexed by the RDD ID to the following 4-tuple:
+ *
+ * (memory size, disk size, off-heap size, storage level)
+ *
+ * We assume that all the blocks that belong to the same RDD have the same storage level.
+ * This field is not relevant to non-RDD blocks, however, so the storage information for
+ * non-RDD blocks contains only the first 3 fields (in the same order).
+ */
+ private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
+ private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L)
- def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+ /** Create a storage status with an initial set of blocks, leaving the source unmodified. */
+ def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
+ this(bmid, maxMem)
+ initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
+ }
- def diskUsedByRDD(rddId: Int) =
- rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+ /**
+ * Return the blocks stored in this block manager.
+ *
+ * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+ * concatenating them together. Much faster alternatives exist for common operations such as
+ * contains, get, and size.
+ */
+ def blocks: Map[BlockId, BlockStatus] = _nonRddBlocks ++ rddBlocks
- def memRemaining: Long = maxMem - memUsed
+ /**
+ * Return the RDD blocks stored in this block manager.
+ *
+ * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+ * concatenating them together. Much faster alternatives exist for common operations such as
+ * getting the memory, disk, and off-heap memory sizes occupied by this RDD.
+ */
+ def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }
- def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
-}
+ /** Return the blocks that belong to the given RDD stored in this block manager. */
+ def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = {
+ _rddBlocks.get(rddId).getOrElse(Map.empty)
+ }
-/** Helper methods for storage-related objects. */
-private[spark] object StorageUtils {
+ /** Add the given block to this storage status. If it already exists, overwrite it. */
+ private[spark] def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+ updateStorageInfo(blockId, blockStatus)
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
+ case _ =>
+ _nonRddBlocks(blockId) = blockStatus
+ }
+ }
+
+ /** Update the given block in this storage status. If it doesn't already exist, add it. */
+ private[spark] def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+ addBlock(blockId, blockStatus)
+ }
+
+ /** Remove the given block from this storage status. */
+ private[spark] def removeBlock(blockId: BlockId): Option[BlockStatus] = {
+ updateStorageInfo(blockId, BlockStatus.empty)
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ // Actually remove the block, if it exists
+ if (_rddBlocks.contains(rddId)) {
+ val removed = _rddBlocks(rddId).remove(blockId)
+ // If the given RDD has no more blocks left, remove the RDD
+ if (_rddBlocks(rddId).isEmpty) {
+ _rddBlocks.remove(rddId)
+ }
+ removed
+ } else {
+ None
+ }
+ case _ =>
+ _nonRddBlocks.remove(blockId)
+ }
+ }
/**
- * Returns basic information of all RDDs persisted in the given SparkContext. This does not
- * include storage information.
+ * Return whether the given block is stored in this block manager in O(1) time.
+ * Note that this is much faster than `this.blocks.contains`, which is O(blocks) time.
*/
- def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = {
- sc.persistentRdds.values.map { rdd =>
- val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
- val rddNumPartitions = rdd.partitions.size
- val rddStorageLevel = rdd.getStorageLevel
- val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel)
- rddInfo
- }.toArray
+ def containsBlock(blockId: BlockId): Boolean = {
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ _rddBlocks.get(rddId).exists(_.contains(blockId))
+ case _ =>
+ _nonRddBlocks.contains(blockId)
+ }
}
- /** Returns storage information of all RDDs persisted in the given SparkContext. */
- def rddInfoFromStorageStatus(
- storageStatuses: Seq[StorageStatus],
- sc: SparkContext): Array[RDDInfo] = {
- rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc))
+ /**
+ * Return the given block stored in this block manager in O(1) time.
+ * Note that this is much faster than `this.blocks.get`, which is O(blocks) time.
+ */
+ def getBlock(blockId: BlockId): Option[BlockStatus] = {
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ _rddBlocks.get(rddId).map(_.get(blockId)).flatten
+ case _ =>
+ _nonRddBlocks.get(blockId)
+ }
}
- /** Returns storage information of all RDDs in the given list. */
- def rddInfoFromStorageStatus(
- storageStatuses: Seq[StorageStatus],
- rddInfos: Seq[RDDInfo],
- updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
-
- // Mapping from a block ID -> its status
- val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
-
- // Record updated blocks, if any
- updatedBlocks
- .collect { case (id: RDDBlockId, status) => (id, status) }
- .foreach { case (id, status) => blockMap(id) = status }
-
- // Mapping from RDD ID -> an array of associated BlockStatuses
- val rddBlockMap = blockMap
- .groupBy { case (k, _) => k.rddId }
- .mapValues(_.values.toArray)
-
- // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
- val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
-
- val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
- // Add up memory, disk and Tachyon sizes
- val persistedBlocks =
- blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
- val _storageLevel =
- if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
- val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
- val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
- val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
- rddInfoMap.get(rddId).map { rddInfo =>
- rddInfo.storageLevel = _storageLevel
- rddInfo.numCachedPartitions = persistedBlocks.length
- rddInfo.memSize = memSize
- rddInfo.diskSize = diskSize
- rddInfo.tachyonSize = tachyonSize
- rddInfo
- }
- }.toArray
+ /**
+ * Return the number of blocks stored in this block manager in O(RDDs) time.
+ * Note that this is much faster than `this.blocks.size`, which is O(blocks) time.
+ */
+ def numBlocks: Int = _nonRddBlocks.size + numRddBlocks
+
+ /**
+ * Return the number of RDD blocks stored in this block manager in O(RDDs) time.
+ * Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
+ */
+ def numRddBlocks: Int = _rddBlocks.values.map(_.size).sum
- scala.util.Sorting.quickSort(rddStorageInfos)
- rddStorageInfos
+ /**
+ * Return the number of blocks that belong to the given RDD in O(1) time.
+ * Note that this is much faster than `this.rddBlocksById(rddId).size`, which is
+ * O(blocks in this RDD) time.
+ */
+ def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
+
+ /** Return the memory remaining in this block manager. */
+ def memRemaining: Long = maxMem - memUsed
+
+ /** Return the memory used by this block manager. */
+ def memUsed: Long =
+ _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+
+ /** Return the disk space used by this block manager. */
+ def diskUsed: Long =
+ _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+
+ /** Return the off-heap space used by this block manager. */
+ def offHeapUsed: Long =
+ _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
+
+ /** Return the memory used by the given RDD in this block manager in O(1) time. */
+ def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
+
+ /** Return the disk space used by the given RDD in this block manager in O(1) time. */
+ def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+
+ /** Return the off-heap space used by the given RDD in this block manager in O(1) time. */
+ def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)
+
+ /** Return the storage level, if any, used by the given RDD in this block manager. */
+ def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)
+
+ /**
+ * Update the relevant storage info, taking into account any existing status for this block.
+ */
+ private def updateStorageInfo(blockId: BlockId, newBlockStatus: BlockStatus): Unit = {
+ val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
+ val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
+ val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
+ val changeInTachyon = newBlockStatus.tachyonSize - oldBlockStatus.tachyonSize
+ val level = newBlockStatus.storageLevel
+
+ // Compute new info from old info
+ val (oldMem, oldDisk, oldTachyon) = blockId match {
+ case RDDBlockId(rddId, _) =>
+ _rddStorageInfo.get(rddId)
+ .map { case (mem, disk, tachyon, _) => (mem, disk, tachyon) }
+ .getOrElse((0L, 0L, 0L))
+ case _ =>
+ _nonRddStorageInfo
+ }
+ val newMem = math.max(oldMem + changeInMem, 0L)
+ val newDisk = math.max(oldDisk + changeInDisk, 0L)
+ val newTachyon = math.max(oldTachyon + changeInTachyon, 0L)
+
+ // Set the correct info
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ // If this RDD is no longer persisted, remove it
+ if (newMem + newDisk + newTachyon == 0) {
+ _rddStorageInfo.remove(rddId)
+ } else {
+ _rddStorageInfo(rddId) = (newMem, newDisk, newTachyon, level)
+ }
+ case _ =>
+ _nonRddStorageInfo = (newMem, newDisk, newTachyon)
+ }
}
- /** Returns a mapping from BlockId to the locations of the associated block. */
- def blockLocationsFromStorageStatus(
- storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
- val blockLocationPairs = storageStatuses.flatMap { storageStatus =>
- storageStatus.blocks.map { case (bid, _) => (bid, storageStatus.blockManagerId.hostPort) }
+}
+
+/** Helper methods for storage-related objects. */
+private[spark] object StorageUtils {
+
+ /**
+ * Update the given list of RDDInfo with the given list of storage statuses.
+ * This method overwrites the old values stored in the RDDInfo's.
+ */
+ def updateRddInfo(rddInfos: Seq[RDDInfo], statuses: Seq[StorageStatus]): Unit = {
+ rddInfos.foreach { rddInfo =>
+ val rddId = rddInfo.id
+ // Assume all blocks belonging to the same RDD have the same storage level
+ val storageLevel = statuses
+ .map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
+ val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
+ val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
+ val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
+ val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
+
+ rddInfo.storageLevel = storageLevel
+ rddInfo.numCachedPartitions = numCachedPartitions
+ rddInfo.memSize = memSize
+ rddInfo.diskSize = diskSize
+ rddInfo.tachyonSize = tachyonSize
}
- blockLocationPairs.toMap
- .groupBy { case (blockId, _) => blockId }
- .mapValues(_.values.toSeq)
}
- /** Filters the given list of StorageStatus by the given RDD ID. */
- def filterStorageStatusByRDD(
- storageStatuses: Seq[StorageStatus],
- rddId: Int): Array[StorageStatus] = {
- storageStatuses.map { status =>
- val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq
- val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*)
- new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap)
- }.toArray
+ /**
+ * Return a mapping from block ID to its locations for each block that belongs to the given RDD.
+ */
+ def getRddBlockLocations(rddId: Int, statuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
+ val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]]
+ statuses.foreach { status =>
+ status.rddBlocksById(rddId).foreach { case (bid, _) =>
+ val location = status.blockManagerId.hostPort
+ blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location
+ }
+ }
+ blockLocations
}
+
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index b358c855e1..b814b0e6b8 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -49,9 +49,9 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
- val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
- val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
+ val maxMem = storageStatusList.map(_.maxMem).sum
+ val memUsed = storageStatusList.map(_.memUsed).sum
+ val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.id)
@@ -80,7 +80,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
</th>
</thead>
<tbody>
- {execInfoSorted.map(execRow(_))}
+ {execInfoSorted.map(execRow)}
</tbody>
</table>
@@ -91,7 +91,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
<li><strong>Memory:</strong>
{Utils.bytesToString(memUsed)} Used
({Utils.bytesToString(maxMem)} Total) </li>
- <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
+ <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used </li>
</ul>
</div>
</div>
@@ -145,7 +145,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val status = listener.storageStatusList(statusId)
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
- val rddBlocks = status.blocks.size
+ val rddBlocks = status.numBlocks
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 2155633b80..84ac53da47 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -45,12 +45,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers)
// Block table
- val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, rddId)
- val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name)
- val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
- val blocks = blockStatuses.map { case (blockId, status) =>
- (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
- }
+ val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
+ val blocks = storageStatusList
+ .flatMap(_.rddBlocksById(rddId))
+ .sortWith(_._1.name < _._1.name)
+ .map { case (blockId, status) =>
+ (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+ }
val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks)
val content =
@@ -119,10 +120,10 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
<tr>
<td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
<td>
- {Utils.bytesToString(status.memUsedByRDD(rddId))}
+ {Utils.bytesToString(status.memUsedByRdd(rddId))}
({Utils.bytesToString(status.memRemaining)} Remaining)
</td>
- <td>{Utils.bytesToString(status.diskUsedByRDD(rddId))}</td>
+ <td>{Utils.bytesToString(status.diskUsedByRdd(rddId))}</td>
</tr>
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 0cc0cf3117..5f6740d495 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -41,19 +41,18 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
*/
@DeveloperApi
class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
- private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
+ private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
def storageStatusList = storageStatusListener.storageStatusList
/** Filter RDD info to include only those with cached partitions */
def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
- /** Update each RDD's info to reflect any updates to the RDD's storage status */
- private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
- val rddInfos = _rddInfoMap.values.toSeq
- val updatedRddInfos =
- StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
- updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
+ /** Update the storage info of the RDDs whose blocks are among the given updated blocks */
+ private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
+ val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
+ val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
+ StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index fb18c3ebfe..e6ab538d77 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import org.scalatest.{Assertions, FunSuite}
+import org.apache.spark.storage.StorageLevel
class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
test("getPersistentRDDs only returns RDDs that are marked as cached") {
@@ -35,26 +36,33 @@ class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
test("getPersistentRDDs returns an immutable map") {
sc = new SparkContext("local", "test")
val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
val myRdds = sc.getPersistentRDDs
assert(myRdds.size === 1)
- assert(myRdds.values.head === rdd1)
+ assert(myRdds(0) === rdd1)
+ assert(myRdds(0).getStorageLevel === StorageLevel.MEMORY_ONLY)
+ // myRdds2 should have 2 RDDs, but myRdds should not change
val rdd2 = sc.makeRDD(Array(5, 6, 7, 8), 1).cache()
-
- // getPersistentRDDs should have 2 RDDs, but myRdds should not change
- assert(sc.getPersistentRDDs.size === 2)
+ val myRdds2 = sc.getPersistentRDDs
+ assert(myRdds2.size === 2)
+ assert(myRdds2(0) === rdd1)
+ assert(myRdds2(1) === rdd2)
+ assert(myRdds2(0).getStorageLevel === StorageLevel.MEMORY_ONLY)
+ assert(myRdds2(1).getStorageLevel === StorageLevel.MEMORY_ONLY)
assert(myRdds.size === 1)
+ assert(myRdds(0) === rdd1)
+ assert(myRdds(0).getStorageLevel === StorageLevel.MEMORY_ONLY)
}
test("getRDDStorageInfo only reports on RDDs that actually persist data") {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
assert(sc.getRDDStorageInfo.size === 0)
-
rdd.collect()
assert(sc.getRDDStorageInfo.size === 1)
+ assert(sc.getRDDStorageInfo.head.isCached)
+ assert(sc.getRDDStorageInfo.head.memSize > 0)
+ assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
}
test("call sites report correct locations") {
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 2179c6dd33..51fb646a3c 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -41,13 +41,13 @@ class StorageStatusListenerSuite extends FunSuite {
assert(listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
assert(listener.executorIdToStorageStatus.size === 2)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
// Block manager remove
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
@@ -67,14 +67,14 @@ class StorageStatusListenerSuite extends FunSuite {
val taskMetrics = new TaskMetrics
// Task end with no updated blocks
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}
test("task end with updated blocks") {
@@ -90,20 +90,20 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics2.updatedBlocks = Some(Seq(block3))
// Task end with new blocks
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
- assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
- assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
// Task end with dropped blocks
val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
@@ -112,17 +112,17 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
- assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
- assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
+ assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
- assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
- assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
+ assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}
test("unpersist RDD") {
@@ -137,16 +137,16 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics2.updatedBlocks = Some(Seq(block3))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
// Unpersist RDD
listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
new file mode 100644
index 0000000000..38678bbd1d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+
+/**
+ * Test various functionalities in StorageUtils and StorageStatus.
+ */
+class StorageSuite extends FunSuite {
+ private val memAndDisk = StorageLevel.MEMORY_AND_DISK
+
+ // For testing add, update, and remove (for non-RDD blocks)
+ private def storageStatus1: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+ assert(status.blocks.isEmpty)
+ assert(status.rddBlocks.isEmpty)
+ assert(status.memUsed === 0L)
+ assert(status.memRemaining === 1000L)
+ assert(status.diskUsed === 0L)
+ assert(status.offHeapUsed === 0L)
+ status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status
+ }
+
+ test("storage status add non-RDD blocks") {
+ val status = storageStatus1
+ assert(status.blocks.size === 3)
+ assert(status.blocks.contains(TestBlockId("foo")))
+ assert(status.blocks.contains(TestBlockId("fee")))
+ assert(status.blocks.contains(TestBlockId("faa")))
+ assert(status.rddBlocks.isEmpty)
+ assert(status.memUsed === 30L)
+ assert(status.memRemaining === 970L)
+ assert(status.diskUsed === 60L)
+ assert(status.offHeapUsed === 3L)
+ }
+
+ test("storage status update non-RDD blocks") {
+ val status = storageStatus1
+ status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L, 1L))
+ status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L, 0L))
+ assert(status.blocks.size === 3)
+ assert(status.memUsed === 160L)
+ assert(status.memRemaining === 840L)
+ assert(status.diskUsed === 140L)
+ assert(status.offHeapUsed === 2L)
+ }
+
+ test("storage status remove non-RDD blocks") {
+ val status = storageStatus1
+ status.removeBlock(TestBlockId("foo"))
+ status.removeBlock(TestBlockId("faa"))
+ assert(status.blocks.size === 1)
+ assert(status.blocks.contains(TestBlockId("fee")))
+ assert(status.memUsed === 10L)
+ assert(status.memRemaining === 990L)
+ assert(status.diskUsed === 20L)
+ assert(status.offHeapUsed === 1L)
+ }
+
+ // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
+ private def storageStatus2: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+ assert(status.rddBlocks.isEmpty)
+ status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L))
+ status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L))
+ status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L, 1L))
+ status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L, 0L))
+ status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L, 0L))
+ status
+ }
+
+ test("storage status add RDD blocks") {
+ val status = storageStatus2
+ assert(status.blocks.size === 7)
+ assert(status.rddBlocks.size === 5)
+ assert(status.rddBlocks.contains(RDDBlockId(0, 0)))
+ assert(status.rddBlocks.contains(RDDBlockId(1, 1)))
+ assert(status.rddBlocks.contains(RDDBlockId(2, 2)))
+ assert(status.rddBlocks.contains(RDDBlockId(2, 3)))
+ assert(status.rddBlocks.contains(RDDBlockId(2, 4)))
+ assert(status.rddBlocksById(0).size === 1)
+ assert(status.rddBlocksById(0).contains(RDDBlockId(0, 0)))
+ assert(status.rddBlocksById(1).size === 1)
+ assert(status.rddBlocksById(1).contains(RDDBlockId(1, 1)))
+ assert(status.rddBlocksById(2).size === 3)
+ assert(status.rddBlocksById(2).contains(RDDBlockId(2, 2)))
+ assert(status.rddBlocksById(2).contains(RDDBlockId(2, 3)))
+ assert(status.rddBlocksById(2).contains(RDDBlockId(2, 4)))
+ assert(status.memUsedByRdd(0) === 10L)
+ assert(status.memUsedByRdd(1) === 100L)
+ assert(status.memUsedByRdd(2) === 30L)
+ assert(status.diskUsedByRdd(0) === 20L)
+ assert(status.diskUsedByRdd(1) === 200L)
+ assert(status.diskUsedByRdd(2) === 80L)
+ assert(status.offHeapUsedByRdd(0) === 1L)
+ assert(status.offHeapUsedByRdd(1) === 1L)
+ assert(status.offHeapUsedByRdd(2) === 1L)
+ assert(status.rddStorageLevel(0) === Some(memAndDisk))
+ assert(status.rddStorageLevel(1) === Some(memAndDisk))
+ assert(status.rddStorageLevel(2) === Some(memAndDisk))
+
+ // Verify default values for RDDs that don't exist
+ assert(status.rddBlocksById(10).isEmpty)
+ assert(status.memUsedByRdd(10) === 0L)
+ assert(status.diskUsedByRdd(10) === 0L)
+ assert(status.offHeapUsedByRdd(10) === 0L)
+ assert(status.rddStorageLevel(10) === None)
+ }
+
+ test("storage status update RDD blocks") {
+ val status = storageStatus2
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 5000L, 0L, 0L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L))
+ status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L, 0L))
+ assert(status.blocks.size === 7)
+ assert(status.rddBlocks.size === 5)
+ assert(status.rddBlocksById(0).size === 1)
+ assert(status.rddBlocksById(1).size === 1)
+ assert(status.rddBlocksById(2).size === 3)
+ assert(status.memUsedByRdd(0) === 0L)
+ assert(status.memUsedByRdd(1) === 100L)
+ assert(status.memUsedByRdd(2) === 20L)
+ assert(status.diskUsedByRdd(0) === 0L)
+ assert(status.diskUsedByRdd(1) === 200L)
+ assert(status.diskUsedByRdd(2) === 1060L)
+ assert(status.offHeapUsedByRdd(0) === 0L)
+ assert(status.offHeapUsedByRdd(1) === 1L)
+ assert(status.offHeapUsedByRdd(2) === 0L)
+ }
+
+ test("storage status remove RDD blocks") {
+ val status = storageStatus2
+ status.removeBlock(TestBlockId("man"))
+ status.removeBlock(RDDBlockId(1, 1))
+ status.removeBlock(RDDBlockId(2, 2))
+ status.removeBlock(RDDBlockId(2, 4))
+ assert(status.blocks.size === 3)
+ assert(status.rddBlocks.size === 2)
+ assert(status.rddBlocks.contains(RDDBlockId(0, 0)))
+ assert(status.rddBlocks.contains(RDDBlockId(2, 3)))
+ assert(status.rddBlocksById(0).size === 1)
+ assert(status.rddBlocksById(0).contains(RDDBlockId(0, 0)))
+ assert(status.rddBlocksById(1).size === 0)
+ assert(status.rddBlocksById(2).size === 1)
+ assert(status.rddBlocksById(2).contains(RDDBlockId(2, 3)))
+ assert(status.memUsedByRdd(0) === 10L)
+ assert(status.memUsedByRdd(1) === 0L)
+ assert(status.memUsedByRdd(2) === 10L)
+ assert(status.diskUsedByRdd(0) === 20L)
+ assert(status.diskUsedByRdd(1) === 0L)
+ assert(status.diskUsedByRdd(2) === 20L)
+ assert(status.offHeapUsedByRdd(0) === 1L)
+ assert(status.offHeapUsedByRdd(1) === 0L)
+ assert(status.offHeapUsedByRdd(2) === 0L)
+ }
+
+ test("storage status containsBlock") {
+ val status = storageStatus2
+ // blocks that actually exist
+ assert(status.blocks.contains(TestBlockId("dan")) === status.containsBlock(TestBlockId("dan")))
+ assert(status.blocks.contains(TestBlockId("man")) === status.containsBlock(TestBlockId("man")))
+ assert(status.blocks.contains(RDDBlockId(0, 0)) === status.containsBlock(RDDBlockId(0, 0)))
+ assert(status.blocks.contains(RDDBlockId(1, 1)) === status.containsBlock(RDDBlockId(1, 1)))
+ assert(status.blocks.contains(RDDBlockId(2, 2)) === status.containsBlock(RDDBlockId(2, 2)))
+ assert(status.blocks.contains(RDDBlockId(2, 3)) === status.containsBlock(RDDBlockId(2, 3)))
+ assert(status.blocks.contains(RDDBlockId(2, 4)) === status.containsBlock(RDDBlockId(2, 4)))
+ // blocks that don't exist
+ assert(status.blocks.contains(TestBlockId("fan")) === status.containsBlock(TestBlockId("fan")))
+ assert(status.blocks.contains(RDDBlockId(100, 0)) === status.containsBlock(RDDBlockId(100, 0)))
+ }
+
+ test("storage status getBlock") {
+ val status = storageStatus2
+ // blocks that actually exist
+ assert(status.blocks.get(TestBlockId("dan")) === status.getBlock(TestBlockId("dan")))
+ assert(status.blocks.get(TestBlockId("man")) === status.getBlock(TestBlockId("man")))
+ assert(status.blocks.get(RDDBlockId(0, 0)) === status.getBlock(RDDBlockId(0, 0)))
+ assert(status.blocks.get(RDDBlockId(1, 1)) === status.getBlock(RDDBlockId(1, 1)))
+ assert(status.blocks.get(RDDBlockId(2, 2)) === status.getBlock(RDDBlockId(2, 2)))
+ assert(status.blocks.get(RDDBlockId(2, 3)) === status.getBlock(RDDBlockId(2, 3)))
+ assert(status.blocks.get(RDDBlockId(2, 4)) === status.getBlock(RDDBlockId(2, 4)))
+ // blocks that don't exist
+ assert(status.blocks.get(TestBlockId("fan")) === status.getBlock(TestBlockId("fan")))
+ assert(status.blocks.get(RDDBlockId(100, 0)) === status.getBlock(RDDBlockId(100, 0)))
+ }
+
+ test("storage status num[Rdd]Blocks") {
+ val status = storageStatus2
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+ assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+ status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L, 400L))
+ status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+ assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+ assert(status.rddBlocksById(100).size === status.numRddBlocksById(100))
+ status.removeBlock(RDDBlockId(4, 0))
+ status.removeBlock(RDDBlockId(10, 10))
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+ assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+ // remove a block that doesn't exist
+ status.removeBlock(RDDBlockId(1000, 999))
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+ assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+ assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000))
+ }
+
+ test("storage status memUsed, diskUsed, tachyonUsed") {
+ val status = storageStatus2
+ def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
+ def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
+ def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).sum
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.offHeapUsed === actualOffHeapUsed)
+ status.addBlock(TestBlockId("fire"), BlockStatus(memAndDisk, 4000L, 5000L, 6000L))
+ status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L, 600L))
+ status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L, 60L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.offHeapUsed === actualOffHeapUsed)
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L, 6L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 4L, 5L, 6L))
+ status.updateBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 4L, 5L, 6L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.offHeapUsed === actualOffHeapUsed)
+ status.removeBlock(TestBlockId("fire"))
+ status.removeBlock(TestBlockId("man"))
+ status.removeBlock(RDDBlockId(2, 2))
+ status.removeBlock(RDDBlockId(2, 3))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.offHeapUsed === actualOffHeapUsed)
+ }
+
+ // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
+ private def stockStorageStatuses: Seq[StorageStatus] = {
+ val status1 = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+ val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2, 2), 2000L)
+ val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3, 3), 3000L)
+ status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status2.addBlock(RDDBlockId(0, 3), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status2.addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status2.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status3.addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status3.addBlock(RDDBlockId(1, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ Seq(status1, status2, status3)
+ }
+
+ // For testing StorageUtils.updateRddInfo
+ private def stockRDDInfos: Seq[RDDInfo] = {
+ val info0 = new RDDInfo(0, "0", 10, memAndDisk)
+ val info1 = new RDDInfo(1, "1", 3, memAndDisk)
+ Seq(info0, info1)
+ }
+
+ test("StorageUtils.updateRddInfo") {
+ val storageStatuses = stockStorageStatuses
+ val rddInfos = stockRDDInfos
+ StorageUtils.updateRddInfo(rddInfos, storageStatuses)
+ assert(rddInfos(0).storageLevel === memAndDisk)
+ assert(rddInfos(0).numCachedPartitions === 5)
+ assert(rddInfos(0).memSize === 5L)
+ assert(rddInfos(0).diskSize === 10L)
+ assert(rddInfos(0).tachyonSize === 0L)
+ assert(rddInfos(1).storageLevel === memAndDisk)
+ assert(rddInfos(1).numCachedPartitions === 3)
+ assert(rddInfos(1).memSize === 3L)
+ assert(rddInfos(1).diskSize === 6L)
+ assert(rddInfos(1).tachyonSize === 0L)
+ }
+
+ test("StorageUtils.getRddBlockLocations") {
+ val storageStatuses = stockStorageStatuses
+ val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
+ val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
+ assert(blockLocations0.size === 5)
+ assert(blockLocations1.size === 3)
+ assert(blockLocations0.contains(RDDBlockId(0, 0)))
+ assert(blockLocations0.contains(RDDBlockId(0, 1)))
+ assert(blockLocations0.contains(RDDBlockId(0, 2)))
+ assert(blockLocations0.contains(RDDBlockId(0, 3)))
+ assert(blockLocations0.contains(RDDBlockId(0, 4)))
+ assert(blockLocations1.contains(RDDBlockId(1, 0)))
+ assert(blockLocations1.contains(RDDBlockId(1, 1)))
+ assert(blockLocations1.contains(RDDBlockId(1, 2)))
+ assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1"))
+ assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1"))
+ assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2"))
+ assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2"))
+ assert(blockLocations0(RDDBlockId(0, 4)) === Seq("cat:3"))
+ assert(blockLocations1(RDDBlockId(1, 0)) === Seq("duck:2"))
+ assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2"))
+ assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
+ }
+
+ test("StorageUtils.getRddBlockLocations with multiple locations") {
+ val storageStatuses = stockStorageStatuses
+ storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
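+ // Adding the same block ids to additional block managers simulates replicated blocks, so a
+ // single block id should now map to more than one "host:port" entry.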
+ val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
+ val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
+ assert(blockLocations0.size === 5)
+ assert(blockLocations1.size === 3)
+ assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1", "cat:3"))
+ assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1"))
+ assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2"))
+ assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2"))
+ assert(blockLocations0(RDDBlockId(0, 4)) === Seq("dog:1", "cat:3"))
+ assert(blockLocations1(RDDBlockId(1, 0)) === Seq("dog:1", "duck:2"))
+ assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2"))
+ assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
new file mode 100644
index 0000000000..6e68dcb342
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.Success
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.storage._
+
+/**
+ * Test various functionality in the StorageListener that supports the StorageTab.
+ */
+class StorageTabSuite extends FunSuite with BeforeAndAfter {
+ private var bus: LiveListenerBus = _
+ private var storageStatusListener: StorageStatusListener = _
+ private var storageListener: StorageListener = _
+ private val memAndDisk = StorageLevel.MEMORY_AND_DISK
+ private val memOnly = StorageLevel.MEMORY_ONLY
+ private val none = StorageLevel.NONE
+ private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+ private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
+ private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
+ private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
+ private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk)
+ private val bm1 = BlockManagerId("big", "dog", 1, 1)
+
+ before {
+ bus = new LiveListenerBus
+ storageStatusListener = new StorageStatusListener
+ storageListener = new StorageListener(storageStatusListener)
+ bus.addListener(storageStatusListener)
+ bus.addListener(storageListener)
+ }
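+ // These tests post events via bus.postToAll, which presumably forwards each event to the
+ // registered listeners synchronously (the bus' asynchronous thread is never started here).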
+
+ test("stage submitted / completed") {
+ assert(storageListener._rddInfoMap.isEmpty)
+ assert(storageListener.rddInfoList.isEmpty)
+
+ // 2 RDDs are known, but none are cached
+ val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+ assert(storageListener._rddInfoMap.size === 2)
+ assert(storageListener.rddInfoList.isEmpty)
+
+ // 4 RDDs are known, but only 2 are cached
+ val rddInfo2Cached = rddInfo2
+ val rddInfo3Cached = rddInfo3
+ rddInfo2Cached.numCachedPartitions = 1
+ rddInfo3Cached.numCachedPartitions = 1
+ val stageInfo1 = new StageInfo(1, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
+ assert(storageListener._rddInfoMap.size === 4)
+ assert(storageListener.rddInfoList.size === 2)
+
+ // Submitting RDDInfos with duplicate IDs does nothing
+ val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY)
+ rddInfo0Cached.numCachedPartitions = 1
+ val stageInfo0Cached = new StageInfo(0, "0", 100, Seq(rddInfo0Cached), "details")
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
+ assert(storageListener._rddInfoMap.size === 4)
+ assert(storageListener.rddInfoList.size === 2)
+
+ // We only keep around the RDDs that are cached
+ bus.postToAll(SparkListenerStageCompleted(stageInfo0))
+ assert(storageListener._rddInfoMap.size === 2)
+ assert(storageListener.rddInfoList.size === 2)
+ }
+
+ test("unpersist") {
+ val rddInfo0Cached = rddInfo0
+ val rddInfo1Cached = rddInfo1
+ rddInfo0Cached.numCachedPartitions = 1
+ rddInfo1Cached.numCachedPartitions = 1
+ val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+ assert(storageListener._rddInfoMap.size === 2)
+ assert(storageListener.rddInfoList.size === 2)
+ bus.postToAll(SparkListenerUnpersistRDD(0))
+ assert(storageListener._rddInfoMap.size === 1)
+ assert(storageListener.rddInfoList.size === 1)
+ bus.postToAll(SparkListenerUnpersistRDD(4)) // doesn't exist
+ assert(storageListener._rddInfoMap.size === 1)
+ assert(storageListener.rddInfoList.size === 1)
+ bus.postToAll(SparkListenerUnpersistRDD(1))
+ assert(storageListener._rddInfoMap.size === 0)
+ assert(storageListener.rddInfoList.size === 0)
+ }
+
+ test("task end") {
+ val myRddInfo0 = rddInfo0
+ val myRddInfo1 = rddInfo1
+ val myRddInfo2 = rddInfo2
+ val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+ bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
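+ // Register the block manager first, since the StorageStatusListener presumably only tracks
+ // blocks for block managers it has seen via SparkListenerBlockManagerAdded.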
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+ assert(storageListener._rddInfoMap.size === 3)
+ assert(storageListener.rddInfoList.size === 0) // not cached
+ assert(!storageListener._rddInfoMap(0).isCached)
+ assert(!storageListener._rddInfoMap(1).isCached)
+ assert(!storageListener._rddInfoMap(2).isCached)
+
+ // Task end with no updated blocks. This should not change anything.
+ bus.postToAll(SparkListenerTaskEnd(0, "obliteration", Success, taskInfo, new TaskMetrics))
+ assert(storageListener._rddInfoMap.size === 3)
+ assert(storageListener.rddInfoList.size === 0)
+
+ // Task end with a few new persisted blocks, some from the same RDD
+ val metrics1 = new TaskMetrics
+ metrics1.updatedBlocks = Some(Seq(
+ (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L, 0L)),
+ (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L, 0L)),
+ (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
+ (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
+ ))
+ bus.postToAll(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo, metrics1))
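+ // RDD 0 now has blocks 100, 101 and 102: memSize = 400 + 0 + 400 = 800, diskSize = 400,
+ // tachyonSize = 200, with 3 cached partitions; RDD 1 has only block 20 (240L on disk).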
+ assert(storageListener._rddInfoMap(0).memSize === 800L)
+ assert(storageListener._rddInfoMap(0).diskSize === 400L)
+ assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+ assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
+ assert(storageListener._rddInfoMap(0).isCached)
+ assert(storageListener._rddInfoMap(1).memSize === 0L)
+ assert(storageListener._rddInfoMap(1).diskSize === 240L)
+ assert(storageListener._rddInfoMap(1).tachyonSize === 0L)
+ assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
+ assert(storageListener._rddInfoMap(1).isCached)
+ assert(!storageListener._rddInfoMap(2).isCached)
+ assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
+
+ // Task end with a few dropped blocks
+ val metrics2 = new TaskMetrics
+ metrics2.updatedBlocks = Some(Seq(
+ (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L, 0L)),
+ (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L, 0L)),
+ (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
+ (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
+ ))
+ bus.postToAll(SparkListenerTaskEnd(2, "obliteration", Success, taskInfo, metrics2))
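+ // Blocks rdd_0_100 and rdd_1_20 were dropped to StorageLevel.NONE, so RDD 0 loses 400L of
+ // memory and one cached partition while RDD 1 becomes uncached; the updates for rdd_2_40
+ // and rdd_4_80 refer to blocks that were never persisted and should leave the map unchanged.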
+ assert(storageListener._rddInfoMap(0).memSize === 400L)
+ assert(storageListener._rddInfoMap(0).diskSize === 400L)
+ assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+ assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
+ assert(storageListener._rddInfoMap(0).isCached)
+ assert(!storageListener._rddInfoMap(1).isCached)
+ assert(storageListener._rddInfoMap(1).numCachedPartitions === 0)
+ assert(!storageListener._rddInfoMap(2).isCached)
+ assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
+ }
+
+}