aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala8
1 files changed, 5 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 798658a15b..1b30d4fa93 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -41,7 +41,7 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
def deadStorageStatusList: Seq[StorageStatus] = synchronized {
- deadExecutorStorageStatus.toSeq
+ deadExecutorStorageStatus
}
/** Update storage status list to reflect updated block statuses */
@@ -74,8 +74,10 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
synchronized {
val blockManagerId = blockManagerAdded.blockManagerId
val executorId = blockManagerId.executorId
- val maxMem = blockManagerAdded.maxMem
- val storageStatus = new StorageStatus(blockManagerId, maxMem)
+ // The onHeap and offHeap memory are always defined for new applications,
+ // but they can be missing if we are replaying old event logs.
+ val storageStatus = new StorageStatus(blockManagerId, blockManagerAdded.maxMem,
+ blockManagerAdded.maxOnHeapMem, blockManagerAdded.maxOffHeapMem)
executorIdToStorageStatus(executorId) = storageStatus
// Try to remove the dead storage status if same executor register the block manager twice.