aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala21
1 files changed, 10 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index f552b498a7..3008520f61 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -66,17 +66,6 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
- val info = taskEnd.taskInfo
- val metrics = taskEnd.taskMetrics
- if (info != null && metrics != null) {
- val updatedBlocks = metrics.updatedBlockStatuses
- if (updatedBlocks.length > 0) {
- updateStorageStatus(info.executorId, updatedBlocks)
- }
- }
- }
-
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
updateStorageStatus(unpersistRDD.rddId)
}
@@ -102,4 +91,14 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+ val blockId = blockUpdated.blockUpdatedInfo.blockId
+ val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+ val memSize = blockUpdated.blockUpdatedInfo.memSize
+ val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+ val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+ updateStorageStatus(executorId, Seq((blockId, blockStatus)))
+ }
}