author     Andrew Or <andrewor14@gmail.com>    2014-07-03 22:48:23 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-07-03 22:48:23 -0700
commit     3894a49be9b532cc026d908a0f49bca850504498 (patch)
tree       2e0482f07c96092fc2b385d39b39b5270c865496 /core/src/test
parent     586feb5c9528042420f678f78bacb6c254a5eaf8 (diff)
[SPARK-2307][Reprise] Correctly report RDD blocks on SparkUI
**Problem.** The existing code in `ExecutorsPage.scala` requires a linear scan through all the blocks to filter out the uncached ones. Every refresh could be expensive if there are many blocks and many executors.

**Solution.** The proper semantics should be the following: `StorageStatusListener` should contain only block statuses that are cached. This means that as soon as a block is unpersisted by any means, its status should be removed. This is reflected in the changes made in `StorageStatusListener.scala`. Further, the `StorageTab` must stop relying on the `StorageStatusListener` changing a dropped block's status to `StorageLevel.NONE` (which no longer happens). This is reflected in the changes made in `StorageTab.scala` and `StorageUtils.scala`.

----------

If you have been following this chain of PRs like pwendell, you will quickly notice that this reverts the changes in #1249, which in turn reverted the changes in #1080. In other words, we are adding back the changes from #1080 and fixing SPARK-2307 on top of them. Please ask questions if you are confused.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1255 from andrewor14/storage-ui-fix-reprise and squashes the following commits:

45416fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into storage-ui-fix-reprise
a82ea25 [Andrew Or] Add tests for StorageStatusListener
8773b01 [Andrew Or] Update comment / minor changes
3afde3f [Andrew Or] Correctly report the number of blocks on SparkUI
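For readers skimming the diff below, here is a minimal sketch of the listener semantics this patch establishes. It is written to match the expectations encoded in the new test suite, not copied from the actual `StorageStatusListener.scala` changes, so class and method names such as `CachedOnlyStatus` and `updateBlocks` are illustrative assumptions:

    // Sketch only: approximates the semantics the tests below assert,
    // assuming the org.apache.spark.storage classes are on the classpath.
    import scala.collection.mutable
    import org.apache.spark.storage._

    class CachedOnlyStatus {
      // Hypothetical stand-in for the per-executor block map.
      val blocks = mutable.Map[BlockId, BlockStatus]()

      // On task end: a block updated to StorageLevel.NONE was dropped, so
      // its entry is removed outright rather than kept around with a NONE
      // status. Cached blocks are inserted or overwritten.
      def updateBlocks(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
        updatedBlocks.foreach { case (blockId, status) =>
          if (status.storageLevel == StorageLevel.NONE) {
            blocks.remove(blockId)
          } else {
            blocks(blockId) = status
          }
        }
      }

      // On unpersist: every block belonging to the RDD is removed, so the
      // listener never reports uncached blocks at all.
      def unpersistRDD(rddId: Int): Unit = {
        val toRemove = blocks.keys.collect {
          case id: RDDBlockId if id.rddId == rddId => id
        }
        toRemove.foreach(blocks.remove)
      }
    }

With these semantics, the expensive per-refresh filter on the executors page reduces to a plain `blocks.size` lookup per executor.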
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala  152
1 file changed, 152 insertions(+), 0 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
new file mode 100644
index 0000000000..2179c6dd33
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.Success
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+
+/**
+ * Test the behavior of StorageStatusListener in response to all relevant events.
+ */
+class StorageStatusListenerSuite extends FunSuite {
+ private val bm1 = BlockManagerId("big", "dog", 1, 1)
+ private val bm2 = BlockManagerId("fat", "duck", 2, 2)
+ private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+ private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+
+ test("block manager added/removed") {
+ val listener = new StorageStatusListener
+
+ // Block manager add
+ assert(listener.executorIdToStorageStatus.size === 0)
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+ assert(listener.executorIdToStorageStatus.size === 1)
+ assert(listener.executorIdToStorageStatus.get("big").isDefined)
+ assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
+ assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
+ assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ assert(listener.executorIdToStorageStatus.size === 2)
+ assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+ assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
+ assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
+ assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+
+ // Block manager remove
+ listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+ assert(listener.executorIdToStorageStatus.size === 1)
+ assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+ assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+ listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+ assert(listener.executorIdToStorageStatus.size === 0)
+ assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+ assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+ }
+
+ test("task end without updated blocks") {
+ val listener = new StorageStatusListener
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ val taskMetrics = new TaskMetrics
+
+ // Task end with no updated blocks
+ assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
+ assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
+ assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ }
+
+ test("task end with updated blocks") {
+ val listener = new StorageStatusListener
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ val taskMetrics1 = new TaskMetrics
+ val taskMetrics2 = new TaskMetrics
+ val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+ val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+ val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+ taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+ taskMetrics2.updatedBlocks = Some(Seq(block3))
+
+ // Task end with new blocks
+ assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+ assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+ assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+ assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+ assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+ assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+ assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+
+ // Task end with dropped blocks
+ val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+ val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+ val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+ taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
+ taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+ assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+ assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+ assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+ assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+ assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+ assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ }
+
+ test("unpersist RDD") {
+ val listener = new StorageStatusListener
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+ val taskMetrics1 = new TaskMetrics
+ val taskMetrics2 = new TaskMetrics
+ val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+ val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+ val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+ taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+ taskMetrics2.updatedBlocks = Some(Seq(block3))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
+ assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+
+ // Unpersist RDD
+ listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
+ assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+ listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
+ assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+ assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+ listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
+ assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ }
+}
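For context on why the listener-side cleanup above matters, here is a hedged sketch of the consumer side. The real logic lives in `ExecutorsPage.scala` and `StorageUtils.scala`; the object and method names below are illustrative assumptions, not the actual patch contents:

    // Sketch only: contrasts the old and new ways of counting RDD blocks.
    import org.apache.spark.storage._

    object RddBlockCount {
      // Before this patch, the page scanned every block on each refresh to
      // filter out the ones whose status had become StorageLevel.NONE:
      def countCachedBlocksOld(blocks: Map[BlockId, BlockStatus]): Int =
        blocks.count { case (_, status) => status.storageLevel != StorageLevel.NONE }

      // After this patch, the listener guarantees only cached blocks are
      // present, so the count is simply the map size:
      def countCachedBlocksNew(blocks: Map[BlockId, BlockStatus]): Int =
        blocks.size
    }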