Diffstat (limited to 'core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala  | 132
1 file changed, 132 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
new file mode 100644
index 0000000000..43c1257677
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.storage.{StorageStatus, StorageUtils}
+import org.apache.spark.storage.BlockManagerMasterActor.BlockStatus
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui.Page._
+import org.apache.spark.util.Utils
+
+
+/** Page showing storage details for a given RDD */
+private[spark] class RDDPage(parent: BlockManagerUI) {
+ val sc = parent.sc
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val id = request.getParameter("id")
+ val prefix = "rdd_" + id.toString
+ val storageStatusList = sc.getExecutorStorageStatus
+ val filteredStorageStatusList = StorageUtils.
+ filterStorageStatusByPrefix(storageStatusList, prefix)
+ val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
+
+ val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage")
+ val workers = filteredStorageStatusList.map((prefix, _))
+ val workerTable = listingTable(workerHeaders, workerRow, workers)
+
+ val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk",
+ "Executors")
+
+ val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1)
+ val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
+ val blocks = blockStatuses.map {
+ case(id, status) => (id, status, blockLocations.get(id).getOrElse(Seq("UNKNOWN")))
+ }
+ val blockTable = listingTable(blockHeaders, blockRow, blocks)
+
+ val content =
+ <div class="row-fluid">
+ <div class="span12">
+ <ul class="unstyled">
+ <li>
+ <strong>Storage Level:</strong>
+ {rddInfo.storageLevel.description}
+ </li>
+ <li>
+ <strong>Cached Partitions:</strong>
+ {rddInfo.numCachedPartitions}
+ </li>
+ <li>
+ <strong>Total Partitions:</strong>
+ {rddInfo.numPartitions}
+ </li>
+ <li>
+ <strong>Memory Size:</strong>
+ {Utils.bytesToString(rddInfo.memSize)}
+ </li>
+ <li>
+ <strong>Disk Size:</strong>
+ {Utils.bytesToString(rddInfo.diskSize)}
+ </li>
+ </ul>
+ </div>
+ </div>
+
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Data Distribution on {workers.size} Executors </h4>
+ {workerTable}
+ </div>
+ </div>
+
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> {blocks.size} Partitions </h4>
+ {blockTable}
+ </div>
+ </div>;
+
+ headerSparkPage(content, parent.sc, "RDD Storage Info for " + rddInfo.name, Storage)
+ }
+
+ def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
+ val (id, block, locations) = row
+ <tr>
+ <td>{id}</td>
+ <td>
+ {block.storageLevel.description}
+ </td>
+ <td sorttable_customkey={block.memSize.toString}>
+ {Utils.bytesToString(block.memSize)}
+ </td>
+ <td sorttable_customkey={block.diskSize.toString}>
+ {Utils.bytesToString(block.diskSize)}
+ </td>
+ <td>
+ {locations.map(l => <span>{l}<br/></span>)}
+ </td>
+ </tr>
+ }
+
+ def workerRow(worker: (String, StorageStatus)): Seq[Node] = {
+ val (prefix, status) = worker
+ <tr>
+ <td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
+ <td>
+ {Utils.bytesToString(status.memUsed(prefix))}
+ ({Utils.bytesToString(status.memRemaining)} Remaining)
+ </td>
+ <td>{Utils.bytesToString(status.diskUsed(prefix))}</td>
+ </tr>
+ }
+}
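
For context, RDDPage.render delegates all table rendering to UIUtils.listingTable, passing a header row, a row-rendering function (blockRow or workerRow above), and the sequence of rows to render. The standalone sketch below mirrors that headers / row-renderer / rows pattern outside of Spark; ListingTableSketch, the Worker case class, and the table markup are illustrative assumptions for this sketch, not Spark's actual UIUtils implementation.

import scala.xml.Node

// Standalone sketch (assumed names; not Spark's UIUtils) of the
// headers + row-renderer + rows pattern that RDDPage.render relies on.
// Requires the Scala XML literal support (scala-xml on the classpath).
object ListingTableSketch {

  // Build an HTML table: one <th> per header, one <tr> per data element,
  // produced by the supplied row-rendering function.
  def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
    <table class="table table-bordered table-striped">
      <thead>{headers.map(h => <th>{h}</th>)}</thead>
      <tbody>{rows.flatMap(makeRow)}</tbody>
    </table>
  }

  // Hypothetical row type and renderer, analogous to workerRow in the diff above.
  case class Worker(host: String, memUsedBytes: Long, diskUsedBytes: Long)

  def workerRow(w: Worker): Seq[Node] = {
    <tr>
      <td>{w.host}</td>
      <td>{w.memUsedBytes}</td>
      <td>{w.diskUsedBytes}</td>
    </tr>
  }

  def main(args: Array[String]): Unit = {
    // Example data; hosts and sizes are made up for illustration.
    val workers = Seq(
      Worker("host-a:46121", 1048576L, 0L),
      Worker("host-b:46122", 2097152L, 524288L))
    println(listingTable(Seq("Host", "Memory Usage", "Disk Usage"), workerRow, workers))
  }
}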