author      zsxwing <zsxwing@gmail.com>                    2015-07-14 13:58:36 -0700
committer   Tathagata Das <tathagata.das1565@gmail.com>    2015-07-14 13:58:36 -0700
commit      fb1d06fc242ec00320f1a3049673fbb03c4a6eb9 (patch)
tree        34962b8d5112611b7886a7f5f7cea0ee5a9d022e /core
parent      0a4071eab30db1db80f61ed2cb2e7243291183ce (diff)
[SPARK-4072] [CORE] Display Streaming blocks in Streaming UI
Replaces #6634. This PR adds `SparkListenerBlockUpdated` to SparkListener so that it can monitor all block update infos that are sent to `BlockManagerMasterEndpoint`, and also adds new tables in the Storage tab to display the stream block infos.

![screen shot 2015-07-01 at 5 19 46 pm](https://cloud.githubusercontent.com/assets/1000778/8451562/c291a6ec-2016-11e5-890d-0afc174e1f8c.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #6672 from zsxwing/SPARK-4072-2 and squashes the following commits:

df2c1d8 [zsxwing] Use xml query to check the xml elements
54d54af [zsxwing] Add unit tests for StoragePage
e29fb53 [zsxwing] Update as per TD's comments
ccbee07 [zsxwing] Fix the code style
6dc42b4 [zsxwing] Fix the replication level of blocks
450fad1 [zsxwing] Merge branch 'master' into SPARK-4072-2
1e9ef52 [zsxwing] Don't categorize by Executor ID
ca0ab69 [zsxwing] Fix the code style
3de2762 [zsxwing] Make object BlockUpdatedInfo private
e95b594 [zsxwing] Add 'Aggregated Stream Block Metrics by Executor' table
ba5d0d1 [zsxwing] Refactor the unit test to improve the readability
4bbe341 [zsxwing] Revert JsonProtocol and don't log SparkListenerBlockUpdated
b464dd1 [zsxwing] Add onBlockUpdated to EventLoggingListener
5ba014c [zsxwing] Fix the code style
0b1e47b [zsxwing] Add a developer api BlockUpdatedInfo
04838a9 [zsxwing] Fix the code style
2baa161 [zsxwing] Add unit tests
80f6c6d [zsxwing] Address comments
797ee4b [zsxwing] Display Streaming blocks in Streaming UI
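For context, a minimal sketch of how application code could consume the new event once this patch is in: the listener class and its log format are illustrative only, and `sc` is assumed to be an existing SparkContext.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}
    import org.apache.spark.storage.StreamBlockId

    // Hypothetical listener that logs every stream block update the driver sees.
    class StreamBlockLogger extends SparkListener {
      override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
        val info = blockUpdated.blockUpdatedInfo
        info.blockId match {
          case id: StreamBlockId =>
            println(s"Stream block $id on ${info.blockManagerId.hostPort}: " +
              s"memSize=${info.memSize}, diskSize=${info.diskSize}")
          case _ => // this patch only surfaces stream blocks in the UI
        }
      }
    }

    sc.addSparkListener(new StreamBlockLogger)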
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/JavaSparkListener.java                      22
-rw-r--r--  core/src/main/java/org/apache/spark/SparkFirehoseListener.java                   6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala        3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala              10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala            2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala    3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala         105
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala             47
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/UIUtils.scala                           14
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala              148
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala                 3
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala    119
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala          230
13 files changed, 684 insertions, 28 deletions
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
index 646496f313..fa9acf0a15 100644
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -17,23 +17,7 @@
package org.apache.spark;
-import org.apache.spark.scheduler.SparkListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
-import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
-import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
-import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
-import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
-import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
-import org.apache.spark.scheduler.SparkListenerJobEnd;
-import org.apache.spark.scheduler.SparkListenerJobStart;
-import org.apache.spark.scheduler.SparkListenerStageCompleted;
-import org.apache.spark.scheduler.SparkListenerStageSubmitted;
-import org.apache.spark.scheduler.SparkListenerTaskEnd;
-import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
-import org.apache.spark.scheduler.SparkListenerTaskStart;
-import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+import org.apache.spark.scheduler.*;
/**
* Java clients should extend this class instead of implementing
@@ -94,4 +78,8 @@ public class JavaSparkListener implements SparkListener {
@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
+
+ @Override
+ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
+
}
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index fbc5666959..1214d05ba6 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -112,4 +112,10 @@ public class SparkFirehoseListener implements SparkListener {
public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
onEvent(executorRemoved);
}
+
+ @Override
+ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
+ onEvent(blockUpdated);
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 62b05033a9..5a06ef02f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -200,6 +200,9 @@ private[spark] class EventLoggingListener(
}
// No-op because logging every update would be overkill
+ override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {}
+
+ // No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 9620915f49..896f174333 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -26,7 +26,7 @@ import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.util.{Distribution, Utils}
@DeveloperApi
@@ -98,6 +98,9 @@ case class SparkListenerExecutorAdded(time: Long, executorId: String, executorIn
case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
extends SparkListenerEvent
+@DeveloperApi
+case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
+
/**
* Periodic updates from executors.
* @param execId executor id
@@ -215,6 +218,11 @@ trait SparkListener {
* Called when the driver removes an executor.
*/
def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
+
+ /**
+ * Called when the driver receives a block update info.
+ */
+ def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 61e69ecc08..04afde33f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -58,6 +58,8 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
+ case blockUpdated: SparkListenerBlockUpdated =>
+ listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 68ed909673..5dc0c537cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -60,10 +60,11 @@ class BlockManagerMasterEndpoint(
register(blockManagerId, maxMemSize, slaveEndpoint)
context.reply(true)
- case UpdateBlockInfo(
+ case _updateBlockInfo @ UpdateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
context.reply(updateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
+ listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
case GetLocations(blockId) =>
context.reply(getLocations(blockId))
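The `_updateBlockInfo @ UpdateBlockInfo(...)` form above is Scala's pattern binder: it destructures the message for the existing reply while keeping a handle on the whole message so it can be forwarded to the listener bus. A self-contained sketch of the same idiom, with made-up types:

    // Illustrative types only, mirroring the UpdateBlockInfo handling above.
    case class Update(id: String, size: Long)

    def handle(msg: Any): Unit = msg match {
      case u @ Update(id, size) =>
        println(s"replying for $id ($size bytes)") // use the extracted fields
        publish(u)                                 // reuse the intact message
      case _ =>
    }

    def publish(u: Update): Unit = println(s"posted $u")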
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
new file mode 100644
index 0000000000..2789e25b8d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStatusListener.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable
+
+import org.apache.spark.scheduler._
+
+private[spark] case class BlockUIData(
+ blockId: BlockId,
+ location: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long,
+ externalBlockStoreSize: Long)
+
+/**
+ * The aggregated status of stream blocks in an executor
+ */
+private[spark] case class ExecutorStreamBlockStatus(
+ executorId: String,
+ location: String,
+ blocks: Seq[BlockUIData]) {
+
+ def totalMemSize: Long = blocks.map(_.memSize).sum
+
+ def totalDiskSize: Long = blocks.map(_.diskSize).sum
+
+ def totalExternalBlockStoreSize: Long = blocks.map(_.externalBlockStoreSize).sum
+
+ def numStreamBlocks: Int = blocks.size
+
+}
+
+private[spark] class BlockStatusListener extends SparkListener {
+
+ private val blockManagers =
+ new mutable.HashMap[BlockManagerId, mutable.HashMap[BlockId, BlockUIData]]
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+ val blockId = blockUpdated.blockUpdatedInfo.blockId
+ if (!blockId.isInstanceOf[StreamBlockId]) {
+ // Now we only monitor StreamBlocks
+ return
+ }
+ val blockManagerId = blockUpdated.blockUpdatedInfo.blockManagerId
+ val storageLevel = blockUpdated.blockUpdatedInfo.storageLevel
+ val memSize = blockUpdated.blockUpdatedInfo.memSize
+ val diskSize = blockUpdated.blockUpdatedInfo.diskSize
+ val externalBlockStoreSize = blockUpdated.blockUpdatedInfo.externalBlockStoreSize
+
+ synchronized {
+ // Drop the update info if the block manager is not registered
+ blockManagers.get(blockManagerId).foreach { blocksInBlockManager =>
+ if (storageLevel.isValid) {
+ blocksInBlockManager.put(blockId,
+ BlockUIData(
+ blockId,
+ blockManagerId.hostPort,
+ storageLevel,
+ memSize,
+ diskSize,
+ externalBlockStoreSize)
+ )
+ } else {
+ // If isValid is not true, it means we should drop the block.
+ blocksInBlockManager -= blockId
+ }
+ }
+ }
+ }
+
+ override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
+ synchronized {
+ blockManagers.put(blockManagerAdded.blockManagerId, mutable.HashMap())
+ }
+ }
+
+ override def onBlockManagerRemoved(
+ blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = synchronized {
+ blockManagers -= blockManagerRemoved.blockManagerId
+ }
+
+ def allExecutorStreamBlockStatus: Seq[ExecutorStreamBlockStatus] = synchronized {
+ blockManagers.map { case (blockManagerId, blocks) =>
+ ExecutorStreamBlockStatus(
+ blockManagerId.executorId, blockManagerId.hostPort, blocks.values.toSeq)
+ }.toSeq
+ }
+}
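Because `BlockStatusListener` and `BlockUIData` are `private[spark]`, the following sketch of driving the listener by hand only compiles inside the `org.apache.spark` package (as the suite added later in this patch does); the ids and sizes are made up.

    import org.apache.spark.scheduler._
    import org.apache.spark.storage._

    val listener = new BlockStatusListener()
    val bm = BlockManagerId("0", "localhost", 10000)

    // Register the block manager, then report one in-memory stream block.
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, bm, 0))
    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(
      bm, StreamBlockId(0, 0), StorageLevel.MEMORY_ONLY,
      memSize = 100, diskSize = 0, externalBlockStoreSize = 0)))

    // Read back the per-executor aggregation that feeds the Storage tab.
    listener.allExecutorStreamBlockStatus.foreach { status =>
      println(s"executor ${status.executorId}: ${status.numStreamBlocks} blocks, " +
        s"${status.totalMemSize} bytes in memory")
    }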
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
new file mode 100644
index 0000000000..a5790e4454
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/BlockUpdatedInfo.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.storage.BlockManagerMessages.UpdateBlockInfo
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about a block status in a block manager.
+ */
+@DeveloperApi
+case class BlockUpdatedInfo(
+ blockManagerId: BlockManagerId,
+ blockId: BlockId,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long,
+ externalBlockStoreSize: Long)
+
+private[spark] object BlockUpdatedInfo {
+
+ private[spark] def apply(updateBlockInfo: UpdateBlockInfo): BlockUpdatedInfo = {
+ BlockUpdatedInfo(
+ updateBlockInfo.blockManagerId,
+ updateBlockInfo.blockId,
+ updateBlockInfo.storageLevel,
+ updateBlockInfo.memSize,
+ updateBlockInfo.diskSize,
+ updateBlockInfo.externalBlockStoreSize)
+ }
+}
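The `private[spark]` companion `apply` is a small converting-constructor idiom: the internal RPC message (`UpdateBlockInfo`) never leaks into the public `@DeveloperApi` surface. The same shape in miniature, with hypothetical types:

    // Hypothetical pair mirroring UpdateBlockInfo -> BlockUpdatedInfo.
    case class InternalMsg(host: String, bytes: Long)

    case class PublicInfo(host: String, bytes: Long)

    object PublicInfo {
      // Overloads the synthesized case-class apply with a conversion.
      def apply(m: InternalMsg): PublicInfo = PublicInfo(m.host, m.bytes)
    }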
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 7898039519..718aea7e1d 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -27,7 +27,7 @@ import org.apache.spark.ui.scope.RDDOperationGraph
/** Utility functions for generating XML pages with spark content. */
private[spark] object UIUtils extends Logging {
- val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed sortable"
+ val TABLE_CLASS_NOT_STRIPED = "table table-bordered table-condensed"
val TABLE_CLASS_STRIPED = TABLE_CLASS_NOT_STRIPED + " table-striped"
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
@@ -267,9 +267,17 @@ private[spark] object UIUtils extends Logging {
fixedWidth: Boolean = false,
id: Option[String] = None,
headerClasses: Seq[String] = Seq.empty,
- stripeRowsWithCss: Boolean = true): Seq[Node] = {
+ stripeRowsWithCss: Boolean = true,
+ sortable: Boolean = true): Seq[Node] = {
- val listingTableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED
+ val listingTableClass = {
+ val _tableClass = if (stripeRowsWithCss) TABLE_CLASS_STRIPED else TABLE_CLASS_NOT_STRIPED
+ if (sortable) {
+ _tableClass + " sortable"
+ } else {
+ _tableClass
+ }
+ }
val colWidth = 100.toDouble / headers.size
val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
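With this change the "sortable" CSS class moves out of TABLE_CLASS_NOT_STRIPED and becomes opt-out through the new parameter, presumably because client-side sorting would break the rowspan layout of the stream block table added below. A sketch of calling the extended helper from other `private[spark]` UI code; the headers and row type here are made up:

    import scala.xml.Node
    import org.apache.spark.ui.UIUtils

    val headers = Seq("Name", "Size")
    def makeRow(row: (String, Long)): Seq[Node] =
      <tr><td>{row._1}</td><td>{row._2.toString}</td></tr>

    // Same striped table as before, but without the "sortable" class.
    val table = UIUtils.listingTable(
      headers, makeRow, Seq(("block-a", 100L)), sortable = false)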
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
index 07db783c57..04f584621e 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage._
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
@@ -30,13 +30,25 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
private val listener = parent.listener
def render(request: HttpServletRequest): Seq[Node] = {
- val rdds = listener.rddInfoList
- val content = UIUtils.listingTable(rddHeader, rddRow, rdds, id = Some("storage-by-rdd-table"))
+ val content = rddTable(listener.rddInfoList) ++
+ receiverBlockTables(listener.allExecutorStreamBlockStatus.sortBy(_.executorId))
UIUtils.headerSparkPage("Storage", content, parent)
}
+ private[storage] def rddTable(rdds: Seq[RDDInfo]): Seq[Node] = {
+ if (rdds.isEmpty) {
+ // Don't show the rdd table if there is no RDD persisted.
+ Nil
+ } else {
+ <div>
+ <h4>RDDs</h4>
+ {UIUtils.listingTable(rddHeader, rddRow, rdds, id = Some("storage-by-rdd-table"))}
+ </div>
+ }
+ }
+
/** Header fields for the RDD table */
- private def rddHeader = Seq(
+ private val rddHeader = Seq(
"RDD Name",
"Storage Level",
"Cached Partitions",
@@ -56,7 +68,7 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
</td>
<td>{rdd.storageLevel.description}
</td>
- <td>{rdd.numCachedPartitions}</td>
+ <td>{rdd.numCachedPartitions.toString}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
<td sorttable_customkey={rdd.externalBlockStoreSize.toString}>{Utils.bytesToString(rdd.externalBlockStoreSize)}</td>
@@ -64,4 +76,130 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
</tr>
// scalastyle:on
}
+
+ private[storage] def receiverBlockTables(statuses: Seq[ExecutorStreamBlockStatus]): Seq[Node] = {
+ if (statuses.map(_.numStreamBlocks).sum == 0) {
+ // Don't show the tables if there is no stream block
+ Nil
+ } else {
+ val blocks = statuses.flatMap(_.blocks).groupBy(_.blockId).toSeq.sortBy(_._1.toString)
+
+ <div>
+ <h4>Receiver Blocks</h4>
+ {executorMetricsTable(statuses)}
+ {streamBlockTable(blocks)}
+ </div>
+ }
+ }
+
+ private def executorMetricsTable(statuses: Seq[ExecutorStreamBlockStatus]): Seq[Node] = {
+ <div>
+ <h5>Aggregated Block Metrics by Executor</h5>
+ {UIUtils.listingTable(executorMetricsTableHeader, executorMetricsTableRow, statuses,
+ id = Some("storage-by-executor-stream-blocks"))}
+ </div>
+ }
+
+ private val executorMetricsTableHeader = Seq(
+ "Executor ID",
+ "Address",
+ "Total Size in Memory",
+ "Total Size in ExternalBlockStore",
+ "Total Size on Disk",
+ "Stream Blocks")
+
+ private def executorMetricsTableRow(status: ExecutorStreamBlockStatus): Seq[Node] = {
+ <tr>
+ <td>
+ {status.executorId}
+ </td>
+ <td>
+ {status.location}
+ </td>
+ <td sorttable_customkey={status.totalMemSize.toString}>
+ {Utils.bytesToString(status.totalMemSize)}
+ </td>
+ <td sorttable_customkey={status.totalExternalBlockStoreSize.toString}>
+ {Utils.bytesToString(status.totalExternalBlockStoreSize)}
+ </td>
+ <td sorttable_customkey={status.totalDiskSize.toString}>
+ {Utils.bytesToString(status.totalDiskSize)}
+ </td>
+ <td>
+ {status.numStreamBlocks.toString}
+ </td>
+ </tr>
+ }
+
+ private def streamBlockTable(blocks: Seq[(BlockId, Seq[BlockUIData])]): Seq[Node] = {
+ if (blocks.isEmpty) {
+ Nil
+ } else {
+ <div>
+ <h5>Blocks</h5>
+ {UIUtils.listingTable(
+ streamBlockTableHeader,
+ streamBlockTableRow,
+ blocks,
+ id = Some("storage-by-block-table"),
+ sortable = false)}
+ </div>
+ }
+ }
+
+ private val streamBlockTableHeader = Seq(
+ "Block ID",
+ "Replication Level",
+ "Location",
+ "Storage Level",
+ "Size")
+
+ /** Render a stream block */
+ private def streamBlockTableRow(block: (BlockId, Seq[BlockUIData])): Seq[Node] = {
+ val replications = block._2
+ assert(replications.size > 0) // This must be true because it's the result of "groupBy"
+ if (replications.size == 1) {
+ streamBlockTableSubrow(block._1, replications.head, replications.size, true)
+ } else {
+ streamBlockTableSubrow(block._1, replications.head, replications.size, true) ++
+ replications.tail.map(streamBlockTableSubrow(block._1, _, replications.size, false)).flatten
+ }
+ }
+
+ private def streamBlockTableSubrow(
+ blockId: BlockId, block: BlockUIData, replication: Int, firstSubrow: Boolean): Seq[Node] = {
+ val (storageLevel, size) = streamBlockStorageLevelDescriptionAndSize(block)
+
+ <tr>
+ {
+ if (firstSubrow) {
+ <td rowspan={replication.toString}>
+ {block.blockId.toString}
+ </td>
+ <td rowspan={replication.toString}>
+ {replication.toString}
+ </td>
+ }
+ }
+ <td>{block.location}</td>
+ <td>{storageLevel}</td>
+ <td>{Utils.bytesToString(size)}</td>
+ </tr>
+ }
+
+ private[storage] def streamBlockStorageLevelDescriptionAndSize(
+ block: BlockUIData): (String, Long) = {
+ if (block.storageLevel.useDisk) {
+ ("Disk", block.diskSize)
+ } else if (block.storageLevel.useMemory && block.storageLevel.deserialized) {
+ ("Memory", block.memSize)
+ } else if (block.storageLevel.useMemory && !block.storageLevel.deserialized) {
+ ("Memory Serialized", block.memSize)
+ } else if (block.storageLevel.useOffHeap) {
+ ("External", block.externalBlockStoreSize)
+ } else {
+ throw new IllegalStateException(s"Invalid Storage Level: ${block.storageLevel}")
+ }
+ }
+
}
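`receiverBlockTables` hands `streamBlockTable` the result of `groupBy(_.blockId)`, which is what makes the rowspan rendering above safe: every group is non-empty, and its size is the replication level. A standalone sketch of that grouping with made-up data:

    // Replicas of one block on different executors collapse into one group.
    val replicas = Seq(
      ("input-0-0", "host1:10000"),
      ("input-0-0", "host2:10000"),
      ("input-1-1", "host1:10000"))

    val grouped = replicas.groupBy(_._1).toSeq.sortBy(_._1)

    // The first replica in a group renders the rowspan cells; the rest render
    // only the per-replica columns, exactly as streamBlockTableSubrow does.
    grouped.foreach { case (blockId, reps) =>
      println(s"$blockId spans ${reps.size} sub-row(s)")
    }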
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 0351749700..22e2993b3b 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -39,7 +39,8 @@ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storag
* This class is thread-safe (unlike JobProgressListener)
*/
@DeveloperApi
-class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
+class StorageListener(storageStatusListener: StorageStatusListener) extends BlockStatusListener {
+
private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
new file mode 100644
index 0000000000..d7ffde1e78
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockStatusListenerSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler._
+
+class BlockStatusListenerSuite extends SparkFunSuite {
+
+ test("basic functions") {
+ val blockManagerId = BlockManagerId("0", "localhost", 10000)
+ val listener = new BlockStatusListener()
+
+ // Add a block manager and a new block status
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, blockManagerId, 0))
+ listener.onBlockUpdated(SparkListenerBlockUpdated(
+ BlockUpdatedInfo(
+ blockManagerId,
+ StreamBlockId(0, 100),
+ StorageLevel.MEMORY_AND_DISK,
+ memSize = 100,
+ diskSize = 100,
+ externalBlockStoreSize = 0)))
+ // The new block status should be added to the listener
+ val expectedBlock = BlockUIData(
+ StreamBlockId(0, 100),
+ "localhost:10000",
+ StorageLevel.MEMORY_AND_DISK,
+ memSize = 100,
+ diskSize = 100,
+ externalBlockStoreSize = 0
+ )
+ val expectedExecutorStreamBlockStatus = Seq(
+ ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
+ )
+ assert(listener.allExecutorStreamBlockStatus === expectedExecutorStreamBlockStatus)
+
+ // Add the second block manager
+ val blockManagerId2 = BlockManagerId("1", "localhost", 10001)
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(0, blockManagerId2, 0))
+ // Add a new replication of the same block id from the second manager
+ listener.onBlockUpdated(SparkListenerBlockUpdated(
+ BlockUpdatedInfo(
+ blockManagerId2,
+ StreamBlockId(0, 100),
+ StorageLevel.MEMORY_AND_DISK,
+ memSize = 100,
+ diskSize = 100,
+ externalBlockStoreSize = 0)))
+ val expectedBlock2 = BlockUIData(
+ StreamBlockId(0, 100),
+ "localhost:10001",
+ StorageLevel.MEMORY_AND_DISK,
+ memSize = 100,
+ diskSize = 100,
+ externalBlockStoreSize = 0
+ )
+ // Each block manager should contain one block
+ val expectedExecutorStreamBlockStatus2 = Set(
+ ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)),
+ ExecutorStreamBlockStatus("1", "localhost:10001", Seq(expectedBlock2))
+ )
+ assert(listener.allExecutorStreamBlockStatus.toSet === expectedExecutorStreamBlockStatus2)
+
+ // Remove a replication of the same block
+ listener.onBlockUpdated(SparkListenerBlockUpdated(
+ BlockUpdatedInfo(
+ blockManagerId2,
+ StreamBlockId(0, 100),
+ StorageLevel.NONE, // StorageLevel.NONE means removing it
+ memSize = 0,
+ diskSize = 0,
+ externalBlockStoreSize = 0)))
+ // Only the first block manager contains a block
+ val expectedExecutorStreamBlockStatus3 = Set(
+ ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock)),
+ ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty)
+ )
+ assert(listener.allExecutorStreamBlockStatus.toSet === expectedExecutorStreamBlockStatus3)
+
+ // Remove the second block manager at first but add a new block status
+ // from this removed block manager
+ listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(0, blockManagerId2))
+ listener.onBlockUpdated(SparkListenerBlockUpdated(
+ BlockUpdatedInfo(
+ blockManagerId2,
+ StreamBlockId(0, 100),
+ StorageLevel.MEMORY_AND_DISK,
+ memSize = 100,
+ diskSize = 100,
+ externalBlockStoreSize = 0)))
+ // The second block manager is removed so we should not see the new block
+ val expectedExecutorStreamBlockStatus4 = Seq(
+ ExecutorStreamBlockStatus("0", "localhost:10000", Seq(expectedBlock))
+ )
+ assert(listener.allExecutorStreamBlockStatus === expectedExecutorStreamBlockStatus4)
+
+ // Remove the last block manager
+ listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(0, blockManagerId))
+ // No block managers are left, so there should be no blocks
+ assert(listener.allExecutorStreamBlockStatus.isEmpty)
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
new file mode 100644
index 0000000000..3dab15a9d4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StoragePageSuite.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import scala.xml.Utility
+
+import org.mockito.Mockito._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage._
+
+class StoragePageSuite extends SparkFunSuite {
+
+ val storageTab = mock(classOf[StorageTab])
+ when(storageTab.basePath).thenReturn("http://localhost:4040")
+ val storagePage = new StoragePage(storageTab)
+
+ test("rddTable") {
+ val rdd1 = new RDDInfo(1,
+ "rdd1",
+ 10,
+ StorageLevel.MEMORY_ONLY,
+ Seq.empty)
+ rdd1.memSize = 100
+ rdd1.numCachedPartitions = 10
+
+ val rdd2 = new RDDInfo(2,
+ "rdd2",
+ 10,
+ StorageLevel.DISK_ONLY,
+ Seq.empty)
+ rdd2.diskSize = 200
+ rdd2.numCachedPartitions = 5
+
+ val rdd3 = new RDDInfo(3,
+ "rdd3",
+ 10,
+ StorageLevel.MEMORY_AND_DISK_SER,
+ Seq.empty)
+ rdd3.memSize = 400
+ rdd3.diskSize = 500
+ rdd3.numCachedPartitions = 10
+
+ val xmlNodes = storagePage.rddTable(Seq(rdd1, rdd2, rdd3))
+
+ val headers = Seq(
+ "RDD Name",
+ "Storage Level",
+ "Cached Partitions",
+ "Fraction Cached",
+ "Size in Memory",
+ "Size in ExternalBlockStore",
+ "Size on Disk")
+ assert((xmlNodes \\ "th").map(_.text) === headers)
+
+ assert((xmlNodes \\ "tr").size === 3)
+ assert(((xmlNodes \\ "tr")(0) \\ "td").map(_.text.trim) ===
+ Seq("rdd1", "Memory Deserialized 1x Replicated", "10", "100%", "100.0 B", "0.0 B", "0.0 B"))
+ // Check the url
+ assert(((xmlNodes \\ "tr")(0) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
+ Some("http://localhost:4040/storage/rdd?id=1"))
+
+ assert(((xmlNodes \\ "tr")(1) \\ "td").map(_.text.trim) ===
+ Seq("rdd2", "Disk Serialized 1x Replicated", "5", "50%", "0.0 B", "0.0 B", "200.0 B"))
+ // Check the url
+ assert(((xmlNodes \\ "tr")(1) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
+ Some("http://localhost:4040/storage/rdd?id=2"))
+
+ assert(((xmlNodes \\ "tr")(2) \\ "td").map(_.text.trim) ===
+ Seq("rdd3", "Disk Memory Serialized 1x Replicated", "10", "100%", "400.0 B", "0.0 B",
+ "500.0 B"))
+ // Check the url
+ assert(((xmlNodes \\ "tr")(2) \\ "td" \ "a")(0).attribute("href").map(_.text) ===
+ Some("http://localhost:4040/storage/rdd?id=3"))
+ }
+
+ test("empty rddTable") {
+ assert(storagePage.rddTable(Seq.empty).isEmpty)
+ }
+
+ test("streamBlockStorageLevelDescriptionAndSize") {
+ val memoryBlock = BlockUIData(StreamBlockId(0, 0),
+ "localhost:1111",
+ StorageLevel.MEMORY_ONLY,
+ memSize = 100,
+ diskSize = 0,
+ externalBlockStoreSize = 0)
+ assert(("Memory", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(memoryBlock))
+
+ val memorySerializedBlock = BlockUIData(StreamBlockId(0, 0),
+ "localhost:1111",
+ StorageLevel.MEMORY_ONLY_SER,
+ memSize = 100,
+ diskSize = 0,
+ externalBlockStoreSize = 0)
+ assert(("Memory Serialized", 100) ===
+ storagePage.streamBlockStorageLevelDescriptionAndSize(memorySerializedBlock))
+
+ val diskBlock = BlockUIData(StreamBlockId(0, 0),
+ "localhost:1111",
+ StorageLevel.DISK_ONLY,
+ memSize = 0,
+ diskSize = 100,
+ externalBlockStoreSize = 0)
+ assert(("Disk", 100) === storagePage.streamBlockStorageLevelDescriptionAndSize(diskBlock))
+
+ val externalBlock = BlockUIData(StreamBlockId(0, 0),
+ "localhost:1111",
+ StorageLevel.OFF_HEAP,
+ memSize = 0,
+ diskSize = 0,
+ externalBlockStoreSize = 100)
+ assert(("External", 100) ===
+ storagePage.streamBlockStorageLevelDescriptionAndSize(externalBlock))
+ }
+
+ test("receiverBlockTables") {
+ val blocksForExecutor0 = Seq(
+ BlockUIData(StreamBlockId(0, 0),
+ "localhost:10000",
+ StorageLevel.MEMORY_ONLY,
+ memSize = 100,
+ diskSize = 0,
+ externalBlockStoreSize = 0),
+ BlockUIData(StreamBlockId(1, 1),
+ "localhost:10000",
+ StorageLevel.DISK_ONLY,
+ memSize = 0,
+ diskSize = 100,
+ externalBlockStoreSize = 0)
+ )
+ val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", blocksForExecutor0)
+
+ val blocksForExecutor1 = Seq(
+ BlockUIData(StreamBlockId(0, 0),
+ "localhost:10001",
+ StorageLevel.MEMORY_ONLY,
+ memSize = 100,
+ diskSize = 0,
+ externalBlockStoreSize = 0),
+ BlockUIData(StreamBlockId(2, 2),
+ "localhost:10001",
+ StorageLevel.OFF_HEAP,
+ memSize = 0,
+ diskSize = 0,
+ externalBlockStoreSize = 200),
+ BlockUIData(StreamBlockId(1, 1),
+ "localhost:10001",
+ StorageLevel.MEMORY_ONLY_SER,
+ memSize = 100,
+ diskSize = 0,
+ externalBlockStoreSize = 0)
+ )
+ val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", blocksForExecutor1)
+ val xmlNodes = storagePage.receiverBlockTables(Seq(executor0, executor1))
+
+ val executorTable = (xmlNodes \\ "table")(0)
+ val executorHeaders = Seq(
+ "Executor ID",
+ "Address",
+ "Total Size in Memory",
+ "Total Size in ExternalBlockStore",
+ "Total Size on Disk",
+ "Stream Blocks")
+ assert((executorTable \\ "th").map(_.text) === executorHeaders)
+
+ assert((executorTable \\ "tr").size === 2)
+ assert(((executorTable \\ "tr")(0) \\ "td").map(_.text.trim) ===
+ Seq("0", "localhost:10000", "100.0 B", "0.0 B", "100.0 B", "2"))
+ assert(((executorTable \\ "tr")(1) \\ "td").map(_.text.trim) ===
+ Seq("1", "localhost:10001", "200.0 B", "200.0 B", "0.0 B", "3"))
+
+ val blockTable = (xmlNodes \\ "table")(1)
+ val blockHeaders = Seq(
+ "Block ID",
+ "Replication Level",
+ "Location",
+ "Storage Level",
+ "Size")
+ assert((blockTable \\ "th").map(_.text) === blockHeaders)
+
+ assert((blockTable \\ "tr").size === 5)
+ assert(((blockTable \\ "tr")(0) \\ "td").map(_.text.trim) ===
+ Seq("input-0-0", "2", "localhost:10000", "Memory", "100.0 B"))
+ // Check "rowspan=2" for the first 2 columns
+ assert(((blockTable \\ "tr")(0) \\ "td")(0).attribute("rowspan").map(_.text) === Some("2"))
+ assert(((blockTable \\ "tr")(0) \\ "td")(1).attribute("rowspan").map(_.text) === Some("2"))
+
+ assert(((blockTable \\ "tr")(1) \\ "td").map(_.text.trim) ===
+ Seq("localhost:10001", "Memory", "100.0 B"))
+
+ assert(((blockTable \\ "tr")(2) \\ "td").map(_.text.trim) ===
+ Seq("input-1-1", "2", "localhost:10000", "Disk", "100.0 B"))
+ // Check "rowspan=2" for the first 2 columns
+ assert(((blockTable \\ "tr")(2) \\ "td")(0).attribute("rowspan").map(_.text) === Some("2"))
+ assert(((blockTable \\ "tr")(2) \\ "td")(1).attribute("rowspan").map(_.text) === Some("2"))
+
+ assert(((blockTable \\ "tr")(3) \\ "td").map(_.text.trim) ===
+ Seq("localhost:10001", "Memory Serialized", "100.0 B"))
+
+ assert(((blockTable \\ "tr")(4) \\ "td").map(_.text.trim) ===
+ Seq("input-2-2", "1", "localhost:10001", "External", "200.0 B"))
+ // Check "rowspan=1" for the first 2 columns
+ assert(((blockTable \\ "tr")(4) \\ "td")(0).attribute("rowspan").map(_.text) === Some("1"))
+ assert(((blockTable \\ "tr")(4) \\ "td")(1).attribute("rowspan").map(_.text) === Some("1"))
+ }
+
+ test("empty receiverBlockTables") {
+ assert(storagePage.receiverBlockTables(Seq.empty).isEmpty)
+
+ val executor0 = ExecutorStreamBlockStatus("0", "localhost:10000", Seq.empty)
+ val executor1 = ExecutorStreamBlockStatus("1", "localhost:10001", Seq.empty)
+ assert(storagePage.receiverBlockTables(Seq(executor0, executor1)).isEmpty)
+ }
+}