aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCarson Wang <carson.wang@intel.com>2015-08-04 22:12:30 +0900
committerKousuke Saruta <sarutak@oss.nttdata.co.jp>2015-08-04 22:12:30 +0900
commitcb7fa0aa93dae5a25a8e7e387dbd6b55a5a23fb0 (patch)
tree36643eef49ddfb93475e964d5ed30ef5182454b8
parentb211cbc7369af5eb2cb65d93c4c57c4db7143f47 (diff)
downloadspark-cb7fa0aa93dae5a25a8e7e387dbd6b55a5a23fb0.tar.gz
spark-cb7fa0aa93dae5a25a8e7e387dbd6b55a5a23fb0.tar.bz2
spark-cb7fa0aa93dae5a25a8e7e387dbd6b55a5a23fb0.zip
[SPARK-2016] [WEBUI] RDD partition table pagination for the RDD Page
Add pagination for the RDD page to avoid unresponsive UI when the number of the RDD partitions is large. Before: ![rddpagebefore](https://cloud.githubusercontent.com/assets/9278199/8951533/3d9add54-3601-11e5-99d0-5653b473c49b.png) After: ![rddpageafter](https://cloud.githubusercontent.com/assets/9278199/8951536/439d66e0-3601-11e5-9cee-1b380fe6620d.png) Author: Carson Wang <carson.wang@intel.com> Closes #7692 from carsonwang/SPARK-2016 and squashes the following commits: 03c7168 [Carson Wang] Fix style issues 612c18c [Carson Wang] RDD partition table pagination for the RDD Page
-rw-r--r--core/src/main/scala/org/apache/spark/ui/PagedTable.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala228
3 files changed, 209 insertions, 45 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 17d7b39c2d..6e2375477a 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -159,9 +159,9 @@ private[ui] trait PagedTable[T] {
// "goButtonJsFuncName"
val formJs =
s"""$$(function(){
- | $$( "#form-task-page" ).submit(function(event) {
- | var page = $$("#form-task-page-no").val()
- | var pageSize = $$("#form-task-page-size").val()
+ | $$( "#form-$tableId-page" ).submit(function(event) {
+ | var page = $$("#form-$tableId-page-no").val()
+ | var pageSize = $$("#form-$tableId-page-size").val()
| pageSize = pageSize ? pageSize: 100;
| if (page != "") {
| ${goButtonJsFuncName}(page, pageSize);
@@ -173,12 +173,14 @@ private[ui] trait PagedTable[T] {
<div>
<div>
- <form id="form-task-page" class="form-inline pull-right" style="margin-bottom: 0px;">
+ <form id={s"form-$tableId-page"}
+ class="form-inline pull-right" style="margin-bottom: 0px;">
<label>{totalPages} Pages. Jump to</label>
- <input type="text" id="form-task-page-no" value={page.toString} class="span1" />
+ <input type="text" id={s"form-$tableId-page-no"} value={page.toString} class="span1" />
<label>. Show </label>
- <input type="text" id="form-task-page-size" value={pageSize.toString} class="span1" />
- <label>tasks in a page.</label>
+ <input type="text"
+ id={s"form-$tableId-page-size"} value={pageSize.toString} class="span1" />
+ <label>items in a page.</label>
<button type="submit" class="btn">Go</button>
</form>
</div>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 3954c3d1ef..0c94204df6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -988,8 +988,7 @@ private[ui] class TaskDataSource(
shuffleRead,
shuffleWrite,
bytesSpilled,
- errorMessage.getOrElse("")
- )
+ errorMessage.getOrElse(""))
}
/**
@@ -1197,7 +1196,7 @@ private[ui] class TaskPagedTable(
private val displayPeakExecutionMemory =
conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
- override def tableId: String = ""
+ override def tableId: String = "task-table"
override def tableCssClass: String = "table table-bordered table-condensed table-striped"
@@ -1212,8 +1211,7 @@ private[ui] class TaskPagedTable(
currentTime,
pageSize,
sortColumn,
- desc
- )
+ desc)
override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
@@ -1277,7 +1275,7 @@ private[ui] class TaskPagedTable(
Seq(("Errors", ""))
if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
- new IllegalArgumentException(s"Unknown column: $sortColumn")
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
}
val headerRow: Seq[Node] = {
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 36943978ff..fd6cc3ed75 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -17,12 +17,13 @@
package org.apache.spark.ui.storage
+import java.net.URLEncoder
import javax.servlet.http.HttpServletRequest
-import scala.xml.Node
+import scala.xml.{Node, Unparsed}
import org.apache.spark.status.api.v1.{AllRDDResource, RDDDataDistribution, RDDPartitionInfo}
-import org.apache.spark.ui.{UIUtils, WebUIPage}
+import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
/** Page showing storage details for a given RDD */
@@ -32,6 +33,17 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
def render(request: HttpServletRequest): Seq[Node] = {
val parameterId = request.getParameter("id")
require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
+
+ val parameterBlockPage = request.getParameter("block.page")
+ val parameterBlockSortColumn = request.getParameter("block.sort")
+ val parameterBlockSortDesc = request.getParameter("block.desc")
+ val parameterBlockPageSize = request.getParameter("block.pageSize")
+
+ val blockPage = Option(parameterBlockPage).map(_.toInt).getOrElse(1)
+ val blockSortColumn = Option(parameterBlockSortColumn).getOrElse("Block Name")
+ val blockSortDesc = Option(parameterBlockSortDesc).map(_.toBoolean).getOrElse(false)
+ val blockPageSize = Option(parameterBlockPageSize).map(_.toInt).getOrElse(100)
+
val rddId = parameterId.toInt
val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true)
.getOrElse {
@@ -44,8 +56,34 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
rddStorageInfo.dataDistribution.get, id = Some("rdd-storage-by-worker-table"))
// Block table
- val blockTable = UIUtils.listingTable(blockHeader, blockRow, rddStorageInfo.partitions.get,
- id = Some("rdd-storage-by-block-table"))
+ val (blockTable, blockTableHTML) = try {
+ val _blockTable = new BlockPagedTable(
+ UIUtils.prependBaseUri(parent.basePath) + s"/storage/rdd/?id=${rddId}",
+ rddStorageInfo.partitions.get,
+ blockPageSize,
+ blockSortColumn,
+ blockSortDesc)
+ (_blockTable, _blockTable.table(blockPage))
+ } catch {
+ case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+ (null, <div class="alert alert-error">{e.getMessage}</div>)
+ }
+
+ val jsForScrollingDownToBlockTable =
+ <script>
+ {
+ Unparsed {
+ """
+ |$(function() {
+ | if (/.*&block.sort=.*$/.test(location.search)) {
+ | var topOffset = $("#blocks-section").offset().top;
+ | $("html,body").animate({scrollTop: topOffset}, 200);
+ | }
+ |});
+ """.stripMargin
+ }
+ }
+ </script>
val content =
<div class="row-fluid">
@@ -85,11 +123,11 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
</div>
</div>
- <div class="row-fluid">
- <div class="span12">
- <h4> {rddStorageInfo.partitions.map(_.size).getOrElse(0)} Partitions </h4>
- {blockTable}
- </div>
+ <div>
+ <h4 id="blocks-section">
+ {rddStorageInfo.partitions.map(_.size).getOrElse(0)} Partitions
+ </h4>
+ {blockTableHTML ++ jsForScrollingDownToBlockTable}
</div>;
UIUtils.headerSparkPage("RDD Storage Info for " + rddStorageInfo.name, content, parent)
@@ -101,14 +139,6 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
"Memory Usage",
"Disk Usage")
- /** Header fields for the block table */
- private def blockHeader = Seq(
- "Block Name",
- "Storage Level",
- "Size in Memory",
- "Size on Disk",
- "Executors")
-
/** Render an HTML row representing a worker */
private def workerRow(worker: RDDDataDistribution): Seq[Node] = {
<tr>
@@ -120,23 +150,157 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
<td>{Utils.bytesToString(worker.diskUsed)}</td>
</tr>
}
+}
+
+private[ui] case class BlockTableRowData(
+ blockName: String,
+ storageLevel: String,
+ memoryUsed: Long,
+ diskUsed: Long,
+ executors: String)
+
+private[ui] class BlockDataSource(
+ rddPartitions: Seq[RDDPartitionInfo],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[BlockTableRowData](pageSize) {
+
+ private val data = rddPartitions.map(blockRow).sorted(ordering(sortColumn, desc))
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[BlockTableRowData] = {
+ data.slice(from, to)
+ }
+
+ private def blockRow(rddPartition: RDDPartitionInfo): BlockTableRowData = {
+ BlockTableRowData(
+ rddPartition.blockName,
+ rddPartition.storageLevel,
+ rddPartition.memoryUsed,
+ rddPartition.diskUsed,
+ rddPartition.executors.mkString(" "))
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[BlockTableRowData] = {
+ val ordering = sortColumn match {
+ case "Block Name" => new Ordering[BlockTableRowData] {
+ override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+ Ordering.String.compare(x.blockName, y.blockName)
+ }
+ case "Storage Level" => new Ordering[BlockTableRowData] {
+ override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+ Ordering.String.compare(x.storageLevel, y.storageLevel)
+ }
+ case "Size in Memory" => new Ordering[BlockTableRowData] {
+ override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+ Ordering.Long.compare(x.memoryUsed, y.memoryUsed)
+ }
+ case "Size on Disk" => new Ordering[BlockTableRowData] {
+ override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+ Ordering.Long.compare(x.diskUsed, y.diskUsed)
+ }
+ case "Executors" => new Ordering[BlockTableRowData] {
+ override def compare(x: BlockTableRowData, y: BlockTableRowData): Int =
+ Ordering.String.compare(x.executors, y.executors)
+ }
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+}
+
+private[ui] class BlockPagedTable(
+ basePath: String,
+ rddPartitions: Seq[RDDPartitionInfo],
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedTable[BlockTableRowData] {
+
+ override def tableId: String = "rdd-storage-by-block-table"
+
+ override def tableCssClass: String = "table table-bordered table-condensed table-striped"
+
+ override val dataSource: BlockDataSource = new BlockDataSource(
+ rddPartitions,
+ pageSize,
+ sortColumn,
+ desc)
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"${basePath}&block.page=$page&block.sort=${encodedSortColumn}&block.desc=${desc}" +
+ s"&block.pageSize=${pageSize}"
+ }
+
+ override def goButtonJavascriptFunction: (String, String) = {
+ val jsFuncName = "goToBlockPage"
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ val jsFunc = s"""
+ |currentBlockPageSize = ${pageSize}
+ |function goToBlockPage(page, pageSize) {
+ | // Set page to 1 if the page size changes
+ | page = pageSize == currentBlockPageSize ? page : 1;
+ | var url = "${basePath}&block.sort=${encodedSortColumn}&block.desc=${desc}" +
+ | "&block.page=" + page + "&block.pageSize=" + pageSize;
+ | window.location.href = url;
+ |}
+ """.stripMargin
+ (jsFuncName, jsFunc)
+ }
- /** Render an HTML row representing a block */
- private def blockRow(row: RDDPartitionInfo): Seq[Node] = {
+ override def headers: Seq[Node] = {
+ val blockHeaders = Seq(
+ "Block Name",
+ "Storage Level",
+ "Size in Memory",
+ "Size on Disk",
+ "Executors")
+
+ if (!blockHeaders.contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+ }
+
+ val headerRow: Seq[Node] = {
+ blockHeaders.map { header =>
+ if (header == sortColumn) {
+ val headerLink =
+ s"$basePath&block.sort=${URLEncoder.encode(header, "UTF-8")}&block.desc=${!desc}" +
+ s"&block.pageSize=${pageSize}"
+ val js = Unparsed(s"window.location.href='${headerLink}'")
+ val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
+ <th onclick={js} style="cursor: pointer;">
+ {header}
+ <span>&nbsp;{Unparsed(arrow)}</span>
+ </th>
+ } else {
+ val headerLink =
+ s"$basePath&block.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&block.pageSize=${pageSize}"
+ val js = Unparsed(s"window.location.href='${headerLink}'")
+ <th onclick={js} style="cursor: pointer;">
+ {header}
+ </th>
+ }
+ }
+ }
+ <thead>{headerRow}</thead>
+ }
+
+ override def row(block: BlockTableRowData): Seq[Node] = {
<tr>
- <td>{row.blockName}</td>
- <td>
- {row.storageLevel}
- </td>
- <td sorttable_customkey={row.memoryUsed.toString}>
- {Utils.bytesToString(row.memoryUsed)}
- </td>
- <td sorttable_customkey={row.diskUsed.toString}>
- {Utils.bytesToString(row.diskUsed)}
- </td>
- <td>
- {row.executors.map(l => <span>{l}<br/></span>)}
- </td>
+ <td>{block.blockName}</td>
+ <td>{block.storageLevel}</td>
+ <td>{Utils.bytesToString(block.memoryUsed)}</td>
+ <td>{Utils.bytesToString(block.diskUsed)}</td>
+ <td>{block.executors}</td>
</tr>
}
}