aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTao Lin <nblintao@gmail.com>2016-07-25 17:35:50 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-07-25 17:35:50 -0700
commitdb36e1e75d69d63b76312e85ae3a6c95cebbe65e (patch)
treef984bf6e395fc5d028c25cc0b3b64f10ea40f2c7 /core
parentc979c8bba02bc89cb9ad81b212f085a8a5490a07 (diff)
downloadspark-db36e1e75d69d63b76312e85ae3a6c95cebbe65e.tar.gz
spark-db36e1e75d69d63b76312e85ae3a6c95cebbe65e.tar.bz2
spark-db36e1e75d69d63b76312e85ae3a6c95cebbe65e.zip
[SPARK-15590][WEBUI] Paginate Job Table in Jobs tab
## What changes were proposed in this pull request? This patch adds pagination support for the Job Tables in the Jobs tab. Pagination is provided for all of the three Job Tables (active, completed, and failed). Interactions (jumping, sorting, and setting page size) for paged tables are also included. The diff didn't keep track of some lines based on the original ones. The function `makeRow`of the original `AllJobsPage.scala` is reused. They are separated at the beginning of the function `jobRow` (L427-439) and the function `row`(L594-618) in the new `AllJobsPage.scala`. ## How was this patch tested? Tested manually by using checking the Web UI after completing and failing hundreds of jobs. Generate completed jobs by: ```scala val d = sc.parallelize(Array(1,2,3,4,5)) for(i <- 1 to 255){ var b = d.collect() } ``` Generate failed jobs by calling the following code multiple times: ```scala var b = d.map(_/0).collect() ``` Interactions like jumping, sorting, and setting page size are all tested. This shows the pagination for completed jobs: ![paginate success jobs](https://cloud.githubusercontent.com/assets/5558370/15986498/efa12ef6-303b-11e6-8b1d-c3382aeb9ad0.png) This shows the sorting works in job tables: ![sorting](https://cloud.githubusercontent.com/assets/5558370/15986539/98c8a81a-303c-11e6-86f2-8d2bc7924ee9.png) This shows the pagination for failed jobs and the effect of jumping and setting page size: ![paginate failed jobs](https://cloud.githubusercontent.com/assets/5558370/15986556/d8c1323e-303c-11e6-8e4b-7bdb030ea42b.png) Author: Tao Lin <nblintao@gmail.com> Closes #13620 from nblintao/dev.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala369
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala5
2 files changed, 312 insertions, 62 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 035d70601c..e5363ce8ca 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -17,17 +17,21 @@
package org.apache.spark.ui.jobs
+import java.net.URLEncoder
import java.util.Date
import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.ui._
+import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
+import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished jobs */
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
@@ -210,64 +214,69 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
</script>
}
- private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
+ private def jobsTable(
+ request: HttpServletRequest,
+ jobTag: String,
+ jobs: Seq[JobUIData]): Seq[Node] = {
+ val allParameters = request.getParameterMap.asScala.toMap
+ val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
+ .map(para => para._1 + "=" + para._2(0))
+
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+ val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
- val columns: Seq[Node] = {
- <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
- <th>Description</th>
- <th>Submitted</th>
- <th>Duration</th>
- <th class="sorttable_nosort">Stages: Succeeded/Total</th>
- <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
- }
+ val parameterJobPage = request.getParameter(jobTag + ".page")
+ val parameterJobSortColumn = request.getParameter(jobTag + ".sort")
+ val parameterJobSortDesc = request.getParameter(jobTag + ".desc")
+ val parameterJobPageSize = request.getParameter(jobTag + ".pageSize")
+ val parameterJobPrevPageSize = request.getParameter(jobTag + ".prevPageSize")
- def makeRow(job: JobUIData): Seq[Node] = {
- val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(job)
- val duration: Option[Long] = {
- job.submissionTime.map { start =>
- val end = job.completionTime.getOrElse(System.currentTimeMillis())
- end - start
- }
+ val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
+ val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse(jobIdTitle)
+ val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
+ // New jobs should be shown above old jobs by default.
+ if (jobSortColumn == jobIdTitle) true else false
+ )
+ val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
+ val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
+
+ val page: Int = {
+ // If the user has changed to a larger page size, then go to page 1 in order to avoid
+ // IndexOutOfBoundsException.
+ if (jobPageSize <= jobPrevPageSize) {
+ jobPage
+ } else {
+ 1
}
- val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
- val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
- val basePathUri = UIUtils.prependBaseUri(parent.basePath)
- val jobDescription =
- UIUtils.makeDescription(lastStageDescription, basePathUri, plainText = false)
-
- val detailUrl = "%s/jobs/job?id=%s".format(basePathUri, job.jobId)
- <tr id={"job-" + job.jobId}>
- <td sorttable_customkey={job.jobId.toString}>
- {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
- </td>
- <td>
- {jobDescription}
- <a href={detailUrl} class="name-link">{lastStageName}</a>
- </td>
- <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
- {formattedSubmissionTime}
- </td>
- <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
- <td class="stage-progress-cell">
- {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
- {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
- {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
- </td>
- <td class="progress-cell">
- {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
- failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks,
- total = job.numTasks - job.numSkippedTasks)}
- </td>
- </tr>
}
+ val currentTime = System.currentTimeMillis()
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>{columns}</thead>
- <tbody>
- {jobs.map(makeRow)}
- </tbody>
- </table>
+ try {
+ new JobPagedTable(
+ jobs,
+ jobTag,
+ UIUtils.prependBaseUri(parent.basePath),
+ "jobs", // subPath
+ parameterOtherTable,
+ parent.jobProgresslistener.stageIdToInfo,
+ parent.jobProgresslistener.stageIdToData,
+ currentTime,
+ jobIdTitle,
+ pageSize = jobPageSize,
+ sortColumn = jobSortColumn,
+ desc = jobSortDesc
+ ).table(page)
+ } catch {
+ case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering job table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
}
def render(request: HttpServletRequest): Seq[Node] = {
@@ -279,12 +288,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val completedJobs = listener.completedJobs.reverse.toSeq
val failedJobs = listener.failedJobs.reverse.toSeq
- val activeJobsTable =
- jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
- val completedJobsTable =
- jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
- val failedJobsTable =
- jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
+ val activeJobsTable = jobsTable(request, "activeJob", activeJobs)
+ val completedJobsTable = jobsTable(request, "completedJob", completedJobs)
+ val failedJobsTable = jobsTable(request, "failedJob", failedJobs)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
@@ -369,3 +375,246 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
}
}
+
+private[ui] class JobTableRowData(
+ val jobData: JobUIData,
+ val lastStageName: String,
+ val lastStageDescription: String,
+ val duration: Long,
+ val formattedDuration: String,
+ val submissionTime: Long,
+ val formattedSubmissionTime: String,
+ val jobDescription: NodeSeq,
+ val detailUrl: String)
+
+private[ui] class JobDataSource(
+ jobs: Seq[JobUIData],
+ stageIdToInfo: HashMap[Int, StageInfo],
+ stageIdToData: HashMap[(Int, Int), StageUIData],
+ basePath: String,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
+
+ // Convert JobUIData to JobTableRowData which contains the final contents to show in the table
+ // so that we can avoid creating duplicate contents during sorting the data
+ private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
+
+ private var _slicedJobIds: Set[Int] = null
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = {
+ val r = data.slice(from, to)
+ _slicedJobIds = r.map(_.jobData.jobId).toSet
+ r
+ }
+
+ private def getLastStageNameAndDescription(job: JobUIData): (String, String) = {
+ val lastStageInfo = Option(job.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => stageIdToInfo.get(ids.max)}
+ val lastStageData = lastStageInfo.flatMap { s =>
+ stageIdToData.get((s.stageId, s.attemptId))
+ }
+ val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val description = lastStageData.flatMap(_.description).getOrElse("")
+ (name, description)
+ }
+
+ private def jobRow(jobData: JobUIData): JobTableRowData = {
+ val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(jobData)
+ val duration: Option[Long] = {
+ jobData.submissionTime.map { start =>
+ val end = jobData.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ }
+ val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
+ val submissionTime = jobData.submissionTime
+ val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
+ val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
+
+ val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
+
+ new JobTableRowData (
+ jobData,
+ lastStageName,
+ lastStageDescription,
+ duration.getOrElse(-1),
+ formattedDuration,
+ submissionTime.getOrElse(-1),
+ formattedSubmissionTime,
+ jobDescription,
+ detailUrl
+ )
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[JobTableRowData] = {
+ val ordering = sortColumn match {
+ case "Job Id" | "Job Id (Job Group)" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId)
+ }
+ case "Description" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.String.compare(x.lastStageDescription, y.lastStageDescription)
+ }
+ case "Submitted" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Long.compare(x.submissionTime, y.submissionTime)
+ }
+ case "Duration" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Long.compare(x.duration, y.duration)
+ }
+ case "Stages: Succeeded/Total" | "Tasks (for all stages): Succeeded/Total" =>
+ throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+
+}
+private[ui] class JobPagedTable(
+ data: Seq[JobUIData],
+ jobTag: String,
+ basePath: String,
+ subPath: String,
+ parameterOtherTable: Iterable[String],
+ stageIdToInfo: HashMap[Int, StageInfo],
+ stageIdToData: HashMap[(Int, Int), StageUIData],
+ currentTime: Long,
+ jobIdTitle: String,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean
+ ) extends PagedTable[JobTableRowData] {
+ val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
+ parameterOtherTable.mkString("&")
+
+ override def tableId: String = jobTag + "-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped table-head-clickable"
+
+ override def pageSizeFormField: String = jobTag + ".pageSize"
+
+ override def prevPageSizeFormField: String = jobTag + ".prevPageSize"
+
+ override def pageNumberFormField: String = jobTag + ".page"
+
+ override val dataSource = new JobDataSource(
+ data,
+ stageIdToInfo,
+ stageIdToData,
+ basePath,
+ currentTime,
+ pageSize,
+ sortColumn,
+ desc)
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$jobTag.sort=$encodedSortColumn" +
+ s"&$jobTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize"
+ }
+
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc"
+ }
+
+ override def headers: Seq[Node] = {
+ // Information for each header: title, cssClass, and sortable
+ val jobHeadersAndCssClasses: Seq[(String, String, Boolean)] =
+ Seq(
+ (jobIdTitle, "", true),
+ ("Description", "", true), ("Submitted", "", true), ("Duration", "", true),
+ ("Stages: Succeeded/Total", "", false),
+ ("Tasks (for all stages): Succeeded/Total", "", false)
+ )
+
+ if (!jobHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+ }
+
+ val headerRow: Seq[Node] = {
+ jobHeadersAndCssClasses.map { case (header, cssClass, sortable) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$jobTag.desc=${!desc}" +
+ s"&$jobTag.pageSize=$pageSize")
+ val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
+
+ <th class={cssClass}>
+ <a href={headerLink}>
+ {header}<span>
+ &nbsp;{Unparsed(arrow)}
+ </span>
+ </a>
+ </th>
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$jobTag.pageSize=$pageSize")
+
+ <th class={cssClass}>
+ <a href={headerLink}>
+ {header}
+ </a>
+ </th>
+ } else {
+ <th class={cssClass}>
+ {header}
+ </th>
+ }
+ }
+ }
+ }
+ <thead>{headerRow}</thead>
+ }
+
+ override def row(jobTableRow: JobTableRowData): Seq[Node] = {
+ val job = jobTableRow.jobData
+
+ <tr id={"job-" + job.jobId}>
+ <td>
+ {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
+ </td>
+ <td>
+ {jobTableRow.jobDescription}
+ <a href={jobTableRow.detailUrl} class="name-link">{jobTableRow.lastStageName}</a>
+ </td>
+ <td>
+ {jobTableRow.formattedSubmissionTime}
+ </td>
+ <td>{jobTableRow.formattedDuration}</td>
+ <td class="stage-progress-cell">
+ {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
+ {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
+ {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+ </td>
+ <td class="progress-cell">
+ {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
+ failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks,
+ total = job.numTasks - job.numSkippedTasks)}
+ </td>
+ </tr>
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index b0a35fe8c3..fd12a21b79 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -218,7 +218,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
- tableHeaders should not contain "Job Id (Job Group)"
+ tableHeaders(0) should not startWith "Job Id (Job Group)"
}
// Once at least one job has been run in a job group, then we should display the group name:
sc.setJobGroup("my-job-group", "my-job-group-description")
@@ -226,7 +226,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
- tableHeaders should contain ("Job Id (Job Group)")
+ // Can suffix up/down arrow in the header
+ tableHeaders(0) should startWith ("Job Id (Job Group)")
}
val jobJson = getJson(sc.ui.get, "jobs")