about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala33
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala4
4 files changed, 42 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 293f1438b8..133c3b1b9a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -114,7 +114,17 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
case Some(stageData: StageUIData) =>
stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
<tr>
- <td>{k}</td>
+ <td>
+ <div style="float: left">{k}</div>
+ <div style="float: right">
+ {
+ val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty)
+ logs.map {
+ case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
+ }
+ }
+ </div>
+ </td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
<td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks + v.killedTasks}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index a5e2a20689..ea7acc4734 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality}
import org.apache.spark.ui._
+import org.apache.spark.ui.exec.ExecutorsListener
import org.apache.spark.ui.jobs.UIData._
import org.apache.spark.util.{Distribution, Utils}
@@ -39,6 +40,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
private val progressListener = parent.progressListener
private val operationGraphListener = parent.operationGraphListener
+ private val executorsListener = parent.executorsListener
private val TIMELINE_LEGEND = {
<div class="legend-area">
@@ -296,7 +298,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
currentTime,
pageSize = taskPageSize,
sortColumn = taskSortColumn,
- desc = taskSortDesc
+ desc = taskSortDesc,
+ executorsListener = executorsListener
)
(_taskTable, _taskTable.table(page))
} catch {
@@ -847,7 +850,8 @@ private[ui] class TaskTableRowData(
val shuffleRead: Option[TaskTableRowShuffleReadData],
val shuffleWrite: Option[TaskTableRowShuffleWriteData],
val bytesSpilled: Option[TaskTableRowBytesSpilledData],
- val error: String)
+ val error: String,
+ val logs: Map[String, String])
private[ui] class TaskDataSource(
tasks: Seq[TaskUIData],
@@ -860,7 +864,8 @@ private[ui] class TaskDataSource(
currentTime: Long,
pageSize: Int,
sortColumn: String,
- desc: Boolean) extends PagedDataSource[TaskTableRowData](pageSize) {
+ desc: Boolean,
+ executorsListener: ExecutorsListener) extends PagedDataSource[TaskTableRowData](pageSize) {
import StagePage._
// Convert TaskUIData to TaskTableRowData which contains the final contents to show in the table
@@ -1004,6 +1009,8 @@ private[ui] class TaskDataSource(
None
}
+ val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty)
+
new TaskTableRowData(
info.index,
info.taskId,
@@ -1027,7 +1034,8 @@ private[ui] class TaskDataSource(
shuffleRead,
shuffleWrite,
bytesSpilled,
- taskData.errorMessage.getOrElse(""))
+ taskData.errorMessage.getOrElse(""),
+ logs)
}
/**
@@ -1229,7 +1237,8 @@ private[ui] class TaskPagedTable(
currentTime: Long,
pageSize: Int,
sortColumn: String,
- desc: Boolean) extends PagedTable[TaskTableRowData] {
+ desc: Boolean,
+ executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] {
// We only track peak memory used for unsafe operators
private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)
@@ -1256,7 +1265,8 @@ private[ui] class TaskPagedTable(
currentTime,
pageSize,
sortColumn,
- desc)
+ desc,
+ executorsListener)
override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
@@ -1353,7 +1363,16 @@ private[ui] class TaskPagedTable(
<td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td>
<td>{task.status}</td>
<td>{task.taskLocality}</td>
- <td>{task.executorIdAndHost}</td>
+ <td>
+ <div style="float: left">{task.executorIdAndHost}</div>
+ <div style="float: right">
+ {
+ task.logs.map {
+ case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
+ }
+ }
+ </div>
+ </td>
<td>{UIUtils.formatDate(new Date(task.launchTime))}</td>
<td>{task.formatDuration}</td>
<td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index bd5f16d25b..573192ac17 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -29,6 +29,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
val killEnabled = parent.killEnabled
val progressListener = parent.jobProgressListener
val operationGraphListener = parent.operationGraphListener
+ val executorsListener = parent.executorsListener
attachPage(new AllStagesPage(this))
attachPage(new StagePage(this))
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 6d726d3d59..d30b987d6c 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -26,6 +26,8 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.ui.exec.ExecutorsListener
import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab}
import org.apache.spark.ui.scope.RDDOperationGraphListener
@@ -64,11 +66,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
private def renderStagePage(conf: SparkConf): Seq[Node] = {
val jobListener = new JobProgressListener(conf)
val graphListener = new RDDOperationGraphListener(conf)
+ val executorsListener = new ExecutorsListener(new StorageStatusListener(conf), conf)
val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
val request = mock(classOf[HttpServletRequest])
when(tab.conf).thenReturn(conf)
when(tab.progressListener).thenReturn(jobListener)
when(tab.operationGraphListener).thenReturn(graphListener)
+ when(tab.executorsListener).thenReturn(executorsListener)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
when(request.getParameter("id")).thenReturn("0")