diff options
Diffstat (limited to 'core/src')
4 files changed, 42 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 293f1438b8..133c3b1b9a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -114,7 +114,17 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage case Some(stageData: StageUIData) => stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) => <tr> - <td>{k}</td> + <td> + <div style="float: left">{k}</div> + <div style="float: right"> + { + val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty) + logs.map { + case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div> + } + } + </div> + </td> <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td> <td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td> <td>{v.failedTasks + v.succeededTasks + v.killedTasks}</td> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a5e2a20689..ea7acc4734 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -30,6 +30,7 @@ import org.apache.spark.SparkConf import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} import org.apache.spark.ui._ +import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Distribution, Utils} @@ -39,6 +40,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val progressListener = parent.progressListener private val operationGraphListener = parent.operationGraphListener + private val executorsListener = parent.executorsListener private val TIMELINE_LEGEND = { <div class="legend-area"> @@ -296,7 +298,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { currentTime, pageSize = taskPageSize, sortColumn = taskSortColumn, - desc = taskSortDesc + desc = taskSortDesc, + executorsListener = executorsListener ) (_taskTable, _taskTable.table(page)) } catch { @@ -847,7 +850,8 @@ private[ui] class TaskTableRowData( val shuffleRead: Option[TaskTableRowShuffleReadData], val shuffleWrite: Option[TaskTableRowShuffleWriteData], val bytesSpilled: Option[TaskTableRowBytesSpilledData], - val error: String) + val error: String, + val logs: Map[String, String]) private[ui] class TaskDataSource( tasks: Seq[TaskUIData], @@ -860,7 +864,8 @@ private[ui] class TaskDataSource( currentTime: Long, pageSize: Int, sortColumn: String, - desc: Boolean) extends PagedDataSource[TaskTableRowData](pageSize) { + desc: Boolean, + executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) { import StagePage._ // Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table @@ -1004,6 +1009,8 @@ private[ui] class TaskDataSource( None } + val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty) + new TaskTableRowData( info.index, info.taskId, @@ -1027,7 +1034,8 @@ private[ui] class TaskDataSource( shuffleRead, shuffleWrite, bytesSpilled, - taskData.errorMessage.getOrElse("")) + taskData.errorMessage.getOrElse(""), + logs) } /** @@ -1229,7 +1237,8 @@ private[ui] class TaskPagedTable( currentTime: Long, pageSize: Int, sortColumn: String, - desc: Boolean) extends PagedTable[TaskTableRowData] { + desc: Boolean, + executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] { // We only track peak memory used for unsafe operators private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true) @@ -1256,7 +1265,8 @@ private[ui] class TaskPagedTable( currentTime, pageSize, sortColumn, - desc) + desc, + executorsListener) override def pageLink(page: Int): String = { val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") @@ -1353,7 +1363,16 @@ private[ui] class TaskPagedTable( <td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td> <td>{task.status}</td> <td>{task.taskLocality}</td> - <td>{task.executorIdAndHost}</td> + <td> + <div style="float: left">{task.executorIdAndHost}</div> + <div style="float: right"> + { + task.logs.map { + case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div> + } + } + </div> + </td> <td>{UIUtils.formatDate(new Date(task.launchTime))}</td> <td>{task.formatDuration}</td> <td class={TaskDetailsClassNames.SCHEDULER_DELAY}> diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index bd5f16d25b..573192ac17 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -29,6 +29,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" val killEnabled = parent.killEnabled val progressListener = parent.jobProgressListener val operationGraphListener = parent.operationGraphListener + val executorsListener = parent.executorsListener attachPage(new AllStagesPage(this)) attachPage(new StagePage(this)) diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 6d726d3d59..d30b987d6c 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -26,6 +26,8 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ +import org.apache.spark.storage.StorageStatusListener +import org.apache.spark.ui.exec.ExecutorsListener import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab} import org.apache.spark.ui.scope.RDDOperationGraphListener @@ -64,11 +66,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { private def renderStagePage(conf: SparkConf): Seq[Node] = { val jobListener = new JobProgressListener(conf) val graphListener = new RDDOperationGraphListener(conf) + val executorsListener = new ExecutorsListener(new StorageStatusListener(conf), conf) val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS) val request = mock(classOf[HttpServletRequest]) when(tab.conf).thenReturn(conf) when(tab.progressListener).thenReturn(jobListener) when(tab.operationGraphListener).thenReturn(graphListener) + when(tab.executorsListener).thenReturn(executorsListener) when(tab.appName).thenReturn("testing") when(tab.headerTabs).thenReturn(Seq.empty) when(request.getParameter("id")).thenReturn("0") |