path: root/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
blob: 40bee325b2abfd4282c1770e97f561591058bbeb
package spark.ui.exec

import javax.servlet.http.HttpServletRequest

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.xml.Node

import org.eclipse.jetty.server.Handler

import spark.{ExceptionFailure, Logging, SparkContext, Utils}
import spark.executor.TaskMetrics
import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
import spark.ui.JettyUtils._
import spark.ui.Page.Executors
import spark.ui.UIUtils
import spark.ui.UIUtils.headerSparkPage

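/**
 * Web UI page that renders a summary of memory, disk, and task activity on each executor.
 */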
private[spark] class ExecutorsUI(val sc: SparkContext) {

  private var _listener: Option[ExecutorsListener] = None
  def listener = _listener.get

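  // Creates the listener and registers it with the SparkContext; must be called before rendering.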
  def start() {
    _listener = Some(new ExecutorsListener)
    sc.addSparkListener(listener)
  }

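  // Jetty route table: maps the /executors path to this page's render function.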
  def getHandlers = Seq[(String, Handler)](
    ("/executors", (request: HttpServletRequest) => render(request))
  )

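  // Builds the Executors page: cluster-wide memory/disk totals plus a per-executor table.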
  def render(request: HttpServletRequest): Seq[Node] = {
    val storageStatusList = sc.getExecutorStorageStatus

    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_+_)
    val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_+_)
    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
      .fold(0L)(_+_)

    val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
      "Tasks: Complete/Total")
    def execRow(kv: Seq[String]) =
      <tr>
        <td>{kv(0)}</td>
        <td>{kv(1)}</td>
        <td>{kv(2)}</td>
        <td>{kv(3)}</td>
        <td>{kv(4)}</td>
        <td>{kv(5)}</td>
      </tr>
    val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
    val execTable = UIUtils.listingTable(execHead, execRow, execInfo)

    val content =
      <div class="row">
        <div class="span12">
          <ul class="unstyled">
            <li><strong>Memory:</strong>
              {Utils.memoryBytesToString(memUsed)} Used
              ({Utils.memoryBytesToString(maxMem)} Total) </li>
            <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
          </ul>
        </div>
      </div>
      <div class="row">
        <div class="span12">
          {execTable}
        </div>
      </div>;

    headerSparkPage(content, sc, "Executors", Executors)
  }

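  // Collects one table row of stats for the executor at the given storage-status index.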
  def getExecInfo(statusId: Int): Seq[String] = {
    val status = sc.getExecutorStorageStatus(statusId)
    val execId = status.blockManagerId.executorId
    val hostPort = status.blockManagerId.hostPort
    val rddBlocks = status.blocks.size.toString
    val memUsed = Utils.memoryBytesToString(status.memUsed())
    val maxMem = Utils.memoryBytesToString(status.maxMem)
    val diskUsed = Utils.memoryBytesToString(status.diskUsed())
    // The listener's maps are keyed by executor ID, not by the storage-status index
    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
    val totalTasks = listener.executorToTaskInfos.get(execId).map(_.size).getOrElse(0)
    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) match {
      case f if f > 0 => " (%s failed)".format(f)
      case _ => ""
    }

    Seq(
      execId,
      hostPort,
      rddBlocks,
      "%s / %s".format(memUsed, maxMem),
      diskUsed,
      "%s / %s".format(completedTasks, totalTasks) + failedTasks
    )
  }

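  /**
   * SparkListener that tracks per-executor task counts and task details, keyed by executor ID.
   */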
  private[spark] class ExecutorsListener extends SparkListener with Logging {
    val executorToTasksComplete = HashMap[String, Int]()
    val executorToTasksFailed = HashMap[String, Int]()
    val executorToTaskInfos =
      HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()

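    // Records each finished task against its executor, splitting successes from exception failures.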
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
      val eid = taskEnd.taskInfo.executorId
      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
        taskEnd.reason match {
          case e: ExceptionFailure =>
            executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
            (Some(e), e.metrics)
          case _ =>
            executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
            (None, Some(taskEnd.taskMetrics))
        }
      val taskList = executorToTaskInfos.getOrElse(
        eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
      taskList += ((taskEnd.taskInfo, metrics, failureInfo))
      executorToTaskInfos(eid) = taskList
    }
  }
}