author     Andrew Or <andrewor14@gmail.com>        2014-07-03 22:48:23 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-07-03 22:48:23 -0700
commit     3894a49be9b532cc026d908a0f49bca850504498 (patch)
tree       2e0482f07c96092fc2b385d39b39b5270c865496 /core
parent     586feb5c9528042420f678f78bacb6c254a5eaf8 (diff)
[SPARK-2307][Reprise] Correctly report RDD blocks on SparkUI
**Problem.** The existing code in `ExecutorsPage.scala` requires a linear scan through all the blocks to filter out the uncached ones. Every refresh could be expensive if there are many blocks and many executors.

**Solution.** The proper semantics should be the following: `StorageStatusListener` should contain only block statuses that are cached. This means that as soon as a block is unpersisted by any means, its status should be removed. This is reflected in the changes made in `StorageStatusListener.scala`. Further, `StorageTab` must stop relying on `StorageStatusListener` changing a dropped block's status to `StorageLevel.NONE` (which no longer happens). This is reflected in the changes made in `StorageTab.scala` and `StorageUtils.scala`.

----------

If you have been following this chain of PRs like pwendell, you will quickly notice that this reverts the changes in #1249, which reverts the changes in #1080. In other words, we are adding back the changes from #1080 and fixing SPARK-2307 on top of them. Please ask questions if you are confused.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1255 from andrewor14/storage-ui-fix-reprise and squashes the following commits:

45416fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into storage-ui-fix-reprise
a82ea25 [Andrew Or] Add tests for StorageStatusListener
8773b01 [Andrew Or] Update comment / minor changes
3afde3f [Andrew Or] Correctly report the number of blocks on SparkUI
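To make the new semantics concrete, here is a minimal standalone sketch in Scala. The names and types below (`CachedBlockMap`, a string-based `storageLevel`) are simplified stand-ins for illustration only, not the actual Spark classes:

```scala
import scala.collection.mutable

// Simplified stand-ins for illustration -- not the real Spark types.
case class BlockId(name: String)
case class BlockStatus(storageLevel: String) // "NONE" means uncached

class CachedBlockMap {
  private val blocks = mutable.Map[BlockId, BlockStatus]()

  // Invariant: the map contains only cached blocks. An update to
  // StorageLevel.NONE removes the entry instead of storing it.
  def update(id: BlockId, status: BlockStatus): Unit = {
    if (status.storageLevel == "NONE") blocks.remove(id) else blocks(id) = status
  }

  // With that invariant, reporting the cached block count is an O(1)
  // size lookup rather than a linear scan filtering uncached entries.
  def numCachedBlocks: Int = blocks.size
}
```

With this invariant in place, the executors page can report the block count as a plain `size` lookup, which is exactly the `ExecutorsPage.scala` change in the diff below.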
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala       17
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageUtils.scala                15
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala                4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala                 4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala               15
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala  152
6 files changed, 184 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index a6e6627d54..41c960c867 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -28,26 +28,31 @@ import org.apache.spark.scheduler._
  */
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
-  private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
+  private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
 
   def storageStatusList = executorIdToStorageStatus.values.toSeq
 
   /** Update storage status list to reflect updated block statuses */
-  def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
+  private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
+    val filteredStatus = executorIdToStorageStatus.get(execId)
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
-        storageStatus.blocks(blockId) = updatedStatus
+        if (updatedStatus.storageLevel == StorageLevel.NONE) {
+          storageStatus.blocks.remove(blockId)
+        } else {
+          storageStatus.blocks(blockId) = updatedStatus
+        }
       }
     }
   }
 
   /** Update storage status list to reflect the removal of an RDD from the cache */
-  def updateStorageStatus(unpersistedRDDId: Int) {
+  private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
       val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
       unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+        storageStatus.blocks.remove(blockId)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index f3bde1df45..177281f663 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -75,17 +75,26 @@ private[spark] object StorageUtils {
   /** Returns storage information of all RDDs in the given list. */
   def rddInfoFromStorageStatus(
       storageStatuses: Seq[StorageStatus],
-      rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+      rddInfos: Seq[RDDInfo],
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
+
+    // Mapping from a block ID -> its status
+    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
+
+    // Record updated blocks, if any
+    updatedBlocks
+      .collect { case (id: RDDBlockId, status) => (id, status) }
+      .foreach { case (id, status) => blockMap(id) = status }
 
     // Mapping from RDD ID -> an array of associated BlockStatuses
-    val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+    val rddBlockMap = blockMap
       .groupBy { case (k, _) => k.rddId }
       .mapValues(_.values.toArray)
 
     // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
     val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
 
-    val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
+    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
       // Add up memory, disk and Tachyon sizes
       val persistedBlocks =
         blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
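The hunk above overlays a task's updated blocks onto the block statuses already known, then groups by RDD ID, so the per-task cost is proportional to the update rather than to all blocks. A hedged sketch of that overlay idea, using simplified stand-in types rather than the real `StorageStatus`/`RDDInfo` machinery:

```scala
import scala.collection.mutable

// Illustrative stand-ins, not the real Spark types.
case class RDDBlockId(rddId: Int, splitIndex: Int)
case class BlockStatus(memSize: Long, diskSize: Long)

def blocksByRdd(
    known: Seq[(RDDBlockId, BlockStatus)],
    updatedBlocks: Seq[(RDDBlockId, BlockStatus)]): Map[Int, Seq[BlockStatus]] = {
  // Mapping from block ID -> its status, seeded with the known blocks
  val blockMap = mutable.Map(known: _*)
  // Overlay the updated blocks so fresh statuses win over stale ones
  updatedBlocks.foreach { case (id, status) => blockMap(id) = status }
  // Group by RDD ID, mirroring the rddBlockMap built in the real code
  blockMap.toSeq
    .groupBy { case (id, _) => id.rddId }
    .mapValues(_.map(_._2))
    .toMap
}
```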
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 9625337ae2..95b4a4e91d 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -110,9 +110,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.count { case (_, blockStatus) =>
-      blockStatus.storageLevel != StorageLevel.NONE
-    }
+    val rddBlocks = status.blocks.size
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 58eeb86bf9..5c2d1d1fe7 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
 @DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   val executorToTasksActive = HashMap[String, Int]()
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index c4bb7aab50..0cc0cf3117 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.ui._
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
+import org.apache.spark.storage._
 
 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
@@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
  * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  */
 @DeveloperApi
-class StorageListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
 
   def storageStatusList = storageStatusListener.storageStatusList
@@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
 
   /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo() {
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
     val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
+    val updatedRddInfos =
+      StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
     updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
   }
 
@@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val metrics = taskEnd.taskMetrics
     if (metrics != null && metrics.updatedBlocks.isDefined) {
-      updateRDDInfo()
+      updateRDDInfo(metrics.updatedBlocks.get)
     }
   }
 
@@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   }
 
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
-    updateRDDInfo()
+    _rddInfoMap.remove(unpersistRDD.rddId)
   }
 }
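A condensed sketch of the two behavioral changes in this hunk, under simplified, hypothetical types (`BlockUpdate` and `StorageListenerSketch` are illustrative only): task-end events update state only for the blocks that actually changed, and unpersist events drop the RDD's entry directly instead of triggering a full recomputation.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins -- not the real Spark listener types.
case class BlockUpdate(rddId: Int, cached: Boolean)

class StorageListenerSketch {
  // Mapping from RDD ID -> number of cached partitions we know about
  private val cachedPartitions = mutable.Map[Int, Int]().withDefaultValue(0)

  // Task end: refresh state only for the blocks this task changed,
  // instead of recomputing every RDD's info from scratch.
  def onTaskEnd(updatedBlocks: Seq[BlockUpdate]): Unit = {
    updatedBlocks.foreach { update =>
      val delta = if (update.cached) 1 else -1
      cachedPartitions(update.rddId) = math.max(0, cachedPartitions(update.rddId) + delta)
    }
  }

  // Unpersist: drop the RDD's entry directly; no recomputation needed.
  def onUnpersistRDD(rddId: Int): Unit = {
    cachedPartitions.remove(rddId)
  }
}
```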
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
new file mode 100644
index 0000000000..2179c6dd33
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.Success
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+
+/**
+ * Test the behavior of StorageStatusListener in response to all relevant events.
+ */
+class StorageStatusListenerSuite extends FunSuite {
+  private val bm1 = BlockManagerId("big", "dog", 1, 1)
+  private val bm2 = BlockManagerId("fat", "duck", 2, 2)
+  private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+  private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+
+  test("block manager added/removed") {
+    val listener = new StorageStatusListener
+
+    // Block manager add
+    assert(listener.executorIdToStorageStatus.size === 0)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    assert(listener.executorIdToStorageStatus.size === 1)
+    assert(listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
+    assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    assert(listener.executorIdToStorageStatus.size === 2)
+    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
+    assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+
+    // Block manager remove
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+    assert(listener.executorIdToStorageStatus.size === 1)
+    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+    assert(listener.executorIdToStorageStatus.size === 0)
+    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+  }
+
+  test("task end without updated blocks") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    val taskMetrics = new TaskMetrics
+
+    // Task end with no updated blocks
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+  }
+
+  test("task end with updated blocks") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    val taskMetrics1 = new TaskMetrics
+    val taskMetrics2 = new TaskMetrics
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+    taskMetrics2.updatedBlocks = Some(Seq(block3))
+
+    // Task end with new blocks
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+
+    // Task end with dropped blocks
+    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
+    taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+  }
+
+  test("unpersist RDD") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    val taskMetrics1 = new TaskMetrics
+    val taskMetrics2 = new TaskMetrics
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+    taskMetrics2.updatedBlocks = Some(Seq(block3))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+
+    // Unpersist RDD
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+  }
+}