aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala21
2 files changed, 20 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index f552b498a7..3008520f61 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -66,17 +66,6 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
- val info = taskEnd.taskInfo
- val metrics = taskEnd.taskMetrics
- if (info != null && metrics != null) {
- val updatedBlocks = metrics.updatedBlockStatuses
- if (updatedBlocks.length > 0) {
- updateStorageStatus(info.executorId, updatedBlocks)
- }
- }
- }
-
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
updateStorageStatus(unpersistRDD.rddId)
}
@@ -102,4 +91,14 @@ class StorageStatusListener(conf: SparkConf) extends SparkListener {
}
}
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ val executorId = blockUpdated.blockUpdatedInfo.blockManagerId.executorId
+ val blockId = blockUpdated.blockUpdatedInfo.blockId
+ val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+ val memSize = blockUpdated.blockUpdatedInfo.memSize
+ val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+ val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+ updateStorageStatus(executorId, Seq((blockId, blockStatus)))
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 8f75b586e1..50095831b4 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -57,17 +57,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
StorageUtils.updateRddInfo(rddInfosToUpdate, activeStorageStatusList)
}
- /**
- * Assumes the storage status list is fully up-to-date. This implies the corresponding
- * StorageStatusSparkListener must process the SparkListenerTaskEnd event before this listener.
- */
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
- val metrics = taskEnd.taskMetrics
- if (metrics != null && metrics.updatedBlockStatuses.nonEmpty) {
- updateRDDInfo(metrics.updatedBlockStatuses)
- }
- }
-
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val rddInfos = stageSubmitted.stageInfo.rddInfos
rddInfos.foreach { info => _rddInfoMap.getOrElseUpdate(info.id, info) }
@@ -84,4 +73,14 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Bloc
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
_rddInfoMap.remove(unpersistRDD.rddId)
}
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ super.onBlockUpdated(blockUpdated)
+ val blockId = blockUpdated.blockUpdatedInfo.blockId
+ val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+ val memSize = blockUpdated.blockUpdatedInfo.memSize
+ val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+ val blockStatus = BlockStatus(storageLevel, memSize, diskSize)
+ updateRDDInfo(Seq((blockId, blockStatus)))
+ }
}