diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/StorageUtils.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/StorageUtils.scala | 15 |
1 file changed, 12 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index f3bde1df45..177281f663 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -75,17 +75,26 @@ private[spark] object StorageUtils { /** Returns storage information of all RDDs in the given list. */ def rddInfoFromStorageStatus( storageStatuses: Seq[StorageStatus], - rddInfos: Seq[RDDInfo]): Array[RDDInfo] = { + rddInfos: Seq[RDDInfo], + updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = { + + // Mapping from a block ID -> its status + val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*) + + // Record updated blocks, if any + updatedBlocks + .collect { case (id: RDDBlockId, status) => (id, status) } + .foreach { case (id, status) => blockMap(id) = status } // Mapping from RDD ID -> an array of associated BlockStatuses - val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap + val rddBlockMap = blockMap .groupBy { case (k, _) => k.rddId } .mapValues(_.values.toArray) // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information) val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap - val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) => + val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) => // Add up memory, disk and Tachyon sizes val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 } |