diff options
author | Andrew Or <andrewor14@gmail.com> | 2014-07-03 22:48:23 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-07-03 22:48:23 -0700 |
commit | 3894a49be9b532cc026d908a0f49bca850504498 (patch) | |
tree | 2e0482f07c96092fc2b385d39b39b5270c865496 | |
parent | 586feb5c9528042420f678f78bacb6c254a5eaf8 (diff) | |
download | spark-3894a49be9b532cc026d908a0f49bca850504498.tar.gz spark-3894a49be9b532cc026d908a0f49bca850504498.tar.bz2 spark-3894a49be9b532cc026d908a0f49bca850504498.zip |
[SPARK-2307][Reprise] Correctly report RDD blocks on SparkUI
**Problem.** The existing code in `ExecutorsPage.scala` requires a linear scan through all the blocks to filter out the uncached ones. Every refresh could be expensive if there are many blocks and many executors.
**Solution.** The proper semantics should be the following: `StorageStatusListener` should contain only block statuses that are cached. This means as soon as a block is unpersisted by any means, its status should be removed. This is reflected in the changes made in `StorageStatusListener.scala`.
Further, the `StorageTab` must stop relying on the `StorageStatusListener` changing a dropped block's status to `StorageLevel.NONE` (which no longer happens). This is reflected in the changes made in `StorageTab.scala` and `StorageUtils.scala`.
----------
If you have been following this chain of PRs like pwendell, you will quickly notice that this reverts the changes in #1249, which reverts the changes in #1080. In other words, we are adding back the changes from #1080, and fixing SPARK-2307 on top of those changes. Please ask questions if you are confused.
Author: Andrew Or <andrewor14@gmail.com>
Closes #1255 from andrewor14/storage-ui-fix-reprise and squashes the following commits:
45416fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into storage-ui-fix-reprise
a82ea25 [Andrew Or] Add tests for StorageStatusListener
8773b01 [Andrew Or] Update comment / minor changes
3afde3f [Andrew Or] Correctly report the number of blocks on SparkUI
6 files changed, 184 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala index a6e6627d54..41c960c867 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala @@ -28,26 +28,31 @@ import org.apache.spark.scheduler._ */ @DeveloperApi class StorageStatusListener extends SparkListener { - private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]() + // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE) + private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]() def storageStatusList = executorIdToStorageStatus.values.toSeq /** Update storage status list to reflect updated block statuses */ - def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { - val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId) + private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) { + val filteredStatus = executorIdToStorageStatus.get(execId) filteredStatus.foreach { storageStatus => updatedBlocks.foreach { case (blockId, updatedStatus) => - storageStatus.blocks(blockId) = updatedStatus + if (updatedStatus.storageLevel == StorageLevel.NONE) { + storageStatus.blocks.remove(blockId) + } else { + storageStatus.blocks(blockId) = updatedStatus + } } } } /** Update storage status list to reflect the removal of an RDD from the cache */ - def updateStorageStatus(unpersistedRDDId: Int) { + private def updateStorageStatus(unpersistedRDDId: Int) { storageStatusList.foreach { storageStatus => val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId) unpersistedBlocksIds.foreach { blockId => - storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L) + 
storageStatus.blocks.remove(blockId) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index f3bde1df45..177281f663 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -75,17 +75,26 @@ private[spark] object StorageUtils { /** Returns storage information of all RDDs in the given list. */ def rddInfoFromStorageStatus( storageStatuses: Seq[StorageStatus], - rddInfos: Seq[RDDInfo]): Array[RDDInfo] = { + rddInfos: Seq[RDDInfo], + updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = { + + // Mapping from a block ID -> its status + val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*) + + // Record updated blocks, if any + updatedBlocks + .collect { case (id: RDDBlockId, status) => (id, status) } + .foreach { case (id, status) => blockMap(id) = status } // Mapping from RDD ID -> an array of associated BlockStatuses - val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap + val rddBlockMap = blockMap .groupBy { case (k, _) => k.rddId } .mapValues(_.values.toArray) // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information) val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap - val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) => + val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) => // Add up memory, disk and Tachyon sizes val persistedBlocks = blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 9625337ae2..95b4a4e91d 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -110,9 
+110,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val status = listener.storageStatusList(statusId) val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort - val rddBlocks = status.blocks.count { case (_, blockStatus) => - blockStatus.storageLevel != StorageLevel.NONE - } + val rddBlocks = status.blocks.size val memUsed = status.memUsed val maxMem = status.maxMem val diskUsed = status.diskUsed diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 58eeb86bf9..5c2d1d1fe7 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut * A SparkListener that prepares information to be displayed on the ExecutorsTab */ @DeveloperApi -class ExecutorsListener(storageStatusListener: StorageStatusListener) - extends SparkListener { - +class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener { val executorToTasksActive = HashMap[String, Int]() val executorToTasksComplete = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]() diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index c4bb7aab50..0cc0cf3117 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -22,7 +22,7 @@ import scala.collection.mutable import org.apache.spark.annotation.DeveloperApi import org.apache.spark.ui._ import org.apache.spark.scheduler._ -import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils} +import org.apache.spark.storage._ /** Web UI showing storage status of all RDD's in the given 
SparkContext. */ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") { @@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage" * A SparkListener that prepares information to be displayed on the BlockManagerUI. */ @DeveloperApi -class StorageListener(storageStatusListener: StorageStatusListener) - extends SparkListener { - +class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener { private val _rddInfoMap = mutable.Map[Int, RDDInfo]() def storageStatusList = storageStatusListener.storageStatusList @@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener) def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq /** Update each RDD's info to reflect any updates to the RDD's storage status */ - private def updateRDDInfo() { + private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) { val rddInfos = _rddInfoMap.values.toSeq - val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos) + val updatedRddInfos = + StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks) updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info } } @@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { val metrics = taskEnd.taskMetrics if (metrics != null && metrics.updatedBlocks.isDefined) { - updateRDDInfo() + updateRDDInfo(metrics.updatedBlocks.get) } } @@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener) } override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized { - updateRDDInfo() + _rddInfoMap.remove(unpersistRDD.rddId) } } diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala 
new file mode 100644 index 0000000000..2179c6dd33 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import org.scalatest.FunSuite +import org.apache.spark.Success +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.scheduler._ + +/** + * Test the behavior of StorageStatusListener in response to all relevant events. 
+ */ +class StorageStatusListenerSuite extends FunSuite { + private val bm1 = BlockManagerId("big", "dog", 1, 1) + private val bm2 = BlockManagerId("fat", "duck", 2, 2) + private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false) + private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false) + + test("block manager added/removed") { + val listener = new StorageStatusListener + + // Block manager add + assert(listener.executorIdToStorageStatus.size === 0) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + assert(listener.executorIdToStorageStatus.size === 1) + assert(listener.executorIdToStorageStatus.get("big").isDefined) + assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1) + assert(listener.executorIdToStorageStatus("big").maxMem === 1000L) + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + assert(listener.executorIdToStorageStatus.size === 2) + assert(listener.executorIdToStorageStatus.get("fat").isDefined) + assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2) + assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + + // Block manager remove + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1)) + assert(listener.executorIdToStorageStatus.size === 1) + assert(!listener.executorIdToStorageStatus.get("big").isDefined) + assert(listener.executorIdToStorageStatus.get("fat").isDefined) + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2)) + assert(listener.executorIdToStorageStatus.size === 0) + assert(!listener.executorIdToStorageStatus.get("big").isDefined) + assert(!listener.executorIdToStorageStatus.get("fat").isDefined) + } + + test("task end without updated blocks") { + val listener = new StorageStatusListener + 
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + val taskMetrics = new TaskMetrics + + // Task end with no updated blocks + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics)) + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics)) + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + } + + test("task end with updated blocks") { + val listener = new StorageStatusListener + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + val taskMetrics1 = new TaskMetrics + val taskMetrics2 = new TaskMetrics + val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) + val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L)) + val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L)) + taskMetrics1.updatedBlocks = Some(Seq(block1, block2)) + taskMetrics2.updatedBlocks = Some(Seq(block3)) + + // Task end with new blocks + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 2) + assert(listener.executorIdToStorageStatus("fat").blocks.size === 0) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + 
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 2) + assert(listener.executorIdToStorageStatus("fat").blocks.size === 1) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0))) + + // Task end with dropped blocks + val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) + val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) + val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)) + taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3)) + taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3)) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 1) + assert(listener.executorIdToStorageStatus("fat").blocks.size === 1) + assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0))) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 1) + assert(listener.executorIdToStorageStatus("fat").blocks.size === 0) + assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + 
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty) + } + + test("unpersist RDD") { + val listener = new StorageStatusListener + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + val taskMetrics1 = new TaskMetrics + val taskMetrics2 = new TaskMetrics + val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) + val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L)) + val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L)) + taskMetrics1.updatedBlocks = Some(Seq(block1, block2)) + taskMetrics2.updatedBlocks = Some(Seq(block3)) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1)) + listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 3) + + // Unpersist RDD + listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 3) + listener.onUnpersistRDD(SparkListenerUnpersistRDD(4)) + assert(listener.executorIdToStorageStatus("big").blocks.size === 2) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1))) + assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2))) + listener.onUnpersistRDD(SparkListenerUnpersistRDD(1)) + assert(listener.executorIdToStorageStatus("big").blocks.isEmpty) + } +} |