about summary refs log tree commit diff
path: root/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/StorageUtils.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageUtils.scala15
1 file changed, 12 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index f3bde1df45..177281f663 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -75,17 +75,26 @@ private[spark] object StorageUtils {
/** Returns storage information of all RDDs in the given list. */
def rddInfoFromStorageStatus(
storageStatuses: Seq[StorageStatus],
- rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+ rddInfos: Seq[RDDInfo],
+ updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
+
+ // Mapping from a block ID -> its status
+ val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
+
+ // Record updated blocks, if any
+ updatedBlocks
+ .collect { case (id: RDDBlockId, status) => (id, status) }
+ .foreach { case (id, status) => blockMap(id) = status }
// Mapping from RDD ID -> an array of associated BlockStatuses
- val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+ val rddBlockMap = blockMap
.groupBy { case (k, _) => k.rddId }
.mapValues(_.values.toArray)
// Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
- val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
+ val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
// Add up memory, disk and Tachyon sizes
val persistedBlocks =
blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }